This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ecc3429 [FLINK-18945][python] Support CoFlatMap for Python DataStream API (#13152) ecc3429 is described below commit ecc342904404c272939196cb9b30803cc518faa6 Author: Hequn Cheng <hequn....@alibaba-inc.com> AuthorDate: Sun Aug 16 21:12:04 2020 +0800 [FLINK-18945][python] Support CoFlatMap for Python DataStream API (#13152) --- flink-python/pyflink/datastream/data_stream.py | 38 ++++++++++++++-- flink-python/pyflink/datastream/functions.py | 53 +++++++++++++++++++++- .../pyflink/datastream/tests/test_data_stream.py | 44 +++++++++++++++++- .../pyflink/fn_execution/operation_utils.py | 8 ++++ 4 files changed, 136 insertions(+), 7 deletions(-) diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py index 17abb70..a5f9c1d 100644 --- a/flink-python/pyflink/datastream/data_stream.py +++ b/flink-python/pyflink/datastream/data_stream.py @@ -24,7 +24,7 @@ from pyflink.common.typeinfo import TypeInformation from pyflink.datastream.functions import _get_python_env, FlatMapFunctionWrapper, FlatMapFunction, \ MapFunction, MapFunctionWrapper, Function, FunctionWrapper, SinkFunction, FilterFunction, \ FilterFunctionWrapper, KeySelectorFunctionWrapper, KeySelector, ReduceFunction, \ - ReduceFunctionWrapper, CoMapFunction + ReduceFunctionWrapper, CoMapFunction, CoFlatMapFunction from pyflink.java_gateway import get_gateway @@ -799,7 +799,7 @@ class ConnectedStreams(object): self.stream1 = stream1 self.stream2 = stream2 - def map(self, func: CoMapFunction, type_info: TypeInformation = None) \ + def map(self, func: CoMapFunction, output_type: TypeInformation = None) \ -> 'DataStream': """ Applies a CoMap transformation on a `ConnectedStreams` and maps the output to a common @@ -819,8 +819,38 @@ class ConnectedStreams(object): j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream) from pyflink.fn_execution import 
flink_fn_execution_pb2 j_operator, j_output_type = self._get_connected_stream_operator( - func, type_info, func_name, flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP) - return DataStream(j_connected_stream.transform("Co-Process", j_output_type, j_operator)) + func, + output_type, + func_name, + flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP) + return DataStream(j_connected_stream.transform("Co-Map", j_output_type, j_operator)) + + def flat_map(self, func: CoFlatMapFunction, output_type: TypeInformation = None) \ + -> 'DataStream': + """ + Applies a CoFlatMap transformation on a `ConnectedStreams` and maps the output to a + common type. The transformation calls a `CoFlatMapFunction.flat_map1` for each element + of the first input and `CoFlatMapFunction.flat_map2` for each element of the second + input. Each CoFlatMapFunction call returns any number of elements including none. + + :param func: The CoFlatMapFunction used to jointly transform the two input DataStreams + :param output_type: `TypeInformation` for the result type of the function.
+ :return: The transformed `DataStream` + """ + + if not isinstance(func, CoFlatMapFunction): + raise TypeError("The input must be a CoFlatMapFunction!") + func_name = str(func) + + # get connected stream + j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream) + from pyflink.fn_execution import flink_fn_execution_pb2 + j_operator, j_output_type = self._get_connected_stream_operator( + func, + output_type, + func_name, + flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_FLAT_MAP) + return DataStream(j_connected_stream.transform("Co-Flat Map", j_output_type, j_operator)) def _get_connected_stream_operator(self, func: Union[Function, FunctionWrapper], type_info: TypeInformation, func_name: str, diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py index 10433ee..76f5025 100644 --- a/flink-python/pyflink/datastream/functions.py +++ b/flink-python/pyflink/datastream/functions.py @@ -63,7 +63,7 @@ class CoMapFunction(Function): The same instance of the transformation function is used to transform both of the connected streams. That way, the stream transformations can share state. - The basic syntax for using a MapFunction is as follows: + The basic syntax for using a CoMapFunction is as follows: :: >>> ds1 = ... >>> ds2 = ... @@ -121,6 +121,57 @@ class FlatMapFunction(Function): pass +class CoFlatMapFunction(Function): + """ + A CoFlatMapFunction implements a flat-map transformation over two connected streams. + + The same instance of the transformation function is used to transform both of the + connected streams. That way, the stream transformations can share state. + + An example for the use of connected streams would be to apply rules that change over time + onto elements of a stream. One of the connected streams has the rules, the other stream the + elements to apply the rules to. The operation on the connected stream maintains the + current set of rules in the state. 
It may receive either a rule update (from the first stream) + and update the state, or a data element (from the second stream) and apply the rules in the + state to the element. The result of applying the rules would be emitted. + + The basic syntax for using a CoFlatMapFunction is as follows: + :: + >>> ds1 = ... + >>> ds2 = ... + + >>> class MyCoFlatMapFunction(CoFlatMapFunction): + >>> def flat_map1(self, value): + >>> for i in range(value): + >>> yield i + >>> def flat_map2(self, value): + >>> for i in range(value): + >>> yield i + + >>> new_ds = ds1.connect(ds2).flat_map(MyCoFlatMapFunction()) + """ + + @abc.abstractmethod + def flat_map1(self, value): + """ + This method is called for each element in the first of the connected streams. + + :param value: The input value. + :return: A generator + """ + pass + + @abc.abstractmethod + def flat_map2(self, value): + """ + This method is called for each element in the second of the connected streams. + + :param value: The input value. + :return: A generator + """ + pass + + class ReduceFunction(Function): """ Base interface for Reduce functions.
Reduce functions combine groups of elements to a single diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py index d05b2e7..c6c3665 100644 --- a/flink-python/pyflink/datastream/tests/test_data_stream.py +++ b/flink-python/pyflink/datastream/tests/test_data_stream.py @@ -22,7 +22,7 @@ from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import FilterFunction from pyflink.datastream.functions import KeySelector from pyflink.datastream.functions import MapFunction, FlatMapFunction -from pyflink.datastream.functions import CoMapFunction +from pyflink.datastream.functions import CoMapFunction, CoFlatMapFunction from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction from pyflink.java_gateway import get_gateway from pyflink.testing.test_case_utils import PyFlinkTestCase @@ -126,7 +126,7 @@ class DataStreamTests(PyFlinkTestCase): type_info=Types.ROW([Types.INT(), Types.INT()])) ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")], type_info=Types.ROW([Types.STRING(), Types.STRING()])) - ds1.connect(ds2).map(MyCoMapFunction(), type_info=Types.STRING()).add_sink(self.test_sink) + ds1.connect(ds2).map(MyCoMapFunction(), output_type=Types.STRING()).add_sink(self.test_sink) self.env.execute('co_map_function_test') results = self.test_sink.get_results(False) expected = ['2', '3', '4', 'a', 'b', 'c'] @@ -178,6 +178,35 @@ class DataStreamTests(PyFlinkTestCase): expected.sort() self.assertEqual(expected, results) + def test_co_flat_map_function_without_data_types(self): + self.env.set_parallelism(1) + ds1 = self.env.from_collection([(1, 1), (2, 2), (3, 3)], + type_info=Types.ROW([Types.INT(), Types.INT()])) + ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")], + type_info=Types.ROW([Types.STRING(), Types.STRING()])) + ds1.connect(ds2).flat_map(MyCoFlatMapFunction()).add_sink(self.test_sink) + 
self.env.execute('co_flat_map_function_test') + results = self.test_sink.get_results(True) + expected = ['2', '2', '3', '3', '4', '4', 'b'] + expected.sort() + results.sort() + self.assertEqual(expected, results) + + def test_co_flat_map_function_with_data_types(self): + self.env.set_parallelism(1) + ds1 = self.env.from_collection([(1, 1), (2, 2), (3, 3)], + type_info=Types.ROW([Types.INT(), Types.INT()])) + ds2 = self.env.from_collection([("a", "a"), ("b", "b"), ("c", "c")], + type_info=Types.ROW([Types.STRING(), Types.STRING()])) + ds1.connect(ds2).flat_map(MyCoFlatMapFunction(), output_type=Types.STRING())\ + .add_sink(self.test_sink) + self.env.execute('co_flat_map_function_test') + results = self.test_sink.get_results(False) + expected = ['2', '2', '3', '3', '4', '4', 'b'] + expected.sort() + results.sort() + self.assertEqual(expected, results) + def test_filter_without_data_types(self): ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')]) ds.filter(MyFilterFunction()).add_sink(self.test_sink) @@ -469,3 +498,14 @@ class MyCoMapFunction(CoMapFunction): def map2(self, value): return value[0] + + +class MyCoFlatMapFunction(CoFlatMapFunction): + + def flat_map1(self, value): + yield str(value[0] + 1) + yield str(value[0] + 1) + + def flat_map2(self, value): + if value[0] == 'b': + yield value[0] diff --git a/flink-python/pyflink/fn_execution/operation_utils.py b/flink-python/pyflink/fn_execution/operation_utils.py index c361681..a85cd55 100644 --- a/flink-python/pyflink/fn_execution/operation_utils.py +++ b/flink-python/pyflink/fn_execution/operation_utils.py @@ -114,6 +114,14 @@ def extract_data_stream_stateless_funcs(udfs): def wrap_func(value): return co_map_func.map1(value[1]) if value[0] else co_map_func.map2(value[2]) func = wrap_func + elif func_type == udf.CO_FLAT_MAP: + co_flat_map_func = cloudpickle.loads(udfs[0].payload) + + def wrap_func(value): + return co_flat_map_func.flat_map1( + value[1]) if value[0] else 
co_flat_map_func.flat_map2( + value[2]) + func = wrap_func return func