Pipe

A modification of the Pipe project (https://github.com/JulienPalard/Pipe) that supports arbitrary objects as input.

See https://stackoverflow.com/a/69790644.

EnterPipe

Processing using pipes. Wrap the input value in EnterPipe, chain processing functions with the | operator, then mark the end of the pipe with ExitPipe to get back the unwrapped result.

Parameters:

Name   Type   Description                      Default
obj    Any    The wrapped pipe input object.   required

Example:

import numpy as np
from opskrift.pipe import EnterPipe, ExitPipe

xs = np.array([1, 2, 3])
ys = EnterPipe(xs) | (lambda x: 2 * x) | np.median | ExitPipe
print(ys)  # 4.0
Source code in opskrift/pipe.py
class EnterPipe:
    """Processing using pipes.
    Wrap input value in `EnterPipe`, chain processing functions,
    then mark the end of the pipe using `ExitPipe`.

    Args:
        obj (Any): The wrapped pipe input object.

    Example:
    ```
    xs = np.array([1, 2, 3])
    ys = EnterPipe(xs) | (lambda x: 2 * x) | np.median | ExitPipe
    print(ys) # 4.0
    ```
    """

    def __init__(self, obj: Any):
        self._object = obj

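    def __or__(self, other):
        # ExitPipe is the sentinel that ends the pipe: unwrap and return the value.
        if other is ExitPipe:
            return self._object

        if not callable(other):
            raise TypeError(f"pipe element '{other}' is not callable")

        # Apply the step and re-wrap the result so the chain can continue.
        return EnterPipe(other(self._object))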
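
The listing above can be exercised without NumPy. The following is a minimal sketch, not the module itself: the ExitPipe definition is not part of the listing, so it is assumed here to be a plain sentinel class, and EnterPipe is a condensed copy of the class above. It shows three behaviours that follow from `__or__`: binding extra arguments with `functools.partial`, the TypeError raised for a non-callable step, and the fact that the value stays wrapped until ExitPipe is reached.

```python
from functools import partial
from typing import Any


class ExitPipe:
    """Assumed sentinel marking the end of a pipe (hypothetical definition)."""


class EnterPipe:
    """Condensed copy of the class listed above."""

    def __init__(self, obj: Any):
        self._object = obj

    def __or__(self, other):
        if other is ExitPipe:
            return self._object
        if not callable(other):
            raise TypeError(f"pipe element '{other}' is not callable")
        return EnterPipe(other(self._object))


# Each step receives exactly one argument, so extra arguments are bound
# ahead of time with functools.partial.
words = EnterPipe("b,a,c") | partial(str.split, sep=",") | sorted | ExitPipe
print(words)  # ['a', 'b', 'c']

# A non-callable step is rejected with a TypeError.
try:
    EnterPipe(1) | 2
except TypeError as exc:
    message = str(exc)
    print(message)  # pipe element '2' is not callable

# Without ExitPipe the result is still an EnterPipe wrapper.
wrapped = EnterPipe(3) | (lambda x: x + 1)
print(isinstance(wrapped, EnterPipe))  # True
print(wrapped | ExitPipe)  # 5 would be wrong: the wrapped value is 4
```

Because ExitPipe is compared by identity (`other is ExitPipe`), the class itself is passed at the end of the chain, not an instance of it.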