This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 8b5048fd79f50b1b50c84e483207e1a45ef811ef Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Fri Apr 8 12:01:53 2022 +0200 [WAYANG-#8] add PyMapOperator and minnors Signed-off-by: bertty <[email protected]> --- python/src/pywy/core/channel.py | 1 + python/src/pywy/core/plan.py | 4 +- python/src/pywy/core/translator.py | 2 +- python/src/pywy/dataquanta.py | 5 ++- python/src/pywy/graph/graph.py | 8 ++-- python/src/pywy/graph/types.py | 34 +++++++-------- python/src/pywy/operators/base.py | 16 ++++--- python/src/pywy/operators/sink.py | 4 +- python/src/pywy/operators/unary.py | 7 ---- python/src/pywy/platforms/python/execution.py | 37 +++++++++------- python/src/pywy/platforms/python/mappings.py | 1 + .../src/pywy/platforms/python/operator/__init__.py | 4 +- .../python/operator/py_execution_operator.py | 2 +- .../platforms/python/operator/py_sink_textfile.py | 12 ++++-- .../pywy/platforms/python/operator/py_unary_map.py | 49 ++++++++++++++++++++++ .../pywy/tests/integration/python_platform_test.py | 31 +++++++++++++- 16 files changed, 155 insertions(+), 62 deletions(-) diff --git a/python/src/pywy/core/channel.py b/python/src/pywy/core/channel.py index c10cc09c..e915c98a 100644 --- a/python/src/pywy/core/channel.py +++ b/python/src/pywy/core/channel.py @@ -25,3 +25,4 @@ class ChannelDescriptor: CH_T = TypeVar('CH_T', bound=Channel) +CHD_T = TypeVar('CHD_T', bound=ChannelDescriptor) diff --git a/python/src/pywy/core/plan.py b/python/src/pywy/core/plan.py index 1826f90e..fc8f7360 100644 --- a/python/src/pywy/core/plan.py +++ b/python/src/pywy/core/plan.py @@ -35,7 +35,7 @@ class PywyPlan: ) ) - self.graph.traversal(None, self.graph.starting_nodes, print_plan) + self.graph.traversal(self.graph.starting_nodes, print_plan) def printTuple(self): def print_plan(current: NodeVec, previous: NodeVec): @@ -56,4 +56,4 @@ class PywyPlan: ) ) - self.graph.traversal(None, self.graph.starting_nodes, print_plan) + self.graph.traversal(self.graph.starting_nodes, print_plan) diff --git a/python/src/pywy/core/translator.py b/python/src/pywy/core/translator.py index 0e0acfcb..72b6f01a 100644 --- a/python/src/pywy/core/translator.py +++ b/python/src/pywy/core/translator.py @@ -32,7 +32,7 @@ class Translator: # TODO not necesary it it 0 current_op.current[1].connect(0, next_op.current[1], 0) - graph.traversal(None, graph.starting_nodes, translate2plugin) + graph.traversal(graph.starting_nodes, translate2plugin) node = [] for elem in graph.starting_nodes: diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index 283286e5..956d815e 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -1,6 +1,7 @@ from typing import Set, List, cast from pywy.core import Translator +from pywy.operators.base import PO_T from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableOut, T, In, Out) from pywy.operators import * from pywy.core import PywyPlan @@ -64,7 +65,7 @@ class DataQuanta(GenericTco): return DataQuanta(self.context, self._connect(FlatmapOperator(f))) def store_textfile(self: "DataQuanta[In]", path: str): - last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path)))] + last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path, self.operator.outputSlot[0])))] plan = PywyPlan(self.context.plugins, last) plug = self.context.plugins.pop() @@ -73,7 +74,7 @@ class DataQuanta(GenericTco): plug.get_executor().execute(new_plan) # TODO add the logic to execute the plan - def _connect(self, op: PywyOperator, port_op: int = 0) -> PywyOperator: + def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator: self.operator.connect(0, op, port_op) return op diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py index 863e4a6a..c6bbd5b5 100644 --- a/python/src/pywy/graph/graph.py +++ b/python/src/pywy/graph/graph.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from pywy.types import T, K from typing import (Iterable, Dict, Callable, Any, Generic, Optional, List) @@ -22,7 +24,7 @@ class GraphNode(Generic[K, T]): if len(adjacent) == 0: return [] - def wrap(op: T) -> 'GraphNode[K, T]': + def wrap(op: T) -> Optional['GraphNode[K, T]'] | None: if op is None: return None if op not in created: @@ -59,12 +61,12 @@ class WayangGraph(Generic[K, T]): def traversal( self, - origin: GraphNode[K, T], nodes: Iterable[GraphNode[K, T]], udf: Callable[[GraphNode[K, T], GraphNode[K, T]], Any], + origin: Optional[GraphNode[K, T]] = None, visit_status: bool = True ): for node in nodes: adjacent = node.walk(self.created_nodes) - self.traversal(node, adjacent, udf, visit_status) + self.traversal(adjacent, udf, node, visit_status) node.visit(origin, udf) diff --git a/python/src/pywy/graph/types.py b/python/src/pywy/graph/types.py index bc99efa0..c72876fd 100644 --- a/python/src/pywy/graph/types.py +++ b/python/src/pywy/graph/types.py @@ -1,45 +1,45 @@ from typing import (Iterable, List) from pywy.graph.graph import (GraphNode, WayangGraph) -from pywy.operators.base import PywyOperator +from pywy.operators.base import PywyOperator, PO_T -class NodeOperator(GraphNode[PywyOperator, PywyOperator]): +class NodeOperator(GraphNode[PO_T, PO_T]): - def __init__(self, op: PywyOperator): + def __init__(self, op: PO_T): super(NodeOperator, self).__init__(op) - def get_adjacents(self) -> List[PywyOperator]: - operator: PywyOperator = self.current + def get_adjacents(self) -> List[PO_T]: + operator: PO_T = self.current if operator is None or operator.inputs == 0: return [] return operator.inputOperator - def build_node(self, t: PywyOperator) -> 'NodeOperator': + def build_node(self, t: PO_T) -> 'NodeOperator': return NodeOperator(t) -class WGraphOfOperator(WayangGraph[PywyOperator, NodeOperator]): +class WGraphOfOperator(WayangGraph[PO_T, NodeOperator]): - def __init__(self, nodes: Iterable[PywyOperator]): + def __init__(self, nodes: Iterable[PO_T]): super(WGraphOfOperator, self).__init__(nodes) - def build_node(self, t: PywyOperator) -> NodeOperator: + def build_node(self, t: PO_T) -> NodeOperator: return NodeOperator(t) -class NodeVec(GraphNode[PywyOperator, List[PywyOperator]]): +class NodeVec(GraphNode[PO_T, List[PO_T]]): - def __init__(self, op: PywyOperator): + def __init__(self, op: PO_T): super(NodeVec, self).__init__([op, None]) - def get_adjacents(self) -> List[PywyOperator]: - operator: PywyOperator = self.current[0] + def get_adjacents(self) -> List[PO_T]: + operator: PO_T = self.current[0] if operator is None or operator.inputs == 0: return [] return operator.inputOperator - def build_node(self, t: PywyOperator) -> 'NodeVec': + def build_node(self, t: PO_T) -> 'NodeVec': return NodeVec(t) def __str__(self): @@ -49,10 +49,10 @@ class NodeVec(GraphNode[PywyOperator, List[PywyOperator]]): return self.__str__() -class WGraphOfVec(WayangGraph[PywyOperator, NodeVec]): +class WGraphOfVec(WayangGraph[PO_T, NodeVec]): - def __init__(self, nodes: Iterable[PywyOperator]): + def __init__(self, nodes: Iterable[PO_T]): super(WGraphOfVec, self).__init__(nodes) - def build_node(self, t: PywyOperator) -> NodeVec: + def build_node(self, t: PO_T) -> NodeVec: return NodeVec(t) diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py index 3b5cca9e..c8c4b6bf 100644 --- a/python/src/pywy/operators/base.py +++ b/python/src/pywy/operators/base.py @@ -1,17 +1,18 @@ from typing import (TypeVar, Optional, List, Set) -from pywy.core import ChannelDescriptor, Channel +from pywy.core import ChannelDescriptor +from pywy.core.channel import CH_T, CHD_T class PywyOperator: inputSlot: List[TypeVar] - inputChannel: List[Channel] - inputChannelDescriptor: List[ChannelDescriptor] + inputChannel: List[CH_T] + inputChannelDescriptor: List[CHD_T] inputOperator: List['PywyOperator'] inputs: int outputSlot: List[TypeVar] - outputChannel: List[Channel] - outputChannelDescriptor: List[ChannelDescriptor] + outputChannel: List[CH_T] + outputChannelDescriptor: List[CHD_T] outputOperator: List['PywyOperator'] outputs: int @@ -54,7 +55,7 @@ class PywyOperator: self.validate_inputs(inputs) self.validate_outputs(outputs) - def connect(self, port: int, that: 'PywyOperator', port_that: int): + def connect(self, port: int, that: 'PO_T', port_that: int): self.outputOperator[port] = that that.inputOperator[port_that] = self @@ -86,3 +87,6 @@ class PywyOperator: def __repr__(self): return self.__str__() + + +PO_T = TypeVar('PO_T', bound=PywyOperator) diff --git a/python/src/pywy/operators/sink.py b/python/src/pywy/operators/sink.py index 6b13b676..1f78a63c 100644 --- a/python/src/pywy/operators/sink.py +++ b/python/src/pywy/operators/sink.py @@ -26,8 +26,8 @@ class TextFileSink(SinkUnaryOperator): path: str - def __init__(self, path: str): - super().__init__('TextFile') + def __init__(self, path: str, input_type: GenericTco): + super().__init__('TextFile', input_type) self.path = path def __str__(self): diff --git a/python/src/pywy/operators/unary.py b/python/src/pywy/operators/unary.py index cfd585f1..03d6f118 100644 --- a/python/src/pywy/operators/unary.py +++ b/python/src/pywy/operators/unary.py @@ -52,13 +52,6 @@ class MapOperator(UnaryToUnaryOperator): super().__init__("Map", types[0], types[1]) self.function = function - # TODO remove wrapper - def getWrapper(self): - udf = self.function - def func(iterator): - return map(udf, iterator) - return func - def __str__(self): return super().__str__() diff --git a/python/src/pywy/platforms/python/execution.py b/python/src/pywy/platforms/python/execution.py index 7f8803eb..bab62ecc 100644 --- a/python/src/pywy/platforms/python/execution.py +++ b/python/src/pywy/platforms/python/execution.py @@ -1,9 +1,8 @@ -from typing import List - from pywy.graph.types import WGraphOfOperator, NodeOperator -from pywy.core import Channel +from pywy.core import ChannelDescriptor from pywy.core import Executor from pywy.core import PywyPlan +from pywy.platforms.python.channels import PY_ITERATOR_CHANNEL_DESCRIPTOR from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator @@ -16,7 +15,10 @@ class PyExecutor(Executor): pywyPlan: PywyPlan = plan graph = WGraphOfOperator(pywyPlan.sinks) - def exec(op_current: NodeOperator, op_next: NodeOperator): + # TODO get this information by a configuration and ideally by the context + descriptor_default: ChannelDescriptor = PY_ITERATOR_CHANNEL_DESCRIPTOR + + def execute(op_current: NodeOperator, op_next: NodeOperator): if op_current is None: return @@ -42,21 +44,26 @@ class PyExecutor(Executor): inputs ) ) + if len(intersect) > 1: - raise Exception( - "The interaction between the operator (A) {} and (B) {}, " - "can't be decided because are several channel availables {}".format( - py_current, - py_next, - intersect + if descriptor_default is None: + raise Exception( + "The interaction between the operator (A) {} and (B) {}, " + "can't be decided because are several channel availables {}".format( + py_current, + py_next, + intersect + ) ) - ) - #TODO validate if is valite for several output - py_current.outputChannel: List[Channel] = [intersect.pop().create_instance()] + descriptor = descriptor_default + else: + descriptor = intersect.pop() + + # TODO validate if is valite for several output + py_current.outputChannel[0] = descriptor.create_instance() py_current.execute(py_current.inputChannel, py_current.outputChannel) py_next.inputChannel = py_current.outputChannel - - graph.traversal(None, graph.starting_nodes, exec) + graph.traversal(graph.starting_nodes, execute) diff --git a/python/src/pywy/platforms/python/mappings.py b/python/src/pywy/platforms/python/mappings.py index 9adc062a..50a6ddeb 100644 --- a/python/src/pywy/platforms/python/mappings.py +++ b/python/src/pywy/platforms/python/mappings.py @@ -7,4 +7,5 @@ PYWY_OPERATOR_MAPPINGS = Mapping() PYWY_OPERATOR_MAPPINGS.add_mapping(PyFilterOperator()) PYWY_OPERATOR_MAPPINGS.add_mapping(PyTextFileSourceOperator()) PYWY_OPERATOR_MAPPINGS.add_mapping(PyTextFileSinkOperator()) +PYWY_OPERATOR_MAPPINGS.add_mapping(PyMapOperator()) diff --git a/python/src/pywy/platforms/python/operator/__init__.py b/python/src/pywy/platforms/python/operator/__init__.py index ef9d94b8..51b6f409 100644 --- a/python/src/pywy/platforms/python/operator/__init__.py +++ b/python/src/pywy/platforms/python/operator/__init__.py @@ -1,5 +1,6 @@ from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator from pywy.platforms.python.operator.py_unary_filter import PyFilterOperator +from pywy.platforms.python.operator.py_unary_map import PyMapOperator from pywy.platforms.python.operator.py_source_textfile import PyTextFileSourceOperator from pywy.platforms.python.operator.py_sink_textfile import PyTextFileSinkOperator @@ -7,5 +8,6 @@ __ALL__ = [ PyExecutionOperator, PyFilterOperator, PyTextFileSourceOperator, - PyTextFileSinkOperator + PyTextFileSinkOperator, + PyMapOperator, ] diff --git a/python/src/pywy/platforms/python/operator/py_execution_operator.py b/python/src/pywy/platforms/python/operator/py_execution_operator.py index 9b5f1a75..c63e88b5 100644 --- a/python/src/pywy/platforms/python/operator/py_execution_operator.py +++ b/python/src/pywy/platforms/python/operator/py_execution_operator.py @@ -9,5 +9,5 @@ class PyExecutionOperator(PywyOperator): def prefix(self) -> str: return 'Py' - def execute(self, inputs: List[Type[CH_T]], output: List[Type[CH_T]]): + def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]): pass diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py index ab5f5af7..801387cd 100644 --- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py @@ -14,8 +14,8 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator): def __init__(self, origin: TextFileSink = None): path = None if origin is None else origin.path - super().__init__(path) - pass + type_class = None if origin is None else origin.inputSlot[0] + super().__init__(path, type_class) def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): self.validate_channels(inputs, outputs) @@ -23,8 +23,12 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator): file = open(self.path, 'w') py_in_iter_channel: PyIteratorChannel = inputs[0] iterable = py_in_iter_channel.provide_iterable() - for element in iterable: - file.write(str(element)) + if self.inputSlot[0] == str: + for element in iterable: + file.write(element) + else: + for element in iterable: + file.write("{}\n".format(str(element))) file.close() else: diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py new file mode 100644 index 00000000..bf6ef3f0 --- /dev/null +++ b/python/src/pywy/platforms/python/operator/py_unary_map.py @@ -0,0 +1,49 @@ +from typing import Set, List, Type + +from pywy.core.channel import CH_T +from pywy.operators.unary import MapOperator +from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator +from pywy.platforms.python.channels import ( + ChannelDescriptor, + PyIteratorChannel, + PY_ITERATOR_CHANNEL_DESCRIPTOR, + PY_CALLABLE_CHANNEL_DESCRIPTOR, + PyCallableChannel + ) + + +class PyMapOperator(MapOperator, PyExecutionOperator): + + def __init__(self, origin: MapOperator = None): + function = None if origin is None else origin.function + super().__init__(function) + pass + + def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]): + self.validate_channels(inputs, outputs) + udf = self.function + if isinstance(inputs[0], PyIteratorChannel): + py_in_iter_channel: PyIteratorChannel = inputs[0] + py_out_iter_channel: PyIteratorChannel = outputs[0] + py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable())) + elif isinstance(inputs[0], PyCallableChannel): + py_in_call_channel: PyCallableChannel = inputs[0] + py_out_call_channel: PyCallableChannel = outputs[0] + + def func(iterator): + return map(udf, iterator) + + py_out_call_channel.accept_callable( + PyCallableChannel.concatenate( + func, + py_in_call_channel.provide_callable() + ) + ) + else: + raise Exception("Channel Type does not supported") + + def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} + + def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} diff --git a/python/src/pywy/tests/integration/python_platform_test.py b/python/src/pywy/tests/integration/python_platform_test.py index 64a7510d..d2ec4f22 100644 --- a/python/src/pywy/tests/integration/python_platform_test.py +++ b/python/src/pywy/tests/integration/python_platform_test.py @@ -1,7 +1,6 @@ import os import unittest import tempfile -from os import fdopen from typing import List from pywy.config import RC_TEST_DIR as ROOT @@ -42,3 +41,33 @@ class TestIntegrationPythonPlatform(unittest.TestCase): self.assertEqual(selectivity, elements) self.assertEqual(lines_filter, lines_platform) + + def test_dummy_map(self): + def pre(a: str) -> bool: + return 'six' in a + + def convert(a: str) -> int: + return len(a) + + fd, path_tmp = tempfile.mkstemp() + + WayangContext() \ + .register(PYTHON) \ + .textfile(self.file_10e0) \ + .filter(pre) \ + .map(convert) \ + .store_textfile(path_tmp) + + lines_filter: List[int] + with open(self.file_10e0, 'r') as f: + lines_filter = list(map(convert, filter(pre, f.readlines()))) + selectivity = len(list(lines_filter)) + + lines_platform: List[int] + with open(path_tmp, 'r') as fp: + lines_platform = list(map(lambda x: int(x), fp.readlines())) + elements = len(lines_platform) + os.remove(path_tmp) + + self.assertEqual(selectivity, elements) + self.assertEqual(lines_filter, lines_platform)
