This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 6e04ecf6c820a96804272f5d9d18570930dd93b7 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Wed Apr 6 11:54:08 2022 +0200 [WAYANG-#8] add TextFileSourceOperator Signed-off-by: bertty <[email protected]> --- python/src/pywayang/platforms/python/channels.py | 19 +++++++++-- python/src/pywayang/platforms/python/mappings.py | 1 + .../python/operators/PyTextFileSourceOperator.py | 38 ++++++++++++++++++++++ .../platforms/python/operators/__init__.py | 4 ++- 4 files changed, 59 insertions(+), 3 deletions(-) diff --git a/python/src/pywayang/platforms/python/channels.py b/python/src/pywayang/platforms/python/channels.py index f611a258..a2863677 100644 --- a/python/src/pywayang/platforms/python/channels.py +++ b/python/src/pywayang/platforms/python/channels.py @@ -32,7 +32,7 @@ class PyIteratorChannel(Channel): def provide_iterable(self) -> Iterable: return self.iterable - def accept_iterable(self, iterable) -> 'PyIteratorChannel': + def accept_iterable(self, iterable: Iterable) -> 'PyIteratorChannel': self.iterable = iterable return self @@ -56,5 +56,20 @@ class PyCallableChannel(Channel): return function_a(function_b(iterable)) return executable +class PyFileChannel(Channel): + + path : str + + def __init__(self): + Channel.__init__(self) + + def provide_path(self) -> str: + return self.path + + def accept_path(self, path: str) -> 'PyIteratorChannel': + self.path = path + return self + PyIteratorChannelDescriptor = ChannelDescriptor(type(PyIteratorChannel()), False, False) -PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False) \ No newline at end of file +PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False) +PyFileChannelDescriptor = ChannelDescriptor(type(PyFileChannel()), False, False) \ No newline at end of file diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py index 977ccada..55a80180 100644 --- a/python/src/pywayang/platforms/python/mappings.py +++ b/python/src/pywayang/platforms/python/mappings.py @@ -32,4 +32,5 @@ class Mapping: OperatorMappings = Mapping() OperatorMappings.add_mapping(PyFilterOperator()) +OperatorMappings.add_mapping(PyTextFileSourceOperator()) diff --git a/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py new file mode 100644 index 00000000..ccfbec48 --- /dev/null +++ b/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py @@ -0,0 +1,38 @@ +from pywayang.operator.source import TextFileSource +from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator +from pywayang.platforms.python.channels import ( + Channel, + ChannelDescriptor, + PyIteratorChannel, + PyIteratorChannelDescriptor, + PyFileChannelDescriptor + ) +from typing import Set + +class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator): + + def __init__(self, origin: TextFileSource = None): + path = None if origin is None else origin.path + super().__init__(path) + pass + + def execute(self, inputs: Channel, outputs: Channel): + self.validateChannels(inputs, outputs) + if isinstance(outputs[0], PyIteratorChannel) : + py_out_iter_channel: PyIteratorChannel = outputs[0] + py_out_iter_channel.accept_iterable( + open( + self.path, + 'r' + ) + ) + + else: + raise Exception("Channel Type does not supported") + + + def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]: + raise Exception("The PyTextFileSource does not support Input Channels") + + def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]: + return {PyIteratorChannelDescriptor} diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py index 208a2fc0..5db92431 100644 --- a/python/src/pywayang/platforms/python/operators/__init__.py +++ b/python/src/pywayang/platforms/python/operators/__init__.py @@ -1,7 +1,9 @@ from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator +from pywayang.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator __ALL__ = [ PythonExecutionOperator, - PyFilterOperator + PyFilterOperator, + PyTextFileSourceOperator ] \ No newline at end of file
