dl_data_pipeline.pipeline package

Submodules

dl_data_pipeline.pipeline.data_pipeline module

data_pipeline.py

This module defines the Pipeline class, which is used to create and manage data processing pipelines.

A Pipeline consists of interconnected PipeNode objects that define a sequence of operations, allowing for the execution of complex data processing flows. The Pipeline class supports validation of the processed data through user-defined Validator objects and ensures that the data flows correctly from inputs to outputs.

Classes:
Pipeline: Manages the flow of data from input PipeNode objects through a series of operations to produce output PipeNode objects. Supports validation and execution of the pipeline.

class dl_data_pipeline.pipeline.data_pipeline.Pipeline(inputs: PipelineNode | list[PipelineNode], outputs: PipelineNode | list[PipelineNode])

Bases: object

A class that represents a data processing pipeline consisting of interconnected PipeNode objects.

The Pipeline class manages the flow of data from input nodes through a series of operations to produce output nodes. It supports validation of output data through user-defined Validator objects.

Parameters:
  • inputs (PipeNode | list[PipeNode]) – A single PipeNode or a list of PipeNode objects that serve as the inputs to the pipeline.

  • outputs (PipeNode | list[PipeNode]) – A single PipeNode or a list of PipeNode objects that serve as the outputs of the pipeline.

Raises:

ValueError – If inputs or outputs is not a PipeNode or a list of PipeNode.

Example:

>>> # Create a simple pipeline
>>> input_node = PipeNode()
>>> output_node = PipeNode(parent=input_node)
>>> pipeline = Pipeline(inputs=input_node, outputs=output_node)
>>> # Add a validator to the output
>>> validator = MyValidator()
>>> pipeline.add_validator(validator, output_index=0)
>>> # Execute the pipeline
>>> result = pipeline(input_data)
__call__(*args: Any) → Any

Initialize the input nodes of the graph, execute the nodes in topological order, and return the output values.

Raises:
  • ValueError – If the number of arguments does not match the number of pipeline inputs.

  • ValidationError – If an output does not pass validation against its schema.

  • RuntimeError – If a node produces a runtime error.

Returns:

output of the graph.

Return type:

Any
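The execution order described for __call__ can be illustrated with a small, self-contained sketch. SketchNode, topological_order and run are hypothetical stand-ins written for this illustration; they are not the library's classes, and the real implementation may differ (for example in how it caches values or reports errors).

```python
from typing import Any, Callable, Optional


class SketchNode:
    """Hypothetical stand-in for a pipeline node (not the library class)."""

    def __init__(
        self,
        func: Optional[Callable[..., Any]] = None,
        parents: Optional[list["SketchNode"]] = None,
    ) -> None:
        self.func = func
        self.parents = parents or []
        self.value: Any = None

    def execute(self) -> None:
        # Input nodes have no function and keep the value they were given.
        if self.func is not None:
            self.value = self.func(*(p.value for p in self.parents))


def topological_order(outputs: list[SketchNode]) -> list[SketchNode]:
    """Return every node reachable from the outputs, parents before children."""
    order: list[SketchNode] = []
    seen: set[int] = set()

    def visit(node: SketchNode) -> None:
        if id(node) in seen:
            return
        seen.add(id(node))
        for parent in node.parents:
            visit(parent)
        order.append(node)  # appended only after all of its parents

    for out in outputs:
        visit(out)
    return order


def run(inputs: list[SketchNode], outputs: list[SketchNode], *args: Any) -> list[Any]:
    """Assumed flow: set the inputs, execute in order, collect the output values."""
    if len(args) != len(inputs):
        raise ValueError(f"expected {len(inputs)} arguments, got {len(args)}")
    for node, arg in zip(inputs, args):
        node.value = arg
    for node in topological_order(outputs):
        node.execute()
    return [out.value for out in outputs]


a, b = SketchNode(), SketchNode()
total = SketchNode(func=lambda x, y: x + y, parents=[a, b])
doubled = SketchNode(func=lambda t: t * 2, parents=[total])
result = run([a, b], [total, doubled], 3, 4)
print(result)  # [7, 14]
```

Because each node is appended only after its parents, a node shared by several outputs (here, total) is executed once.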

add_validator(validator: Validator, output_index: int) → None

Add a validator for one of the pipeline outputs.

Parameters:
  • validator (Validator) – validator object.

  • output_index (int) – index of the output to validate.

Raises:
  • TypeError – If validator is not a Validator instance.

  • IndexError – If output_index is out of bounds for the outputs list.
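As a rough illustration of per-output validation, the sketch below keeps one list of validators per output position and raises the same kinds of errors listed above. SketchValidator, SketchValidationError and ValidatorRegistry are hypothetical names; the interface of the library's Validator class is not described here, so the validate method and the rejection of negative indices are assumptions.

```python
from typing import Any, Callable


class SketchValidationError(Exception):
    """Hypothetical error raised when an output fails validation."""


class SketchValidator:
    """Hypothetical validator wrapping a predicate and an error message."""

    def __init__(self, predicate: Callable[[Any], bool], message: str) -> None:
        self.predicate = predicate
        self.message = message

    def validate(self, value: Any) -> None:
        if not self.predicate(value):
            raise SketchValidationError(f"{self.message}: got {value!r}")


class ValidatorRegistry:
    """Keeps one list of validators per output position."""

    def __init__(self, num_outputs: int) -> None:
        self._validators: list[list[SketchValidator]] = [[] for _ in range(num_outputs)]

    def add_validator(self, validator: SketchValidator, output_index: int) -> None:
        if not isinstance(validator, SketchValidator):
            raise TypeError("validator must be a SketchValidator")
        # Assumption: negative indices are rejected rather than wrapped around.
        if not 0 <= output_index < len(self._validators):
            raise IndexError("output_index is out of bounds for the outputs list")
        self._validators[output_index].append(validator)

    def check(self, outputs: list[Any]) -> None:
        """Run every registered validator against the matching output value."""
        for value, validators in zip(outputs, self._validators):
            for validator in validators:
                validator.validate(value)


registry = ValidatorRegistry(num_outputs=2)
non_negative = SketchValidator(lambda v: v >= 0, "value must be non-negative")
registry.add_validator(non_negative, output_index=0)
registry.check([5, -1])  # passes: only output 0 has a validator
```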

as_deferred(*args) → PipelineNode

Use this pipeline as a node function inside another pipeline.

Raises:

ValueError – If no PipeNode is passed as an argument.

Returns:

The corresponding Node.

Return type:

PipeNode
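The idea of nesting one pipeline inside another can be sketched as follows: since a pipeline is callable, it can serve as the function of a node in an outer graph. SketchNode and SketchPipeline are hypothetical stand-ins with a simplified recursive evaluation; they only illustrate the concept and are not the library's implementation.

```python
from typing import Any, Callable, Optional


class SketchNode:
    """Hypothetical stand-in for a pipeline node."""

    def __init__(
        self,
        func: Optional[Callable[..., Any]] = None,
        parents: Optional[list["SketchNode"]] = None,
    ) -> None:
        self.func = func
        self.parents = parents or []
        self.value: Any = None

    def evaluate(self) -> Any:
        """Recursively evaluate the parents, then this node (no caching, for brevity)."""
        if self.func is None:
            return self.value
        return self.func(*(p.evaluate() for p in self.parents))


class SketchPipeline:
    """Hypothetical single-output pipeline."""

    def __init__(self, inputs: list[SketchNode], output: SketchNode) -> None:
        self.inputs = inputs
        self.output = output

    def __call__(self, *args: Any) -> Any:
        for node, arg in zip(self.inputs, args):
            node.value = arg
        return self.output.evaluate()

    def as_deferred(self, *nodes: SketchNode) -> SketchNode:
        if not nodes:
            raise ValueError("at least one node is required")
        # The whole pipeline becomes the function of a single new node.
        return SketchNode(func=self, parents=list(nodes))


# Inner pipeline: x -> x * 2
inner_in = SketchNode()
inner = SketchPipeline([inner_in], SketchNode(lambda x: x * 2, [inner_in]))

# Outer pipeline: y -> inner(y) + 10
outer_in = SketchNode()
nested = inner.as_deferred(outer_in)
outer = SketchPipeline([outer_in], SketchNode(lambda v: v + 10, [nested]))

print(outer(5))  # 20
```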

dl_data_pipeline.pipeline.input_node module

input_node.py

This module defines the InputNode class, a specialized subclass of PipeNode that represents an input node in a data processing pipeline.

An InputNode is used to provide initial data into a pipeline, serving as the starting point for data flow. It does not perform any computation itself but instead holds a value that can be used by other nodes in the pipeline.

Classes:

InputNode: A subclass of PipeNode designed to serve as an input node in a pipeline.

class dl_data_pipeline.pipeline.input_node.InputNode(name: str = '')

Bases: PipelineNode

A class representing an input node in a data processing pipeline.

The InputNode class is a specialized subclass of PipeNode that is designed to hold initial input data for a pipeline. Unlike other nodes, InputNode does not perform any computation; it simply stores a value that can be accessed by other nodes in the pipeline.

Parameters:

name (str, optional) – An optional name for the input node. Defaults to an empty string.

__getitem__(key: int | str | Any) → PipelineNode

Access an item in a subscriptable value of the node.

This method creates a new PipelineNode that represents the key-th or key-named item from the current node’s value if the value is subscriptable (supports indexing). If the value is not subscriptable, a TypeError is raised.

Parameters:

key (int | str | Any) – The key to access the item from the subscriptable value. Can be an integer (for index-based access) or a string (for key-based access).

Raises:

TypeError – If the node’s value is not subscriptable.

Returns:

A new node representing the accessed item from the subscriptable object.

Return type:

PipelineNode

Example:

>>> # create a node
>>> node = PipeNode()
>>> # execute a function on the node that could result in a subscriptable object
>>> # for example, a function that reads some columns from a CSV row and returns
>>> # an iterable of values
>>> x = read_cols_in_csv(node, "name", "path")
>>> # get the results in separate nodes
>>> name = x[0]  # node representing the 'name'
>>> path = x[1]  # node representing the 'path'
_set_value(value: Any) → None

Set the value of the current node.

Parameters:

value (Any) – value to be stored in the node.

execute() → None

Execute the function stored in the node, passing the parent values as arguments.

property parent: list[PipelineNode]

Parents of the node

Returns:

a list of the parents of this node.

Return type:

list[PipeNode]

unwrap(num: int) → PipelineNode

Prepare the node for iteration with a specified number of outputs.

This method sets the number of iterations (num) that the node should be unwrapped into. It allows the node to be used in a context where multiple outputs are expected, like tuple unpacking.

Parameters:

num (int) – The number of outputs expected from the node.

Returns:

The node itself, ready to be iterated over the specified number of outputs.

Return type:

PipelineNode

Example:

>>> # Define a function that returns a subscriptable object (e.g., a tuple)
>>> @deferred_execution
... def min_max(v):
...     return np.min(v), np.max(v)
>>> # Define the pipeline
>>> input_node = InputNode(name="1")
>>> x = ...  # Add some processing functions here ...
>>> min_node, max_node = min_max(x).unwrap(2)
>>> pipeline = Pipeline(inputs=[input_node], outputs=[min_node, max_node])
>>> # Unwrap allows you to get multiple nodes from a single node when the value
>>> # is a subscriptable object, like a tuple.
>>> min_value, max_value = pipeline(data)  # 'data' must be defined with the expected input.
property value: Any

Current value of the node.

Returns:

value of the node.

Return type:

Any

dl_data_pipeline.pipeline.pipe_node module

pipe_node.py

This module defines the PipeNode class, which is a fundamental component for building and managing data processing pipelines.

A PipeNode represents a node in a directed acyclic graph (DAG) that performs an operation on its parent nodes. The PipeNode class supports deferred execution, where data processing operations are only executed when explicitly called. It allows for building complex data processing workflows in a modular and flexible manner.

Classes:

PipeNode: A node in a pipeline that can execute a function based on the values of its parent nodes.

Usage Example:

>>> from dl_data_pipeline import PipeNode
>>> def add(x, y):
...     return x + y
>>> # Create nodes
>>> node1 = PipeNode(name="input1")
>>> node2 = PipeNode(name="input2")
>>> node3 = PipeNode(func=add, parent=[node1, node2], name="sum")
>>> # Set values for input nodes
>>> node1._set_value(3)
>>> node2._set_value(4)
>>> # Execute the node with the function
>>> node3.execute()
>>> # Get the result
>>> result = node3.value  # result should be 7
class dl_data_pipeline.pipeline.pipe_node.PipelineNode(func: Callable = None, parent: list[PipelineNode] | None = None, name: str | None = None)

Bases: object

A class representing a node in a data processing pipeline.

The PipeNode class is used to build directed acyclic graphs (DAGs) where each node can execute a function based on the values of its parent nodes. This allows for deferred execution of data processing steps, enabling the construction of complex and modular data pipelines.

Parameters:
  • func (Callable, optional) – A function that takes the values from the parent nodes as inputs and returns the computed result. Defaults to None.

  • parent (list[PipeNode], optional) – A list of parent PipeNode objects whose values are used as inputs to the function. Defaults to None.

  • name (str | None, optional) – An optional name for the node. Defaults to None.

__getitem__(key: int | str | Any) → PipelineNode

Access an item in a subscriptable value of the node.

This method creates a new PipelineNode that represents the key-th or key-named item from the current node’s value if the value is subscriptable (supports indexing). If the value is not subscriptable, a TypeError is raised.

Parameters:

key (int | str | Any) – The key to access the item from the subscriptable value. Can be an integer (for index-based access) or a string (for key-based access).

Raises:

TypeError – If the node’s value is not subscriptable.

Returns:

A new node representing the accessed item from the subscriptable object.

Return type:

PipelineNode

Example:

>>> # create a node
>>> node = PipeNode()
>>> # execute a function on the node that could result in a subscriptable object
>>> # for example, a function that reads some columns from a CSV row and returns
>>> # an iterable of values
>>> x = read_cols_in_csv(node, "name", "path")
>>> # get the results in separate nodes
>>> name = x[0]  # node representing the 'name'
>>> path = x[1]  # node representing the 'path'
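Deferred item access can be sketched with a stand-in class: indexing a node does not touch any data, it only creates a child node that performs the lookup when the graph is evaluated. SketchNode and its evaluate method are hypothetical and exist only for this illustration.

```python
from typing import Any, Callable, Optional


class SketchNode:
    """Hypothetical stand-in for a pipeline node."""

    def __init__(
        self,
        func: Optional[Callable[..., Any]] = None,
        parents: Optional[list["SketchNode"]] = None,
    ) -> None:
        self.func = func
        self.parents = parents or []
        self.value: Any = None

    def __getitem__(self, key: Any) -> "SketchNode":
        # No indexing happens here; the child node records the lookup for later.
        return SketchNode(func=lambda value: value[key], parents=[self])

    def evaluate(self) -> Any:
        if self.func is None:
            return self.value
        return self.func(*(p.evaluate() for p in self.parents))


row = SketchNode()
name = row["name"]          # key-based access
first_tag = row["tags"][0]  # chained key-based and index-based access

row.value = {"name": "sample", "tags": ["a", "b"]}
print(name.evaluate(), first_tag.evaluate())  # sample a
```

In this sketch the TypeError for a non-subscriptable value surfaces at evaluation time, because that is when the lookup actually runs.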
_set_value(value: Any) → None

Set the value of the current node.

Parameters:

value (Any) – value to be stored in the node.

execute() → None

Execute the function stored in the node, passing the parent values as arguments.

property parent: list[PipelineNode]

Parents of the node

Returns:

a list of the parents of this node.

Return type:

list[PipeNode]

unwrap(num: int) → PipelineNode

Prepare the node for iteration with a specified number of outputs.

This method sets the number of iterations (num) that the node should be unwrapped into. It allows the node to be used in a context where multiple outputs are expected, like tuple unpacking.

Parameters:

num (int) – The number of outputs expected from the node.

Returns:

The node itself, ready to be iterated over the specified number of outputs.

Return type:

PipelineNode

Example:

>>> # Define a function that returns a subscriptable object (e.g., a tuple)
>>> @deferred_execution
... def min_max(v):
...     return np.min(v), np.max(v)
>>> # Define the pipeline
>>> input_node = InputNode(name="1")
>>> x = ...  # Add some processing functions here ...
>>> min_node, max_node = min_max(x).unwrap(2)
>>> pipeline = Pipeline(inputs=[input_node], outputs=[min_node, max_node])
>>> # Unwrap allows you to get multiple nodes from a single node when the value
>>> # is a subscriptable object, like a tuple.
>>> min_value, max_value = pipeline(data)  # 'data' must be defined with the expected input.
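One plausible way to support tuple unpacking on a deferred node is sketched below: unwrap records how many outputs to expect, and iteration yields that many item nodes. SketchNode is a hypothetical stand-in, and whether the library implements unwrap this way is an assumption.

```python
from typing import Any, Callable, Iterator, Optional


class SketchNode:
    """Hypothetical stand-in for a pipeline node."""

    def __init__(
        self,
        func: Optional[Callable[..., Any]] = None,
        parents: Optional[list["SketchNode"]] = None,
    ) -> None:
        self.func = func
        self.parents = parents or []
        self.value: Any = None
        self._num_outputs: Optional[int] = None

    def __getitem__(self, key: Any) -> "SketchNode":
        return SketchNode(func=lambda value: value[key], parents=[self])

    def unwrap(self, num: int) -> "SketchNode":
        self._num_outputs = num
        return self

    def __iter__(self) -> Iterator["SketchNode"]:
        # The length of the future value is unknown while the graph is being
        # built, so the caller has to state it through unwrap().
        if self._num_outputs is None:
            raise TypeError("call unwrap(num) before unpacking this node")
        return iter([self[i] for i in range(self._num_outputs)])

    def evaluate(self) -> Any:
        if self.func is None:
            return self.value
        return self.func(*(p.evaluate() for p in self.parents))


data = SketchNode()
stats = SketchNode(func=lambda v: (min(v), max(v)), parents=[data])
min_node, max_node = stats.unwrap(2)

data.value = [3, 1, 2]
print(min_node.evaluate(), max_node.evaluate())  # 1 3
```

The explicit __iter__ matters: a class that defines only __getitem__ falls back to Python's legacy sequence iteration, which keeps requesting indices until an IndexError is raised, and a deferred __getitem__ never raises one.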
property value: Any

Current value of the node.

Returns:

value of the node.

Return type:

Any

Module contents

This package provides the core components for building and managing data processing pipelines.

The package includes the following classes:

Classes:

Pipeline: Manages the flow of data from input PipeNode objects through a series of operations to produce output PipeNode objects. Supports validation and execution of the pipeline.

PipeNode: A node in a pipeline that can execute a function based on the values of its parent nodes, allowing for deferred execution and modular data processing.

InputNode: A specialized PipeNode that represents the entry point of data into the pipeline, holding initial input values without performing any computation.

Usage Example:

>>> # Import lib
>>> from dl_data_pipeline import Pipeline, InputNode, deferred_execution
>>> # Create input nodes
>>> input_node1 = InputNode(name="1")   # An InputNode's base name is "input"; any name
>>> input_node2 = InputNode(name="2")   # passed by the user is concatenated to it.
>>> # use any function, you can also create one
>>> @deferred_execution
... def sum(v1, v2):
...     return v1 + v2
>>> # create a functional PipelineNode with this function
>>> add_node = sum(input_node1, input_node2)
>>> # Create a pipeline
>>> pipeline = Pipeline(inputs=[input_node1, input_node2], outputs=add_node)
>>> # Compute pipe
>>> result = pipeline(20, 10) # will be 30
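To show what a deferred-execution decorator does conceptually, here is a minimal sketch: calling the decorated function with nodes returns a new node instead of computing a result. sketch_deferred and SketchNode are hypothetical names, not the library's deferred_execution, and this version handles only node arguments (the real decorator may also accept plain values).

```python
import functools
from typing import Any, Callable, Optional


class SketchNode:
    """Hypothetical stand-in for a pipeline node."""

    def __init__(
        self,
        func: Optional[Callable[..., Any]] = None,
        parents: Optional[list["SketchNode"]] = None,
    ) -> None:
        self.func = func
        self.parents = parents or []
        self.value: Any = None

    def evaluate(self) -> Any:
        if self.func is None:
            return self.value
        return self.func(*(p.evaluate() for p in self.parents))


def sketch_deferred(func: Callable[..., Any]) -> Callable[..., SketchNode]:
    """Turn a plain function into a node factory."""

    @functools.wraps(func)
    def wrapper(*nodes: SketchNode) -> SketchNode:
        # Nothing is computed here; the call is recorded as a graph node.
        return SketchNode(func=func, parents=list(nodes))

    return wrapper


@sketch_deferred
def add(v1, v2):
    return v1 + v2


a, b = SketchNode(), SketchNode()
total = add(a, b)        # a node, not a number
a.value, b.value = 20, 10
print(total.evaluate())  # 30
```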
Modules:

data_pipeline: Contains the Pipeline class for managing the data flow in a pipeline.

pipe_node: Contains the PipeNode class, the basic building block for creating data processing graphs.

input_node: Contains the InputNode class, a specialized node for inputting data into the pipeline.