This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 1dc9faf  [FLINK-25513][python] Handle properly for None result in flat_map and map of ConnectedStream
1dc9faf is described below

commit 1dc9faf75907c9a6b4051335b5a3083802ea053a
Author: atptour2017 <49615...@qq.com>
AuthorDate: Thu Jan 6 13:45:02 2022 +0800

    [FLINK-25513][python] Handle properly for None result in flat_map and map of ConnectedStream

    This closes #18280.
---
 flink-python/pyflink/datastream/data_stream.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 48e8113..68ed512 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -1303,10 +1303,14 @@ class ConnectedStreams(object):
                     self._underlying.open(runtime_context)
 
                 def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                    yield self._underlying.map1(value)
+                    result = self._underlying.map1(value)
+                    if result is not None:
+                        yield result
 
                 def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                    yield self._underlying.map2(value)
+                    result = self._underlying.map2(value)
+                    if result is not None:
+                        yield result
 
                 def close(self):
                     self._underlying.close()
@@ -1350,10 +1354,14 @@ class ConnectedStreams(object):
                     self._underlying.open(runtime_context)
 
                 def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                    yield from self._underlying.flat_map1(value)
+                    result = self._underlying.flat_map1(value)
+                    if result:
+                        yield from result
 
                 def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
-                    yield from self._underlying.flat_map2(value)
+                    result = self._underlying.flat_map2(value)
+                    if result:
+                        yield from result
 
                 def close(self):
                     self._underlying.close()