This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit aafc8a026bba31c9b42ee56e0661129d8b710034 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Fri Apr 8 10:31:07 2022 +0200 [WAYANG-#8] Correction in the channel types Signed-off-by: bertty <[email protected]> --- python/src/pywy/core/channel.py | 6 ++++++ .../src/pywy/platforms/python/operator/py_execution_operator.py | 6 ++++-- python/src/pywy/platforms/python/operator/py_sink_textfile.py | 9 +++++---- python/src/pywy/platforms/python/operator/py_source_textfile.py | 7 ++++--- python/src/pywy/platforms/python/operator/py_unary_filter.py | 7 ++++--- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/python/src/pywy/core/channel.py b/python/src/pywy/core/channel.py index a4f3bb97..c10cc09c 100644 --- a/python/src/pywy/core/channel.py +++ b/python/src/pywy/core/channel.py @@ -1,3 +1,6 @@ +from typing import TypeVar + + class Channel: def __init__(self): @@ -19,3 +22,6 @@ class ChannelDescriptor: def create_instance(self) -> Channel: return self.channelType() + + +CH_T = TypeVar('CH_T', bound=Channel) diff --git a/python/src/pywy/platforms/python/operator/py_execution_operator.py b/python/src/pywy/platforms/python/operator/py_execution_operator.py index a2e82590..9b5f1a75 100644 --- a/python/src/pywy/platforms/python/operator/py_execution_operator.py +++ b/python/src/pywy/platforms/python/operator/py_execution_operator.py @@ -1,5 +1,7 @@ +from typing import List, Type + +from pywy.core.channel import CH_T from pywy.operators.base import PywyOperator -from pywy.platforms.python.channels import Channel class PyExecutionOperator(PywyOperator): @@ -7,5 +9,5 @@ class PyExecutionOperator(PywyOperator): def prefix(self) -> str: return 'Py' - def execute(self, inputs: Channel, output: Channel): + def execute(self, inputs: List[Type[CH_T]], output: List[Type[CH_T]]): pass diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py index 429da96c..ab5f5af7 100644 --- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py @@ -1,8 +1,9 @@ -from typing import Set +from typing import Set, List, Type + +from pywy.core.channel import CH_T from pywy.operators.sink import TextFileSink from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator from pywy.platforms.python.channels import ( - Channel, ChannelDescriptor, PyIteratorChannel, PY_ITERATOR_CHANNEL_DESCRIPTOR @@ -16,12 +17,12 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator): super().__init__(path) pass - def execute(self, inputs: Channel, outputs: Channel): + def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): self.validate_channels(inputs, outputs) if isinstance(inputs[0], PyIteratorChannel): file = open(self.path, 'w') py_in_iter_channel: PyIteratorChannel = inputs[0] - iterable = py_in_iter_channel.provide_iterable(); + iterable = py_in_iter_channel.provide_iterable() for element in iterable: file.write(str(element)) file.close() diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py index 7fdbf740..067ec72b 100644 --- a/python/src/pywy/platforms/python/operator/py_source_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py @@ -1,8 +1,9 @@ -from typing import Set +from typing import Set, List, Type + +from pywy.core.channel import CH_T from pywy.operators.source import TextFileSource from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator from pywy.platforms.python.channels import ( - Channel, ChannelDescriptor, PyIteratorChannel, PY_ITERATOR_CHANNEL_DESCRIPTOR @@ -16,7 +17,7 @@ class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator): super().__init__(path) pass - def execute(self, inputs: Channel, outputs: Channel): + def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): self.validate_channels(inputs, outputs) if isinstance(outputs[0], PyIteratorChannel): py_out_iter_channel: PyIteratorChannel = outputs[0] diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py index b08bb5ca..c5b2cde7 100644 --- a/python/src/pywy/platforms/python/operator/py_unary_filter.py +++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py @@ -1,8 +1,9 @@ -from typing import Set +from typing import Set, List, Type + +from pywy.core.channel import CH_T from pywy.operators.unary import FilterOperator from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator from pywy.platforms.python.channels import ( - Channel, ChannelDescriptor, PyIteratorChannel, PY_ITERATOR_CHANNEL_DESCRIPTOR, @@ -18,7 +19,7 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator): super().__init__(predicate) pass - def execute(self, inputs: Channel, outputs: Channel): + def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): self.validate_channels(inputs, outputs) udf = self.predicate if isinstance(inputs[0], PyIteratorChannel):
