"""
data_pipeline.py
This module defines the `Pipeline` class, which is used to create and manage data processing pipelines.
A `Pipeline` consists of interconnected `PipeNode` objects that define a sequence of operations, allowing
for the execution of complex data processing flows. The `Pipeline` class supports validation of the processed
data through user-defined `Validator` objects and ensures that the data flows correctly from inputs to outputs.
Classes:
Pipeline: Manages the flow of data from input `PipeNode` objects through a series of operations
to produce output `PipeNode` objects. Supports validation and execution of the pipeline.
"""
from collections.abc import Callable
from typing import List, Any, Tuple, Dict
from .pipe_node import PipelineNode
from ..validator import Validator
# (stray "[docs]" source-link marker from the rendered documentation removed)
class Pipeline:
    """A data processing pipeline made of interconnected `PipelineNode` objects.

    The pipeline stores its input and output nodes, computes once (at
    construction time) the order in which the nodes reachable from the
    outputs must be executed, and, when called, feeds the positional
    arguments into the input nodes, executes every node in that order and
    returns the values of the output nodes. Output values can be checked by
    `Validator` objects registered per output with `add_validator`.

    Args:
        inputs (PipelineNode | list[PipelineNode]): A single node or a list
            of nodes that serve as the inputs of the pipeline.
        outputs (PipelineNode | list[PipelineNode]): A single node or a list
            of nodes that serve as the outputs of the pipeline.

    Raises:
        TypeError: If `inputs` or `outputs` is neither a `PipelineNode` nor a
            list containing only `PipelineNode` objects.

    Example:
        >>> # Create a simple pipeline
        >>> input_node = PipelineNode()
        >>> output_node = PipelineNode(parent=input_node)
        >>> pipeline = Pipeline(inputs=input_node, outputs=output_node)
        >>> # Add a validator to the output
        >>> validator = MyValidator()
        >>> pipeline.add_validator(validator, output_index=0)
        >>> # Execute the pipeline
        >>> result = pipeline(input_data)
    """

    def __init__(
        self,
        inputs: PipelineNode | list[PipelineNode],
        outputs: PipelineNode | list[PipelineNode],
    ) -> None:
        # Inputs are checked before outputs, so a call that gets both wrong
        # reports the inputs first.
        self.__inputs = self._as_node_list(inputs, "inputs")
        self.__outputs = self._as_node_list(outputs, "outputs")
        # One (initially empty) list of validators per output node.
        self.__validators: list[list[Validator]] = [[] for _ in self.__outputs]

        # A temporary node whose parents are all the outputs gives the
        # traversal a single root. It always comes last in the computed
        # order and is dropped, since it is not part of the pipeline.
        virtual_node = PipelineNode(parent=self.__outputs)
        self.__exec_graph = self._topological_order(virtual_node)[:-1]

    @staticmethod
    def _as_node_list(nodes: Any, role: str) -> list[PipelineNode]:
        """Return `nodes` as a new list of nodes, or raise `TypeError`.

        The list is copied so that later mutation of the caller's list cannot
        desynchronise the pipeline (e.g. outputs vs. per-output validators).
        """
        if isinstance(nodes, PipelineNode):
            return [nodes]
        if isinstance(nodes, list) and all(isinstance(n, PipelineNode) for n in nodes):
            return list(nodes)
        raise TypeError(f"{role} must be a PipeNode or a list of PipeNode")

    @staticmethod
    def _topological_order(root: PipelineNode) -> list[PipelineNode]:
        """Return the nodes reachable from `root` via `.parent`, parents first.

        This is a depth-first post-order traversal: a node is emitted only
        after all of its parents. It is written iteratively so that deep
        graphs do not hit Python's recursion limit.
        """
        order: list[PipelineNode] = []
        visited = {root}
        # Each stack entry pairs a node with an iterator over the parents
        # that have not been examined yet.
        stack = [(root, iter(root.parent))]
        while stack:
            node, pending = stack[-1]
            for parent in pending:
                if parent not in visited:
                    visited.add(parent)
                    stack.append((parent, iter(parent.parent)))
                    break
            else:
                # Every parent has been emitted: the node itself can follow.
                order.append(node)
                stack.pop()
        return order

    def add_validator(self, validator: Validator, output_index: int) -> None:
        """Register a validator for one of the pipeline outputs.

        Args:
            validator (Validator): validator object.
            output_index (int): index of the output to validate.

        Raises:
            TypeError: `validator` is not a `Validator` instance.
            IndexError: `output_index` is negative or not smaller than the
                number of outputs.
        """
        if not isinstance(validator, Validator):
            raise TypeError(f"validator must be Validator type, not {type(validator)}")
        if output_index >= len(self.__outputs) or output_index < 0:
            raise IndexError("output_index isn't included in [0, len(outputs)[")
        self.__validators[output_index].append(validator)

    def __call__(self, *args: Any) -> Any:
        """Feed the input nodes, execute the graph and return the outputs.

        Args:
            *args: One value per input node, in the order of the inputs.

        Raises:
            ValueError: The number of arguments differs from the number of
                input nodes.
            RuntimeError: A node raised while executing; the original
                exception is attached as `__cause__`.

        Any exception raised by a validator's `validate` call propagates
        unchanged.

        Returns:
            Any: The value of the output node when there is exactly one
            output, otherwise the list of output values.
        """
        if len(args) != len(self.__inputs):
            raise ValueError(f"Pipeline takes {len(self.__inputs)} positional(s) argument(s), "
                             f"but {len(args)} were(was) provided")

        # Load the call arguments into the input nodes.
        for node, arg in zip(self.__inputs, args):
            node._set_value(arg)

        # Execute every node in the precomputed order.
        for node in self.__exec_graph:
            try:
                node.execute()
            except Exception as e:
                # Chain the original exception so its traceback is kept.
                raise RuntimeError(
                    "Error at runtime when excecuting the graph "
                    f"during the node : {node}. "
                    f"Exception : {e}."
                ) from e

        output = [node.value for node in self.__outputs]

        # Run each output through the validators registered for it.
        for output_data, validators in zip(output, self.__validators):
            for validator in validators:
                validator.validate(output_data)

        if len(output) == 1:
            return output[0]
        return output

    def as_deferred(self, *args) -> PipelineNode:
        """Wrap this pipeline as a node usable inside another pipeline.

        Positional arguments that are `PipelineNode` objects become the
        parents of the returned node. All other arguments are kept as
        constants and are passed to the pipeline after the values supplied
        for the parents, regardless of their original position in `args`.

        Raises:
            TypeError: No `PipelineNode` was passed as argument.

        Returns:
            PipelineNode: A node built from a function that calls this
            pipeline, with the given nodes as parents.
        """
        parents = [arg for arg in args if isinstance(arg, PipelineNode)]
        constants = [arg for arg in args if not isinstance(arg, PipelineNode)]

        # The returned node needs at least one parent node.
        if not parents:
            raise TypeError("Found no PipeNode as positional argument. "
                            "If you write a pipeline function, test it without `deferred_execution` "
                            "decorator")

        def deferred_func(*data):
            return self(*data, *constants)

        deferred_func.__name__ = "Pipeline"
        return PipelineNode(deferred_func, parents)

    def __repr__(self) -> str:
        """Return a summary with the number of inputs, outputs and validators."""
        # `__validators` holds one list per output, so the number of
        # validators is the total length of those lists.
        num_validators = sum(len(group) for group in self.__validators)
        return (f"{self.__class__.__name__}("
                f"num_inputs = {len(self.__inputs)}, "
                f"num_outputs = {len(self.__outputs)}, "
                f"num_validators = {num_validators})")