This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 1b2ab52933c993f8b300e546ed8dcfcf289a8003 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Thu Apr 7 22:48:43 2022 +0200 [WAYANG-#8] remove the warning and clean code Signed-off-by: bertty <[email protected]> --- python/old_code/test.py | 4 +- python/old_code/tests/full_java_test.py | 4 +- python/old_code/tests/full_spark_test.py | 6 +-- python/src/pywy/core/__init__.py | 2 +- python/src/pywy/core/channel.py | 15 +++---- python/src/pywy/core/executor.py | 3 +- python/src/pywy/core/mapping.py | 4 +- python/src/pywy/core/plan.py | 24 +++++------ python/src/pywy/core/platform.py | 4 +- python/src/pywy/core/plugin.py | 5 +-- python/src/pywy/core/translator.py | 46 +++++++-------------- python/src/pywy/graph/graph.py | 47 +++++++++++----------- python/src/pywy/graph/types.py | 27 +++++++------ python/src/pywy/operators/__init__.py | 12 +++--- python/src/pywy/operators/base.py | 46 ++++++++++----------- python/src/pywy/operators/sink.py | 9 +++-- python/src/pywy/operators/source.py | 17 ++++---- python/src/pywy/operators/unary.py | 41 ++++++++++--------- python/src/pywy/platforms/python/channels.py | 16 +++++--- python/src/pywy/platforms/python/execution.py | 20 ++++----- python/src/pywy/platforms/python/mappings.py | 8 ++-- .../src/pywy/platforms/python/operator/__init__.py | 2 +- .../python/operator/py_execution_operator.py | 3 +- .../platforms/python/operator/py_sink_textfile.py | 17 ++++---- .../python/operator/py_source_textfile.py | 14 +++---- .../platforms/python/operator/py_unary_filter.py | 13 +++--- python/src/pywy/platforms/python/platform.py | 3 +- python/src/pywy/platforms/python/plugin.py | 6 +-- python/src/pywy/plugins.py | 8 ++-- python/src/pywy/types.py | 24 +++++++++-- 30 files changed, 224 insertions(+), 226 deletions(-) diff --git a/python/old_code/test.py b/python/old_code/test.py index 1b3111c5..46d8b342 100644 --- a/python/old_code/test.py +++ b/python/old_code/test.py @@ -1,5 +1,5 @@ from pywy.dataquanta import WayangContext -from pywy.plugins import python +from pywy.plugins import PYTHON # p = Platform("nana") # print("LALA "+str(p)) @@ -30,7 +30,7 @@ for index in range(0, 1): print(index) tic = time.perf_counter() fileop = WayangContext()\ - .register(python)\ + .register(PYTHON)\ .textfile("/Users/bertty/databloom/blossom/python/resources/tmp" + str(index))\ .filter(pre)\ .store_textfile("/Users/bertty/databloom/blossom/python/resources/out" + str(index)) diff --git a/python/old_code/tests/full_java_test.py b/python/old_code/tests/full_java_test.py index e1e512b1..e16126f5 100644 --- a/python/old_code/tests/full_java_test.py +++ b/python/old_code/tests/full_java_test.py @@ -26,7 +26,7 @@ class MyTestCase(unittest.TestCase): def test_most_basic(self): descriptor = Descriptor() - descriptor.add_plugin(Descriptor.Plugin.java) + descriptor.add_plugin(Descriptor.Plugin.JAVA) plan = DataQuantaBuilder(descriptor) sink_dataquanta = \ @@ -38,7 +38,7 @@ class MyTestCase(unittest.TestCase): def test_single_juncture(self): descriptor = Descriptor() - descriptor.add_plugin(Descriptor.Plugin.java) + descriptor.add_plugin(Descriptor.Plugin.JAVA) plan = DataQuantaBuilder(descriptor) dq_source_a = plan.source(input_1) diff --git a/python/old_code/tests/full_spark_test.py b/python/old_code/tests/full_spark_test.py index 98efd9e9..42cd02c8 100644 --- a/python/old_code/tests/full_spark_test.py +++ b/python/old_code/tests/full_spark_test.py @@ -18,7 +18,7 @@ def test_most_basic(self): descriptor = Descriptor() - descriptor.add_plugin(Descriptor.Plugin.spark) + descriptor.add_plugin(Descriptor.Plugin.SPARK) plan = DataQuantaBuilder(descriptor) sink_dataquanta = \ @@ -30,7 +30,7 @@ def test_most_basic(self): def test_single_juncture(self): descriptor = Descriptor() - descriptor.add_plugin(Descriptor.Plugin.spark) + descriptor.add_plugin(Descriptor.Plugin.SPARK) plan = DataQuantaBuilder(descriptor) dq_source_a = plan.source("../test/lines.txt") @@ -43,7 +43,7 @@ def test_single_juncture(self): def test_multiple_juncture(self): descriptor = Descriptor() - descriptor.add_plugin(Descriptor.Plugin.spark) + descriptor.add_plugin(Descriptor.Plugin.SPARK) plan = DataQuantaBuilder(descriptor) dq_source_a = plan.source("../test/lines.txt") diff --git a/python/src/pywy/core/__init__.py b/python/src/pywy/core/__init__.py index 90da09e4..291a401d 100644 --- a/python/src/pywy/core/__init__.py +++ b/python/src/pywy/core/__init__.py @@ -15,4 +15,4 @@ __ALL__ = [ Platform, Plugin, Translator -] \ No newline at end of file +] diff --git a/python/src/pywy/core/channel.py b/python/src/pywy/core/channel.py index 18d5c3d3..a4f3bb97 100644 --- a/python/src/pywy/core/channel.py +++ b/python/src/pywy/core/channel.py @@ -3,18 +3,19 @@ class Channel: def __init__(self): pass - def getchannel(self) -> 'Channel': + def get_channel(self) -> 'Channel': return self - def gettype(self): + def get_type(self): return type(self) + class ChannelDescriptor: - def __init__(self, channelType: type, isReusable: bool, isSuitableForBreakpoint: bool): - self.channelType = channelType - self.isReusable = isReusable - self.isSuitableForBreakpoint = isSuitableForBreakpoint + def __init__(self, channel_type: type, is_reusable: bool, is_suitable_for_breakpoint: bool): + self.channelType = channel_type + self.isReusable = is_reusable + self.isSuitableForBreakpoint = is_suitable_for_breakpoint def create_instance(self) -> Channel: - return self.channelType() \ No newline at end of file + return self.channelType() diff --git a/python/src/pywy/core/executor.py b/python/src/pywy/core/executor.py index 31fcb2c7..553abdcf 100644 --- a/python/src/pywy/core/executor.py +++ b/python/src/pywy/core/executor.py @@ -1,9 +1,8 @@ - class Executor: def __init__(self): pass def execute(self, plan): - pass \ No newline at end of file + pass diff --git a/python/src/pywy/core/mapping.py b/python/src/pywy/core/mapping.py index db377928..dbe299dc 100644 --- a/python/src/pywy/core/mapping.py +++ b/python/src/pywy/core/mapping.py @@ -1,6 +1,7 @@ from typing import Dict from pywy.operators.base import PywyOperator + class Mapping: mappings: Dict[str, type] @@ -20,9 +21,8 @@ class Mapping: ) return template(operator) - def __str__(self): return str(self.mappings) def __repr__(self): - return self.__str__() \ No newline at end of file + return self.__str__() diff --git a/python/src/pywy/core/plan.py b/python/src/pywy/core/plan.py index 69be6280..1826f90e 100644 --- a/python/src/pywy/core/plan.py +++ b/python/src/pywy/core/plan.py @@ -1,13 +1,12 @@ -from typing import ( Iterable, Set ) +from typing import (Iterable, Set) from pywy.graph.graph import WayangGraph -from pywy.graph.types import ( NodeOperator, WGraphOfVec, NodeVec ) +from pywy.graph.types import (NodeOperator, WGraphOfVec, NodeVec) from pywy.operators.sink import SinkOperator from pywy.core.plugin import Plugin class PywyPlan: - graph: WayangGraph def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]): @@ -30,14 +29,13 @@ class PywyPlan: return print( - "===========\n{}\n@@@@@ => previous is\n{}\n===========\n" - .format( - current.current, - previous.current - ) + "===========\n{}\n@@@@@ => previous is\n{}\n===========\n".format( + current.current, + previous.current + ) ) - self.graph.traversal(None, self.graph.starting_nodes, print_plan) + self.graph.traversal(None, self.graph.starting_nodes, print_plan) def printTuple(self): def print_plan(current: NodeVec, previous: NodeVec): @@ -53,9 +51,9 @@ class PywyPlan: print( "############\n{}\n@@@@@ => previous is\n{}\n############\n" .format( - current.current, - previous.current - ) + current.current, + previous.current + ) ) - self.graph.traversal(None, self.graph.starting_nodes, print_plan) + self.graph.traversal(None, self.graph.starting_nodes, print_plan) diff --git a/python/src/pywy/core/platform.py b/python/src/pywy/core/platform.py index caece352..c3f6b968 100644 --- a/python/src/pywy/core/platform.py +++ b/python/src/pywy/core/platform.py @@ -10,8 +10,7 @@ class Platform: platform name, it uses as identification """ - name : str - #configuration : dict[str, str] + name: str def __init__(self, name): self.name = name @@ -21,4 +20,3 @@ class Platform: def __repr__(self): return self.__str__() - diff --git a/python/src/pywy/core/plugin.py b/python/src/pywy/core/plugin.py index b6f84d88..be2d691c 100644 --- a/python/src/pywy/core/plugin.py +++ b/python/src/pywy/core/plugin.py @@ -4,6 +4,7 @@ from pywy.core.executor import Executor from pywy.core.platform import Platform from pywy.core.mapping import Mapping + class Plugin: """ A plugin contributes the following components to a :class:`Context` @@ -16,7 +17,7 @@ class Plugin: platforms: Set[Platform] mappings: Mapping - def __init__(self, platforms:Set[Platform], mappings: Mapping = Mapping()): + def __init__(self, platforms: Set[Platform], mappings: Mapping = Mapping()): self.platforms = platforms self.mappings = mappings @@ -31,5 +32,3 @@ class Plugin: def __repr__(self): return self.__str__() - - diff --git a/python/src/pywy/core/translator.py b/python/src/pywy/core/translator.py index f70b2cd1..0e0acfcb 100644 --- a/python/src/pywy/core/translator.py +++ b/python/src/pywy/core/translator.py @@ -1,59 +1,41 @@ -from pywy.graph.types import ( WGraphOfVec, NodeVec ) +from pywy.graph.types import (WGraphOfVec, NodeVec) from pywy.core.plugin import Plugin from pywy.core.plan import PywyPlan from pywy.core.mapping import Mapping + class Translator: plugin: Plugin - plan : PywyPlan + plan: PywyPlan def __init__(self, plugin: Plugin, plan: PywyPlan): self.plugin = plugin self.plan = plan def translate(self): - mappings:Mapping = self.plugin.get_mappings() + mappings: Mapping = self.plugin.get_mappings() graph = WGraphOfVec(self.plan.sinks) - def translate2plugin(current: NodeVec, next: NodeVec): - if current is None: + + def translate2plugin(current_op: NodeVec, next_op: NodeVec): + if current_op is None: return - if current.current[1] is None: - current.current[1] = mappings.get_instanceof(current.current[0]) + if current_op.current[1] is None: + current_op.current[1] = mappings.get_instanceof(current_op.current[0]) - if next is None: + if next_op is None: return - if next.current[1] is None: - next.current[1] = mappings.get_instanceof(next.current[0]) + if next_op.current[1] is None: + next_op.current[1] = mappings.get_instanceof(next_op.current[0]) # TODO not necesary it it 0 - current.current[1].connect(0, next.current[1], 0) + current_op.current[1].connect(0, next_op.current[1], 0) graph.traversal(None, graph.starting_nodes, translate2plugin) - # def print_plan(current: NodeVec, previous: NodeVec): - # if current is None: - # print("this is source") - # print(previous.current) - # return - # if previous is None: - # print("this is sink") - # print(current.current) - # return - # - # print( - # "############\n{}\n@@@@@ => previous is\n{}\n############\n" - # .format( - # current.current, - # previous.current - # ) - # ) - # - # graph.traversal(None, graph.starting_nodes, print_plan, False) - node = [] for elem in graph.starting_nodes: node.append(elem.current[1]) - return PywyPlan(self.plugin, node) \ No newline at end of file + return PywyPlan({self.plugin}, node) diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py index ff1c68c1..10624f3b 100644 --- a/python/src/pywy/graph/graph.py +++ b/python/src/pywy/graph/graph.py @@ -1,9 +1,8 @@ from pywy.types import T -from typing import ( Iterable, Dict, Callable, List, Any, Generic ) +from typing import (Iterable, Dict, Callable, Any, Generic, Optional) class GraphNode(Generic[T]): - current: T visited: bool @@ -11,19 +10,16 @@ class GraphNode(Generic[T]): self.current = op self.visited = False - def getadjacents(self) -> Iterable[T]: + def get_adjacents(self) -> Iterable[T]: pass - def build_node(self, t:T) -> 'GraphNode[T]': + def build_node(self, t: T) -> 'GraphNode[T]': pass - def adjacents(self, created: Dict[T, 'GraphNode[T]']) -> Iterable['GraphNode[T]']: - adjacent = self.getadjacents() - - if len(adjacent) == 0: - return [] + def walk(self, created: Dict[T, 'GraphNode[T]']) -> Iterable['GraphNode[T]']: + adjacent = self.get_adjacents() - def wrap(op:T): + def wrap(op: T): if op is None: return None if op not in created: @@ -32,37 +28,40 @@ class GraphNode(Generic[T]): return map(wrap, adjacent) - def visit(self, parent: 'GraphNode[T]', udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], visit_status: bool = True): - if(self.visited == visit_status): + def visit(self, + parent: 'GraphNode[T]', + udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], + visit_status: bool = True): + if self.visited == visit_status: return - self.visited != visit_status + self.visited = ~ visit_status return udf(self, parent) class WayangGraph(Generic[T]): + starting_nodes: Iterable[GraphNode[T]] + created_nodes: Dict[T, GraphNode[T]] - starting_nodes : List[GraphNode[T]] - created_nodes : Dict[T, GraphNode[T]] - - def __init__(self, nodes: List[T]): + def __init__(self, nodes: Iterable[T]): self.created_nodes = {} - self.starting_nodes = list() + start = list() for node in nodes: tmp = self.build_node(node) - self.starting_nodes.append(tmp) + start.append(tmp) self.created_nodes[node] = tmp + self.starting_nodes = start - def build_node(self, t:T) -> GraphNode[T]: + def build_node(self, t: T) -> GraphNode[T]: pass def traversal( self, origin: GraphNode[T], nodes: Iterable[GraphNode[T]], - udf: Callable[['GraphNode[T]', 'GraphNode[T]'], Any], + udf: Callable[[GraphNode[T], GraphNode[T]], Any], visit_status: bool = True ): for node in nodes: - adjacents = node.adjacents(self.created_nodes) - self.traversal(node, adjacents, udf, visit_status) - node.visit(origin, udf) \ No newline at end of file + adjacent = node.walk(self.created_nodes) + self.traversal(node, adjacent, udf, visit_status) + node.visit(origin, udf) diff --git a/python/src/pywy/graph/types.py b/python/src/pywy/graph/types.py index 2c014263..72f48bb3 100644 --- a/python/src/pywy/graph/types.py +++ b/python/src/pywy/graph/types.py @@ -1,28 +1,30 @@ -from typing import ( Iterable, List ) +from typing import (Iterable, List) -from pywy.graph.graph import ( GraphNode, WayangGraph ) +from pywy.graph.graph import (GraphNode, WayangGraph) from pywy.operators.base import PywyOperator + class NodeOperator(GraphNode[PywyOperator]): def __init__(self, op: PywyOperator): super(NodeOperator, self).__init__(op) - def getadjacents(self) -> Iterable[PywyOperator]: + def get_adjacents(self) -> Iterable[PywyOperator]: operator: PywyOperator = self.current if operator is None or operator.inputs == 0: return [] return operator.inputOperator - def build_node(self, t:PywyOperator) -> 'NodeOperator': + def build_node(self, t: PywyOperator) -> 'NodeOperator': return NodeOperator(t) + class WGraphOfOperator(WayangGraph[NodeOperator]): - def __init__(self, nodes: List[PywyOperator]): + def __init__(self, nodes: Iterable[PywyOperator]): super(WGraphOfOperator, self).__init__(nodes) - def build_node(self, t:PywyOperator) -> NodeOperator: + def build_node(self, t: PywyOperator) -> NodeOperator: return NodeOperator(t) @@ -31,13 +33,13 @@ class NodeVec(GraphNode[List[PywyOperator]]): def __init__(self, op: PywyOperator): super(NodeVec, self).__init__([op, None]) - def getadjacents(self) -> Iterable[List[PywyOperator]]: + def get_adjacents(self) -> Iterable[List[PywyOperator]]: operator: PywyOperator = self.current[0] if operator is None or operator.inputs == 0: return [] - return operator.inputOperator + return [operator.inputOperator, None] - def build_node(self, t:PywyOperator) -> 'NodeVec': + def build_node(self, t: PywyOperator) -> 'NodeVec': return NodeVec(t) def __str__(self): @@ -46,10 +48,11 @@ class NodeVec(GraphNode[List[PywyOperator]]): def __repr__(self): return self.__str__() + class WGraphOfVec(WayangGraph[NodeVec]): - def __init__(self, nodes: List[PywyOperator]): + def __init__(self, nodes: Iterable[PywyOperator]): super(WGraphOfVec, self).__init__(nodes) - def build_node(self, t:PywyOperator) -> NodeVec: - return NodeVec(t) \ No newline at end of file + def build_node(self, t: PywyOperator) -> NodeVec: + return NodeVec(t) diff --git a/python/src/pywy/operators/__init__.py b/python/src/pywy/operators/__init__.py index 4c1c7181..e2dc0fdc 100644 --- a/python/src/pywy/operators/__init__.py +++ b/python/src/pywy/operators/__init__.py @@ -2,13 +2,13 @@ from pywy.operators.base import PywyOperator from pywy.operators.sink import TextFileSink, SinkOperator from pywy.operators.source import TextFileSource from pywy.operators.unary import FilterOperator, MapOperator, FlatmapOperator -# -__ALL__= [ + +__ALL__ = [ PywyOperator, TextFileSink, TextFileSource, FilterOperator, - SinkOperator -# MapOperator, -# FlatmapOperator -] \ No newline at end of file + SinkOperator, + MapOperator, + FlatmapOperator +] diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py index 26e8bf9b..3b5cca9e 100644 --- a/python/src/pywy/operators/base.py +++ b/python/src/pywy/operators/base.py @@ -1,14 +1,15 @@ -from typing import ( TypeVar, Optional, List, Set ) +from typing import (TypeVar, Optional, List, Set) from pywy.core import ChannelDescriptor, Channel + class PywyOperator: - inputSlot : List[TypeVar] - inputChannel : List[Channel] - inputChannelDescriptor : List[ChannelDescriptor] + inputSlot: List[TypeVar] + inputChannel: List[Channel] + inputChannelDescriptor: List[ChannelDescriptor] inputOperator: List['PywyOperator'] - inputs : int - outputSlot : List[TypeVar] + inputs: int + outputSlot: List[TypeVar] outputChannel: List[Channel] outputChannelDescriptor: List[ChannelDescriptor] outputOperator: List['PywyOperator'] @@ -16,16 +17,16 @@ class PywyOperator: def __init__(self, name: str, - input: Optional[TypeVar] = None, - output: Optional[TypeVar] = None, - input_lenght: Optional[int] = 1, - output_lenght: Optional[int] = 1 - ): + input_type: TypeVar = None, + output_type: TypeVar = None, + input_length: Optional[int] = 1, + output_length: Optional[int] = 1 + ): self.name = (self.prefix() + name + self.postfix()).strip() - self.inputSlot = input - self.inputs = input_lenght - self.outputSlot = output - self.outputs = output_lenght + self.inputSlot = [input_type] + self.inputs = input_length + self.outputSlot = [output_type] + self.outputs = output_length self.inputOperator = [None] * self.inputs self.outputOperator = [None] * self.outputs self.inputChannel = [None] * self.inputs @@ -39,6 +40,7 @@ class PywyOperator: self.inputs ) ) + def validate_outputs(self, vec): if len(vec) != self.outputs: raise Exception( @@ -47,11 +49,12 @@ class PywyOperator: self.inputs ) ) - def validate_channels(self, input, output): - self.validate_inputs(input) - self.validate_outputs(output) - def connect(self, port:int, that: 'PywyOperator', port_that:int): + def validate_channels(self, inputs, outputs): + self.validate_inputs(inputs) + self.validate_outputs(outputs) + + def connect(self, port: int, that: 'PywyOperator', port_that: int): self.outputOperator[port] = that that.inputOperator[port_that] = self @@ -67,7 +70,7 @@ class PywyOperator: def postfix(self) -> str: return '' - def name_basic(self, with_prefix: bool = False, with_postfix:bool = True): + def name_basic(self, with_prefix: bool = False, with_postfix: bool = True): prefix = len(self.prefix()) if not with_prefix else 0 postfix = len(self.postfix()) if not with_postfix else 0 return self.name[prefix:len(self.name) - postfix] @@ -83,6 +86,3 @@ class PywyOperator: def __repr__(self): return self.__str__() - - - diff --git a/python/src/pywy/operators/sink.py b/python/src/pywy/operators/sink.py index e3ddd7d8..6b13b676 100644 --- a/python/src/pywy/operators/sink.py +++ b/python/src/pywy/operators/sink.py @@ -3,15 +3,17 @@ from typing import Any from pywy.types import GenericTco from pywy.operators.base import PywyOperator + class SinkOperator(PywyOperator): def postfix(self) -> str: return 'Sink' + class SinkUnaryOperator(SinkOperator): - def __init__(self, name:str, input:GenericTco=Any): - super().__init__(name, input, None, 1, 0) + def __init__(self, name: str, input_type: GenericTco = Any): + super().__init__(name, input_type, None, 1, 0) def __str__(self): return super().__str__() @@ -20,7 +22,6 @@ class SinkUnaryOperator(SinkOperator): return super().__repr__() - class TextFileSink(SinkUnaryOperator): path: str @@ -33,4 +34,4 @@ class TextFileSink(SinkUnaryOperator): return super().__str__() def __repr__(self): - return super().__repr__() \ No newline at end of file + return super().__repr__() diff --git a/python/src/pywy/operators/source.py b/python/src/pywy/operators/source.py index 42af498e..fdbd0b93 100644 --- a/python/src/pywy/operators/source.py +++ b/python/src/pywy/operators/source.py @@ -1,14 +1,15 @@ from pywy.operators.base import PywyOperator + class SourceUnaryOperator(PywyOperator): - def __init__(self, name:str): + def __init__(self, name: str): super(SourceUnaryOperator, self).__init__( - name = name, - input = None, - output = str, - input_lenght = 0, - output_lenght = 1 + name=name, + input_type=None, + output_type=str, + input_length=0, + output_length=1 ) def postfix(self) -> str: @@ -21,9 +22,7 @@ class SourceUnaryOperator(PywyOperator): return super().__repr__() - class TextFileSource(SourceUnaryOperator): - path: str def __init__(self, path: str): @@ -34,4 +33,4 @@ class TextFileSource(SourceUnaryOperator): return super().__str__() def __repr__(self): - return super().__repr__() \ No newline at end of file + return super().__repr__() diff --git a/python/src/pywy/operators/unary.py b/python/src/pywy/operators/unary.py index 198a9fbf..cfd585f1 100644 --- a/python/src/pywy/operators/unary.py +++ b/python/src/pywy/operators/unary.py @@ -1,22 +1,21 @@ from itertools import chain from pywy.operators.base import PywyOperator from pywy.types import ( - GenericTco, - GenericUco, - Predicate, - get_type_predicate, - Function, - get_type_function, - FlatmapFunction, - get_type_flatmap_function + GenericTco, + GenericUco, + Predicate, + get_type_predicate, + Function, + get_type_function, + FlatmapFunction, + get_type_flatmap_function ) - class UnaryToUnaryOperator(PywyOperator): - def __init__(self, name:str, input:GenericTco, output:GenericUco): - super().__init__(name, input, output, 1, 1) + def __init__(self, name: str, input_type: GenericTco, output_type: GenericUco): + super().__init__(name, input_type, output_type, 1, 1) def postfix(self) -> str: return 'OperatorUnary' @@ -28,14 +27,13 @@ class UnaryToUnaryOperator(PywyOperator): return super().__repr__() - class FilterOperator(UnaryToUnaryOperator): predicate: Predicate def __init__(self, predicate: Predicate): - type = get_type_predicate(predicate) if predicate else None - super().__init__("Filter", type, type) + predicate_type = get_type_predicate(predicate) if predicate else None + super().__init__("Filter", predicate_type, predicate_type) self.predicate = predicate def __str__(self): @@ -44,6 +42,7 @@ class FilterOperator(UnaryToUnaryOperator): def __repr__(self): return super().__repr__() + class MapOperator(UnaryToUnaryOperator): function: Function @@ -53,6 +52,7 @@ class MapOperator(UnaryToUnaryOperator): super().__init__("Map", types[0], types[1]) self.function = function + # TODO remove wrapper def getWrapper(self): udf = self.function def func(iterator): @@ -68,15 +68,16 @@ class MapOperator(UnaryToUnaryOperator): class FlatmapOperator(UnaryToUnaryOperator): - fmfunction: FlatmapFunction + fm_function: FlatmapFunction - def __init__(self, fmfunction: FlatmapFunction): - types = get_type_flatmap_function(fmfunction) if fmfunction else (None, None) + def __init__(self, fm_function: FlatmapFunction): + types = get_type_flatmap_function(fm_function) if fm_function else (None, None) super().__init__("Flatmap", types[0], types[1]) - self.fmfunction = fmfunction + self.fm_function = fm_function + # TODO remove wrapper def getWrapper(self): - udf = self.fmfunction + udf = self.fm_function def func(iterator): return chain.from_iterable(map(udf, iterator)) return func @@ -85,4 +86,4 @@ class FlatmapOperator(UnaryToUnaryOperator): return super().__str__() def __repr__(self): - return super().__repr__() \ No newline at end of file + return super().__repr__() diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/python/channels.py index dc976bb6..0dee78a1 100644 --- a/python/src/pywy/platforms/python/channels.py +++ b/python/src/pywy/platforms/python/channels.py @@ -1,9 +1,10 @@ from typing import ( Iterable, Callable ) from pywy.core import (Channel, ChannelDescriptor) + class PyIteratorChannel(Channel): - iterable : Iterable + iterable: Iterable def __init__(self): Channel.__init__(self) @@ -15,9 +16,10 @@ class PyIteratorChannel(Channel): self.iterable = iterable return self + class PyCallableChannel(Channel): - udf : Callable + udf: Callable def __init__(self): Channel.__init__(self) @@ -35,9 +37,10 @@ class PyCallableChannel(Channel): return function_a(function_b(iterable)) return executable + class PyFileChannel(Channel): - path : str + path: str def __init__(self): Channel.__init__(self) @@ -49,6 +52,7 @@ class PyFileChannel(Channel): self.path = path return self -PyIteratorChannelDescriptor = ChannelDescriptor(type(PyIteratorChannel()), False, False) -PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False) -PyFileChannelDescriptor = ChannelDescriptor(type(PyFileChannel()), False, False) \ No newline at end of file + +PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False) +PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False) +PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False) diff --git a/python/src/pywy/platforms/python/execution.py b/python/src/pywy/platforms/python/execution.py index 398ef892..7f8803eb 100644 --- a/python/src/pywy/platforms/python/execution.py +++ b/python/src/pywy/platforms/python/execution.py @@ -16,26 +16,26 @@ class PyExecutor(Executor): pywyPlan: PywyPlan = plan graph = WGraphOfOperator(pywyPlan.sinks) - def exec(current: NodeOperator, next: NodeOperator): - if current is None: + def exec(op_current: NodeOperator, op_next: NodeOperator): + if op_current is None: return - py_current: PyExecutionOperator = current.current + py_current: PyExecutionOperator = op_current.current if py_current.outputs == 0: py_current.execute(py_current.inputChannel, []) return - if next is None: + if op_next is None: return - py_next: PyExecutionOperator = next.current + py_next: PyExecutionOperator = op_next.current outputs = py_current.get_output_channeldescriptors() inputs = py_next.get_input_channeldescriptors() intersect = outputs.intersection(inputs) if len(intersect) == 0: raise Exception( - "The operator(A) {} can't connect with (B) {}, because the output of (A) is {} and the input of (B) is {}" - .format( + "The operator(A) {} can't connect with (B) {}, " + "because the output of (A) is {} and the input of (B) is {} ".format( py_current, py_next, outputs, @@ -44,8 +44,8 @@ class PyExecutor(Executor): ) if len(intersect) > 1: raise Exception( - "The interaction between the operator (A) {} and (B) {}, can't be decided because are several channel availables {}" - .format( + "The interaction between the operator (A) {} and (B) {}, " + "can't be decided because are several channel availables {}".format( py_current, py_next, intersect @@ -59,4 +59,4 @@ class PyExecutor(Executor): py_next.inputChannel = py_current.outputChannel - graph.traversal(None, graph.starting_nodes, exec) \ No newline at end of file + graph.traversal(None, graph.starting_nodes, exec) diff --git a/python/src/pywy/platforms/python/mappings.py b/python/src/pywy/platforms/python/mappings.py index d949c188..9adc062a 100644 --- a/python/src/pywy/platforms/python/mappings.py +++ b/python/src/pywy/platforms/python/mappings.py @@ -2,9 +2,9 @@ from pywy.core import Mapping from pywy.platforms.python.operator import * -PywyOperatorMappings = Mapping() +PYWY_OPERATOR_MAPPINGS = Mapping() -PywyOperatorMappings.add_mapping(PyFilterOperator()) -PywyOperatorMappings.add_mapping(PyTextFileSourceOperator()) -PywyOperatorMappings.add_mapping(PyTextFileSinkOperator()) +PYWY_OPERATOR_MAPPINGS.add_mapping(PyFilterOperator()) +PYWY_OPERATOR_MAPPINGS.add_mapping(PyTextFileSourceOperator()) +PYWY_OPERATOR_MAPPINGS.add_mapping(PyTextFileSinkOperator()) diff --git a/python/src/pywy/platforms/python/operator/__init__.py b/python/src/pywy/platforms/python/operator/__init__.py index ec6db064..ef9d94b8 100644 --- a/python/src/pywy/platforms/python/operator/__init__.py +++ b/python/src/pywy/platforms/python/operator/__init__.py @@ -8,4 +8,4 @@ __ALL__ = [ PyFilterOperator, PyTextFileSourceOperator, PyTextFileSinkOperator -] \ No newline at end of file +] diff --git a/python/src/pywy/platforms/python/operator/py_execution_operator.py b/python/src/pywy/platforms/python/operator/py_execution_operator.py index 718e9c49..a2e82590 100644 --- a/python/src/pywy/platforms/python/operator/py_execution_operator.py +++ b/python/src/pywy/platforms/python/operator/py_execution_operator.py @@ -1,10 +1,11 @@ from pywy.operators.base import PywyOperator from pywy.platforms.python.channels import Channel + class PyExecutionOperator(PywyOperator): def prefix(self) -> str: return 'Py' def execute(self, inputs: Channel, output: Channel): - pass \ No newline at end of file + pass diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py index 9fe9fc06..429da96c 100644 --- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py @@ -2,11 +2,11 @@ from typing import Set from pywy.operators.sink import TextFileSink from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator from pywy.platforms.python.channels import ( - Channel, - ChannelDescriptor, - PyIteratorChannel, - PyIteratorChannelDescriptor - ) + Channel, + ChannelDescriptor, + PyIteratorChannel, + PY_ITERATOR_CHANNEL_DESCRIPTOR +) class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator): @@ -18,8 +18,8 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator): def execute(self, inputs: Channel, outputs: Channel): self.validate_channels(inputs, outputs) - if isinstance(inputs[0], PyIteratorChannel) : - file = open(self.path,'w') + if isinstance(inputs[0], PyIteratorChannel): + file = open(self.path, 'w') py_in_iter_channel: PyIteratorChannel = inputs[0] iterable = py_in_iter_channel.provide_iterable(); for element in iterable: @@ -29,9 +29,8 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator): else: raise Exception("Channel Type does not supported") - def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PyIteratorChannelDescriptor} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR} def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: raise Exception("The PyTextFileSource does not support Output Channels") diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py index 7bdf7f2a..7fdbf740 100644 --- a/python/src/pywy/platforms/python/operator/py_source_textfile.py +++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py @@ -2,10 +2,10 @@ from typing import Set from pywy.operators.source import TextFileSource from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator from pywy.platforms.python.channels import ( - Channel, - ChannelDescriptor, - PyIteratorChannel, - PyIteratorChannelDescriptor + Channel, + ChannelDescriptor, + PyIteratorChannel, + PY_ITERATOR_CHANNEL_DESCRIPTOR ) @@ -18,7 +18,7 @@ class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator): def execute(self, inputs: Channel, outputs: Channel): self.validate_channels(inputs, outputs) - if isinstance(outputs[0], PyIteratorChannel) : + if isinstance(outputs[0], PyIteratorChannel): py_out_iter_channel: PyIteratorChannel = outputs[0] py_out_iter_channel.accept_iterable( open( @@ -26,13 +26,11 @@ class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator): 'r' ) ) - else: raise Exception("Channel Type does not supported") - def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: raise Exception("The PyTextFileSource does not support Input Channels") def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PyIteratorChannelDescriptor} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR} diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py index ae02784d..b08bb5ca 100644 --- a/python/src/pywy/platforms/python/operator/py_unary_filter.py +++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py @@ -5,8 +5,8 @@ from pywy.platforms.python.channels import ( Channel, ChannelDescriptor, PyIteratorChannel, - PyIteratorChannelDescriptor, - PyCallableChannelDescriptor, + PY_ITERATOR_CHANNEL_DESCRIPTOR, + PY_CALLABLE_CHANNEL_DESCRIPTOR, PyCallableChannel ) @@ -21,11 +21,11 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator): def execute(self, inputs: Channel, outputs: Channel): self.validate_channels(inputs, outputs) udf = self.predicate - if isinstance(inputs[0], PyIteratorChannel) : + if isinstance(inputs[0], PyIteratorChannel): py_in_iter_channel: PyIteratorChannel = inputs[0] py_out_iter_channel: PyIteratorChannel = outputs[0] py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable())) - elif isinstance(inputs[0], PyCallableChannel) : + elif isinstance(inputs[0], PyCallableChannel): py_in_call_channel: PyCallableChannel = inputs[0] py_out_call_channel: PyCallableChannel = outputs[0] @@ -41,9 +41,8 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator): else: raise Exception("Channel Type does not supported") - def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]: - return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor} + return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR} diff --git a/python/src/pywy/platforms/python/platform.py b/python/src/pywy/platforms/python/platform.py index e2aa3fed..6434db6c 100644 --- a/python/src/pywy/platforms/python/platform.py +++ b/python/src/pywy/platforms/python/platform.py @@ -1,6 +1,7 @@ from pywy.core.platform import Platform + class PythonPlatform(Platform): def __init__(self): - super(PythonPlatform, self).__init__("Python") \ No newline at end of file + super(PythonPlatform, self).__init__("Python") diff --git a/python/src/pywy/platforms/python/plugin.py b/python/src/pywy/platforms/python/plugin.py index 4f41470e..2c652d2e 100644 --- a/python/src/pywy/platforms/python/plugin.py +++ b/python/src/pywy/platforms/python/plugin.py @@ -2,13 +2,13 @@ from pywy.core import Executor from pywy.platforms.python.execution import PyExecutor from pywy.platforms.python.platform import PythonPlatform from pywy.core import Plugin -from pywy.platforms.python.mappings import PywyOperatorMappings +from pywy.platforms.python.mappings import PYWY_OPERATOR_MAPPINGS class PythonPlugin(Plugin): def __init__(self): - super(PythonPlugin, self).__init__({PythonPlatform()}, PywyOperatorMappings) + super(PythonPlugin, self).__init__({PythonPlatform()}, PYWY_OPERATOR_MAPPINGS) def get_executor(self) -> Executor: - return PyExecutor() \ No newline at end of file + return PyExecutor() diff --git a/python/src/pywy/plugins.py b/python/src/pywy/plugins.py index cad2f356..37fb2d6d 100644 --- a/python/src/pywy/plugins.py +++ b/python/src/pywy/plugins.py @@ -3,8 +3,8 @@ from pywy.core import Plugin from pywy.platforms.python.plugin import PythonPlugin # define the basic plugins that can be used -java = Plugin(Platform('java')) -spark = Plugin(Platform('spark')) -flink = Plugin(Platform('flink')) +JAVA = Plugin({Platform('java')}) +SPARK = Plugin({Platform('spark')}) +FLINK = Plugin({Platform('flink')}) # plugin for the python platform -python = PythonPlugin() +PYTHON = PythonPlugin() diff --git a/python/src/pywy/types.py b/python/src/pywy/types.py index e1413457..f39d4528 100644 --- a/python/src/pywy/types.py +++ b/python/src/pywy/types.py @@ -28,7 +28,11 @@ FlatmapFunction = Callable[[T], IterableOut] def get_type_predicate(call: Predicate) -> type: sig = signature(call) if len(sig.parameters) != 1: - raise PywyException("the parameters for the Predicate are distinct than one, {}".format(str(sig.parameters))) + raise PywyException( + "the parameters for the Predicate are distinct than one, {}".format( + str(sig.parameters) + ) + ) keys = list(sig.parameters.keys()) return sig.parameters[keys[0]].annotation @@ -37,7 +41,11 @@ def get_type_predicate(call: Predicate) -> type: def get_type_function(call: Function) -> (type, type): sig = signature(call) if len(sig.parameters) != 1: - raise PywyException("the parameters for the Function are distinct than one, {}".format(str(sig.parameters))) + raise PywyException( + "the parameters for the Function are distinct than one, {}".format( + str(sig.parameters) + ) + ) keys = list(sig.parameters.keys()) return sig.parameters[keys[0]].annotation, sig.return_annotation @@ -46,7 +54,11 @@ def get_type_function(call: Function) -> (type, type): def get_type_bifunction(call: BiFunction) -> (type, type, type): sig = signature(call) if len(sig.parameters) != 2: - raise PywyException("the parameters for the BiFunction are distinct than two, {}".format(str(sig.parameters))) + raise PywyException( + "the parameters for the BiFunction are distinct than two, {}".format( + str(sig.parameters) + ) + ) keys = list(sig.parameters.keys()) return sig.parameters[keys[0]].annotation, sig.parameters[keys[1]].annotation, sig.return_annotation @@ -55,7 +67,11 @@ def get_type_bifunction(call: BiFunction) -> (type, type, type): def get_type_flatmap_function(call: FlatmapFunction) -> (type, type): sig = signature(call) if len(sig.parameters) != 1: - raise PywyException("the parameters for the FlatmapFunction are distinct than one, {}".format(str(sig.parameters))) + raise PywyException( + "the parameters for the FlatmapFunction are distinct than one, {}".format( + str(sig.parameters) + ) + ) keys = list(sig.parameters.keys()) return sig.parameters[keys[0]].annotation, sig.return_annotation
