github-actions[bot] opened a new issue, #438: URL: https://github.com/apache/incubator-wayang/issues/438
key should be given by "udf"; the UDF specifies the reducer function.

https://github.com/apache/incubator-wayang/blob/c25c6561bf786d76d0dc717b28cf15885c269232/python/old_code/pywayang/src/pywy/orchestrator/dataquanta.py#L104

```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pywy.graph.graph import Graph
from pywy.graph.traversal import Traversal
from pywy.protobuf.planwriter import MessageWriter
from pywy.orchestrator.operator import Operator
import operator  # stdlib module; needed for operator.itemgetter in group_by
import itertools
import collections
import logging
from functools import reduce


# Wraps a Source operation to create an iterable
class DataQuantaBuilder:
    def __init__(self, descriptor):
        self.descriptor = descriptor

    def source(self, source):
        if type(source) is str:
            source_ori = open(source, "r")
        else:
            source_ori = source
        return DataQuanta(
            Operator(
                operator_type="source",
                udf=source,
                iterator=iter(source_ori),
                previous=[],
                python_exec=False
            ),
            descriptor=self.descriptor
        )


# Wraps an operation over an iterable
class DataQuanta:
    def __init__(self, operator=None, descriptor=None):
        self.operator = operator
        self.descriptor = descriptor
        if self.operator.is_source():
            self.descriptor.add_source(self.operator)
        if self.operator.is_sink():
            self.descriptor.add_sink(self.operator)

    # Operational Functions
    def filter(self, udf):
        def func(iterator):
            return filter(udf, iterator)
        return DataQuanta(
            Operator(
                operator_type="filter",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def flatmap(self, udf):
        # Unused alternative implementation, kept from the original
        def auxfunc(iterator):
            return itertools.chain.from_iterable(map(udf, iterator))

        def func(iterator):
            mapped = map(udf, iterator)
            flattened = flatten_single_dim(mapped)
            yield from flattened

        def flatten_single_dim(mapped):
            for item in mapped:
                for subitem in item:
                    yield subitem
        return DataQuanta(
            Operator(
                operator_type="flatmap",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def group_by(self, udf):
        def func(iterator):
            # TODO key should be given by "udf"
            return itertools.groupby(iterator, key=operator.itemgetter(0))
            # return itertools.groupby(sorted(iterator), key=operator.itemgetter(0))
        return DataQuanta(
            Operator(
                operator_type="group_by",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def map(self, udf):
        def func(iterator):
            return map(udf, iterator)
        return DataQuanta(
            Operator(
                operator_type="map",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # Keys specify the pivot dimensions
    # UDF specifies the reducer function
    def reduce_by_key(self, keys, udf):
        op = Operator(
            operator_type="reduce_by_key",
            udf=udf,
            previous=[self.operator],
            python_exec=False
        )
        print(len(keys), keys)
        for i in range(0, len(keys)):
            # if keys[i] is int:
            #     op.set_parameter("vector_position|" + str(i), keys[i])
            # else:
            #     op.set_parameter("dimension_key|" + str(i), keys[i])
            # TODO maybe it would be better to just leave the number as key
            op.set_parameter("dimension|" + str(i + 1), keys[i])
        return DataQuanta(
            op,
            descriptor=self.descriptor
        )

    def reduce(self, udf):
        def func(iterator):
            return reduce(udf, iterator)
        return DataQuanta(
            Operator(
                operator_type="reduce",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    def sink(self, path, end="\n"):
        def consume(iterator):
            with open(path, 'w') as f:
                for x in iterator:
                    f.write(str(x) + end)

        def func(iterator):
            consume(iterator)
        # return self.__run(consume)
        return DataQuanta(
            Operator(
                operator_type="sink",
                udf=path,
                # To execute directly, uncomment:
                # udf=func,
                previous=[self.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def sort(self, udf):
        def func(iterator):
            return sorted(iterator, key=udf)
        return DataQuanta(
            Operator(
                operator_type="sort",
                udf=func,
                previous=[self.operator],
                python_exec=True
            ),
            descriptor=self.descriptor
        )

    # This function allows the union to be performed by Python.
    # Nevertheless, the current configuration runs it over Java.
    def union(self, other):
        def func(iterator):
            return itertools.chain(iterator, other.operator.getIterator())
        return DataQuanta(
            Operator(
                operator_type="union",
                udf=func,
                previous=[self.operator, other.operator],
                python_exec=False
            ),
            descriptor=self.descriptor
        )

    def __run(self, consumer):
        consumer(self.operator.getIterator())

    # Execution Functions
    def console(self, end="\n"):
        def consume(iterator):
            for x in iterator:
                print(x, end=end)
        self.__run(consume)

    # Only for debugging purposes!
    # Executes the plan directly in the program driver.
    def execute(self):
        logging.warning("DEBUG Execution")
        logging.info("Reminder to swap SINK UDF value from path to func")
        logging.debug(self.operator.previous[0].operator_type)
        if self.operator.is_sink():
            logging.debug(self.operator.operator_type)
            logging.debug(self.operator.udf)
            logging.debug(len(self.operator.previous))
            self.operator.udf(self.operator.previous[0].getIterator())
        else:
            logging.error("Plan must call execute from SINK type of operator")
            raise RuntimeError

    # Converts the Python functional plan to a valid Wayang plan
    def to_wayang_plan(self):
        sinks = self.descriptor.get_sinks()
        if len(sinks) == 0:
            return

        graph = Graph()
        graph.populate(self.descriptor.get_sinks())
        # Uncomment to check the graph built
        # graph.print_adjlist()

        # Function to be consumed by Traversal.
        # Separates the Python plan into a list of pipelines.
        def define_pipelines(node1, current_pipeline, collection):

            def store_unique(pipe_to_insert):
                for pipe in collection:
                    if equivalent_lists(pipe, pipe_to_insert):
                        return
                collection.append(pipe_to_insert)

            def equivalent_lists(l1, l2):
                if collections.Counter(l1) == collections.Counter(l2):
                    return True
                else:
                    return False

            if not current_pipeline:
                current_pipeline = [node1]
            elif node1.operator.is_boundary():
                store_unique(current_pipeline.copy())
                current_pipeline.clear()
                current_pipeline.append(node1)
            else:
                current_pipeline.append(node1)

            if node1.operator.sink:
                store_unique(current_pipeline.copy())
                current_pipeline.clear()

            return current_pipeline

        # Works over the graph
        trans = Traversal(
            graph=graph,
            origin=self.descriptor.get_sources(),
            # udf=lambda x, y, z: d(x, y, z)
            # The UDF will always receive:
            #   x: a Node object,
            #   y: an object representing the result of the last iteration,
            #   z: a collection to store final results inside your UDF
            udf=lambda x, y, z: define_pipelines(x, y, z)
        )

        # Gets the results of the traversal process
        collected_stages = trans.get_collected_data()

        # Passes the stages to a Wayang message writer
        writer = MessageWriter()
        a = 0
        # Each stage is composed of Node objects
        for stage in collected_stages:
            a += 1
            logging.info("///")
            logging.info("stage" + str(a))
            writer.process_pipeline(stage)

        writer.set_dependencies()

        # Uses a file to provide the plan
        # writer.write_message(self.descriptor)
        # Sends the plan to the Wayang REST API directly
        writer.send_message(self.descriptor)
```

c1f2ed0fa73244efb915223a8c6334a93055dc1c
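For context on the TODO itself, here is a minimal sketch of what `group_by` might look like once the grouping key comes from the `udf` argument instead of the hard-coded `operator.itemgetter(0)`. This is only an illustration against the `DataQuanta`/`Operator` structure in the file above, not a decided fix. Note that `itertools.groupby` only merges consecutive elements with equal keys, so the input is sorted by the same key first, which appears to be the intent of the commented-out variant:

```python
import itertools

def group_by(self, udf):
    def func(iterator):
        # groupby only merges consecutive runs of equal keys, so sort by
        # the same key first to get exactly one group per distinct key.
        return itertools.groupby(sorted(iterator, key=udf), key=udf)
    return DataQuanta(
        Operator(
            operator_type="group_by",
            udf=func,
            previous=[self.operator],
            python_exec=True
        ),
        descriptor=self.descriptor
    )
```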
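A quick, self-contained demonstration of why the sort matters; the sample data here is made up for the example:

```python
import itertools
import operator

pairs = [("a", 1), ("b", 2), ("a", 3)]

# Without sorting, "a" appears as two separate groups, because groupby
# only merges consecutive elements whose keys compare equal.
unsorted_groups = [(k, [v for _, v in grp])
                   for k, grp in itertools.groupby(pairs, key=operator.itemgetter(0))]
print(unsorted_groups)  # [('a', [1]), ('b', [2]), ('a', [3])]

# Sorting by the same key first yields one group per distinct key.
key = operator.itemgetter(0)
sorted_groups = [(k, [v for _, v in grp])
                 for k, grp in itertools.groupby(sorted(pairs, key=key), key=key)]
print(sorted_groups)  # [('a', [1, 3]), ('b', [2])]
```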
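For orientation, the call pattern the class seems designed for would look roughly like the following. `descriptor` stands for whatever plan-descriptor object the project constructs elsewhere (per the file above, it must offer `add_source`, `add_sink`, `get_sources`, and `get_sinks`), and the file names are placeholders:

```python
# Hypothetical sketch only: `descriptor` and both file paths are
# placeholders, not part of the file above.
plan = (
    DataQuantaBuilder(descriptor)
    .source("words.txt")                 # text file, one record per line
    .flatmap(lambda line: line.split())  # split each line into words
    .filter(lambda word: len(word) > 3)  # keep only the longer words
    .map(lambda word: word.lower())      # normalize case
    .sink("out.txt")                     # terminal operator
)
plan.to_wayang_plan()                    # hand the plan over to Wayang
```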
