# Source code for dl_data_pipeline.deferred.deferred_wrapper

from functools import wraps
from contextlib import contextmanager
from ..pipeline.pipe_node import PipelineNode

# Module-wide switch read by the wrapper built in `deferred_execution`: while
# it is True, decorated calls build a PipelineNode instead of running the
# function. `instant_excecution` sets it to False for the duration of its
# `with` block.
_DEFERRED_EXECUTION_MODE = True

@contextmanager
def instant_excecution() -> None:  # type: ignore
    """
    Context manager to temporarily disable deferred execution.

    Inside the ``with`` block, functions decorated with
    ``@deferred_execution`` run immediately instead of building a
    ``PipelineNode``.

    On exit the module-level execution mode is restored to the value it had
    when the block was entered, even if an exception is raised inside the
    block. Because the previous value is restored (rather than
    unconditionally re-enabling deferred mode), these contexts can be nested
    safely: leaving an inner block does not re-enable deferred execution
    while an outer block is still active.

    Example usage:
        >>> @deferred_execution
        ... def add(a, b):
        ...     return a + b
        >>> # Deferred execution mode (default)
        >>> result_node = add(PipelineNode(), 5)
        >>> # Instant execution mode (within context)
        >>> with instant_excecution():
        ...     result = add(3, 5)  # Executes immediately and returns 8
    """
    global _DEFERRED_EXECUTION_MODE
    # Remember the mode in force on entry so nested usage restores correctly.
    previous_mode = _DEFERRED_EXECUTION_MODE
    _DEFERRED_EXECUTION_MODE = False
    try:
        yield  # Execute the block
    finally:
        _DEFERRED_EXECUTION_MODE = previous_mode  # Restore previous mode
def _create_pipeline_node(func, *args, **kwargs) -> PipelineNode:
    """
    Build a ``PipelineNode`` that defers a call to ``func``.

    Positional arguments are split in two groups: ``PipelineNode`` instances
    become the parents of the new node, every other positional argument is
    captured and replayed when the deferred function is eventually called.

    Args:
        func (callable): The function whose execution is deferred.
        *args: Positional arguments of the call; at least one of them must
            be a ``PipelineNode``.
        **kwargs: Keyword arguments forwarded to ``func`` unchanged.

    Returns:
        PipelineNode: A node built from the deferred function and the list
        of parent nodes found in ``args``.

    Raises:
        TypeError: If no positional argument is a ``PipelineNode``.
    """
    # Split args into Node / Non Node
    parents = [arg for arg in args if isinstance(arg, PipelineNode)]
    args_no_data = [arg for arg in args if not isinstance(arg, PipelineNode)]

    # ensure there is at least ONE PipelineNode
    if not parents:
        raise TypeError(
            "Found no argument with type PipelineNode as positional argument. "
            "If you write a pipeline function, test it without the "
            "`deferred_execution` decorator."
        )  # TODO change to an other exception

    # NOTE: keyword arguments are forwarded as-is; a PipelineNode passed by
    # keyword is neither registered as a parent nor replaced by data.
    # NOTE(review): the values received in `data` are placed before all the
    # non-node positional arguments, whatever their position was in the
    # original call -- confirm this reordering is intended.
    def deferred_func(*data):
        return func(*data, *args_no_data, **kwargs)

    # Keep the wrapped function's name when it has one; callables such as
    # functools.partial objects do not define __name__.
    deferred_func.__name__ = getattr(func, "__name__", type(func).__name__)
    return PipelineNode(deferred_func, parents)
def deferred_execution(func):
    """
    Decorator that defers the execution of a function inside a pipeline.

    While the module-level deferred execution mode is enabled (the default),
    calling the decorated function does not run it: the call is handed to
    ``_create_pipeline_node``, which returns a ``PipelineNode``.

    While deferred execution is disabled (inside ``with
    instant_excecution():``), the decorated function is executed immediately
    with the given arguments. Passing a ``PipelineNode`` as a positional
    argument in that mode is an error.

    Args:
        func (callable): The function to be deferred.

    Returns:
        callable: A wrapper with the metadata of ``func`` that returns either
        a ``PipelineNode`` (deferred mode) or the result of ``func``
        (instant mode).

    Raises:
        TypeError: Raised by the wrapper in instant mode when a positional
            argument is a ``PipelineNode``.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        if _DEFERRED_EXECUTION_MODE:
            return _create_pipeline_node(func, *args, **kwargs)

        # Instant mode: a PipelineNode cannot be used as actual data.
        if any(isinstance(arg, PipelineNode) for arg in args):
            func_name = getattr(func, "__name__", repr(func))
            raise TypeError(
                f"function `{func_name}` received PipelineNode argument "
                "in a context where execution is non deferred. "
                "It's probably an incorrect usage of `with instant_excecution()`."
            )

        return func(*args, **kwargs)

    return wrapper