This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch python-platform in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit fc94a5fd660ac89906ab82f723c696240e4aec69
Author: Bertty Contreras-Rojas <[email protected]>
AuthorDate: Mon Apr 4 20:03:12 2022 +0200

    [WAYANG-#8] Structure the Python file inside of a module

    Signed-off-by: bertty <[email protected]>
---
 python/src/pywayang/context.py                 | 35 ++++++++++
 python/src/pywayang/dataquanta.py              | 32 +++++++++
 python/{ => src/pywayang/operator}/__init__.py |  0
 python/src/pywayang/operator/base.py           | 37 +++++++++++
 python/src/pywayang/operator/source.py         | 28 ++++++++
 python/src/pywayang/operator/unary.py          | 90 ++++++++++++++++++++++++++
 python/src/pywayang/orchestrator/operator.py   |  8 ++-
 python/src/pywayang/platform.py                | 24 +++++++
 python/src/pywayang/plugin.py                  | 27 ++++++++
 python/src/pywayang/test.py                    | 61 +++++++++++++++++
 python/src/pywayang/types.py                   | 57 ++++++++++++++++
 11 files changed, 397 insertions(+), 2 deletions(-)

diff --git a/python/src/pywayang/context.py b/python/src/pywayang/context.py
new file mode 100644
index 00000000..95942ef7
--- /dev/null
+++ b/python/src/pywayang/context.py
@@ -0,0 +1,35 @@
+from pywayang.plugin import Plugin
+from pywayang.dataquanta import DataQuanta
+from pywayang.operator.source import TextFileSource
+
+class WayangContext:
+    """
+    This is the entry point for users to work with Wayang.
+    """
+    def __init__(self):
+        self.plugins = set()
+
+    def register(self, *p: Plugin):
+        """
+        add one or more :class:`Plugin` to the :class:`WayangContext`
+        """
+        # update() adds each plugin; add(p) would store the tuple itself
+        self.plugins.update(p)
+        return self
+
+    def unregister(self, p: Plugin):
+        """
+        remove a :class:`Plugin` from the :class:`WayangContext`
+        (raises KeyError if it was never registered)
+        """
+        self.plugins.remove(p)
+        return self
+
+    def textFile(self, file_path: str) -> DataQuanta[str]:
+        return DataQuanta(TextFileSource(file_path))
+
+    def __str__(self):
+        return "Plugins: {} \n".format(str(self.plugins))
+
+    def __repr__(self):
+        return self.__str__()
diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py
new file mode 100644
index 00000000..5f740941
--- /dev/null
+++ b/python/src/pywayang/dataquanta.py
@@ -0,0 +1,32 @@
+from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
+from pywayang.operator.base import (BaseOperator)
+from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)
+
+
+class DataQuanta(GenericTco):
+    """
+    Represents an intermediate result/data flow edge in a [[WayangPlan]].
+    """
+    previous : BaseOperator = None  # upstream operator feeding `operator`
+
+    def __init__(self, operator: BaseOperator, previous: BaseOperator = None):
+        self.operator = operator
+        self.previous = previous
+
+    def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" :
+        return DataQuanta(FilterOperator(p), self.operator)
+
+    def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
+        return DataQuanta(MapOperator(f), self.operator)
+
+    def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[O]" :
+        return DataQuanta(FlatmapOperator(f), self.operator)
+
+    def getOperator(self):
+        return self.operator
+
+    def __str__(self):
+        return str(self.operator)
+
+    def __repr__(self):
+        return self.__str__()
diff --git a/python/__init__.py b/python/src/pywayang/operator/__init__.py
similarity index 100%
rename from python/__init__.py
rename to python/src/pywayang/operator/__init__.py
diff --git a/python/src/pywayang/operator/base.py b/python/src/pywayang/operator/base.py
new file mode 100644
index 00000000..ad2deed5
--- /dev/null
+++ b/python/src/pywayang/operator/base.py
@@ -0,0 +1,37 @@
+from typing import (TypeVar, Optional, List)
+
+
+class BaseOperator:
+
+    inputSlot : List[TypeVar]
+    inputs : int
+    outputSlot : List[TypeVar]
+    outputs: int
+
+    def __init__(self,
+                 name: str,
+                 input: Optional[TypeVar] = None,
+                 output: Optional[TypeVar] = None,
+                 input_lenght: Optional[int] = 1,
+                 output_lenght: Optional[int] = 1
+                 ):
+        self.name = name
+        self.inputSlot = input
+        self.inputs = input_lenght
+        self.outputSlot = output
+        self.outputs = output_lenght
+
+    def __str__(self):
+        return "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n".format(
+            str(self.name),
+            str(self.inputs),
+            str(self.inputSlot),
+            str(self.outputs),
+            str(self.outputSlot),
+        )
+
+    def __repr__(self):
+        return self.__str__()
+
+
+
diff --git a/python/src/pywayang/operator/source.py b/python/src/pywayang/operator/source.py
new file mode 100644
index 00000000..34a16644
--- /dev/null
+++ b/python/src/pywayang/operator/source.py
@@ -0,0 +1,28 @@
+from pywayang.operator.base import BaseOperator
+
+class SourceUnaryOperator(BaseOperator):
+
+    def __init__(self, name:str):
+        super().__init__(name, None, str, 0, 1)
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+
+class TextFileSource(SourceUnaryOperator):
+
+    path: str
+
+    def __init__(self, path: str):
+        super().__init__('TextFileSource')
+        self.path = path
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
\ No newline at end of file
diff --git a/python/src/pywayang/operator/unary.py b/python/src/pywayang/operator/unary.py
new file mode 100644
index 00000000..24e5df2f
--- /dev/null
+++ b/python/src/pywayang/operator/unary.py
@@ -0,0 +1,90 @@
+from pywayang.operator.base import BaseOperator
+from pywayang.types import (
+    GenericTco,
+    GenericUco,
+    Predicate,
+    getTypePredicate,
+    Function,
+    getTypeFunction,
+    FlatmapFunction,
+    getTypeFlatmapFunction
+    )
+from itertools import chain
+
+
+class UnaryToUnaryOperator(BaseOperator):
+
+    def __init__(self, name:str, input:GenericTco, output:GenericUco):
+        super().__init__(name, input, output, 1, 1)
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+
+class FilterOperator(UnaryToUnaryOperator):
+
+    predicate: Predicate
+
+    def __init__(self, predicate: Predicate):
+        in_type = getTypePredicate(predicate)  # a filter keeps its input type
+        super().__init__("FilterOperator", in_type, in_type)
+        self.predicate = predicate
+
+    def getWrapper(self):
+        udf = self.predicate
+        def func(iterator):
+            return filter(udf, iterator)
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+class MapOperator(UnaryToUnaryOperator):
+
+    function: Function
+
+    def __init__(self, function: Function):
+        types = getTypeFunction(function)
+        super().__init__("MapOperator", types[0], types[1])
+        self.function = function
+
+    def getWrapper(self):
+        udf = self.function
+        def func(iterator):
+            return map(udf, iterator)
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+class FlatmapOperator(UnaryToUnaryOperator):
+
+    fmfunction: FlatmapFunction
+
+    def __init__(self, fmfunction: FlatmapFunction):
+        types = getTypeFlatmapFunction(fmfunction)
+        super().__init__("FlatmapOperator", types[0], types[1])
+        self.fmfunction = fmfunction
+
+    def getWrapper(self):
+        udf = self.fmfunction
+        def func(iterator):
+            return chain.from_iterable(map(udf, iterator))
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
\ No newline at end of file
diff --git a/python/src/pywayang/orchestrator/operator.py b/python/src/pywayang/orchestrator/operator.py
index 1307b5f2..87087642 100644
--- a/python/src/pywayang/orchestrator/operator.py
+++ b/python/src/pywayang/orchestrator/operator.py
@@ -30,8 +30,12 @@ pickle_protocol = pickle.HIGHEST_PROTOCOL
 
 class Operator:
     def __init__(
-            self, operator_type=None, udf=None, previous=None,
-            iterator=None, python_exec=False
+            self,
+            operator_type=None,
+            udf=None,
+            previous=None,
+            iterator=None,
+            python_exec=False
     ):
 
         # Operator ID
diff --git a/python/src/pywayang/platform.py b/python/src/pywayang/platform.py
new file mode 100644
index 00000000..caece352
--- /dev/null
+++ b/python/src/pywayang/platform.py
@@ -0,0 +1,24 @@
+
+class Platform:
+    """
+    A platform describes an execution engine that is used to execute the
+    Wayang plan
+
+    Parameters
+    ----------
+    name: str
+        platform name, used as its identifier
+    """
+
+    name : str
+    #configuration : dict[str, str]
+
+    def __init__(self, name):
+        self.name = name
+
+    def __str__(self):
+        return "name: {}".format(self.name)
+
+    def __repr__(self):
+        return self.__str__()
+
diff --git a/python/src/pywayang/plugin.py b/python/src/pywayang/plugin.py
new file mode 100644
index 00000000..82d75431
--- /dev/null
+++ b/python/src/pywayang/plugin.py
@@ -0,0 +1,27 @@
+from pywayang.platform import Platform
+
+class Plugin:
+    """
+    A plugin contributes the following components to a :class:`WayangContext`
+        - mappings
+        - channels
+        - configurations
+    In turn, it may require several :class:`Platform`s for its operation.
+    """
+
+    platforms = []
+
+    def __init__(self, *platform:Platform):
+        self.platforms = list(platform)
+
+    def __str__(self):
+        return "Platforms: {}".format(str(self.platforms))
+
+    def __repr__(self):
+        return self.__str__()
+
+
+# define the basic plugins that can be used
+java = Plugin(Platform('java'))
+spark = Plugin(Platform('spark'))
+flink = Plugin(Platform('flink'))
diff --git a/python/src/pywayang/test.py b/python/src/pywayang/test.py
new file mode 100644
index 00000000..66ddab0a
--- /dev/null
+++ b/python/src/pywayang/test.py
@@ -0,0 +1,61 @@
+from typing import Iterable
+
+from pywayang.platform import Platform
+from pywayang.context import WayangContext
+from pywayang.plugin import java, spark
+from pywayang.operator.unary import *
+
+p = Platform("nana")
+print(p)
+
+
+print(str(WayangContext().register(java, spark)))
+
+from pywayang.types import Predicate, getTypePredicate
+
+predicate : Predicate = lambda x: x % 2 == 0
+getTypePredicate(predicate)
+
+def pre(a:str) -> bool:
+    return len(a) > 3
+
+def func(s:str) -> int:
+    return len(s)
+
+def fmfunc(i:int) -> Iterable[str]:
+    for x in range(i):
+        yield str(x)
+
+fileop = WayangContext()\
+    .register(java)\
+    .textFile("here")
+
+filterop: FilterOperator = fileop.filter(pre).getOperator()
+fop_pre = filterop.getWrapper()
+fop_pre_res = fop_pre(["la", "lala"])
+#for i in fop_pre_res:
+#    print(i)
+
+
+mapop: MapOperator = fileop.map(func).getOperator()
+mop_func = mapop.getWrapper()
+mop_func_res = mop_func(["la", "lala"])
+#for i in mop_func_res:
+#    print(i)
+
+
+fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator()
+fmop_func = fmop.getWrapper()
+fmop_func_res = fmop_func([2, 3])
+#for i in fmop_func_res:
+#    print(i)
+
+def concatenate(function_a, function_b):
+    def executable(iterable):
+        return function_b(function_a(iterable))
+    return executable
+
+res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
+res_pro = res(["la", "lala"])
+for i in res_pro:
+    print(i)
\ No newline at end of file
diff --git a/python/src/pywayang/types.py b/python/src/pywayang/types.py
new file mode 100644
index 00000000..8be502e0
--- /dev/null
+++ b/python/src/pywayang/types.py
@@ -0,0 +1,57 @@
+from typing import (Any, Callable, Generic, Hashable, Iterable, Tuple, TypeVar)
+from inspect import signature
+
+T = TypeVar("T") # Type
+I = TypeVar("I") # Input Type number 1
+I2 = TypeVar("I2") # Input Type number 2
+O = TypeVar("O") # Output Type
+
+IterableT = Iterable[T] # Iterable of type 'T'
+IterableO = Iterable[O] # Iterable of type 'O'
+
+T_co = TypeVar("T_co", covariant=True)
+U_co = TypeVar("U_co", covariant=True)
+K = TypeVar("K", bound=Hashable)
+
+GenericTco = Generic[T_co]
+GenericUco = Generic[U_co]
+
+Predicate = Callable[[T], bool]
+Function = Callable[[I], O]
+BiFunction = Callable[[I, I2], O]
+
+# one input element -> an iterable of output elements (flattened by flatmap)
+FlatmapFunction = Callable[[T], IterableO]
+
+
+def getTypePredicate(callable: Predicate) -> Any:
+    sig = signature(callable)
+    if len(sig.parameters) != 1:
+        raise TypeError("the Predicate must take exactly one parameter, got {}".format(str(sig.parameters)))
+
+    first = next(iter(sig.parameters.values()))
+    return first.annotation
+
+def getTypeFunction(callable: Function) -> Tuple[Any, Any]:
+    sig = signature(callable)
+    if len(sig.parameters) != 1:
+        raise TypeError("the Function must take exactly one parameter, got {}".format(str(sig.parameters)))
+
+    first = next(iter(sig.parameters.values()))
+    return (first.annotation, sig.return_annotation)
+
+def getTypeBiFunction(callable: BiFunction) -> Tuple[Any, Any, Any]:
+    sig = signature(callable)
+    if len(sig.parameters) != 2:
+        raise TypeError("the BiFunction must take exactly two parameters, got {}".format(str(sig.parameters)))
+
+    first, second = sig.parameters.values()
+    return (first.annotation, second.annotation, sig.return_annotation)
+
+def getTypeFlatmapFunction(callable: FlatmapFunction) -> Tuple[Any, Any]:
+    sig = signature(callable)
+    if len(sig.parameters) != 1:
+        raise TypeError("the FlatmapFunction must take exactly one parameter, got {}".format(str(sig.parameters)))
+
+    first = next(iter(sig.parameters.values()))
+    return (first.annotation, sig.return_annotation)
\ No newline at end of file
