dianfu commented on a change in pull request #13475:
URL: https://github.com/apache/flink/pull/13475#discussion_r494733674



##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -560,23 +560,52 @@ def encode_to_stream(self, iter_cols, out_stream, nested):
 
     def decode_from_stream(self, in_stream, nested):
         while in_stream.size() > 0:
-            yield self._decode_one_batch_from_stream(in_stream)
+            yield self._decode_one_batch_from_stream(in_stream, 
in_stream.read_var_int64())
 
     @staticmethod
     def _load_from_stream(stream):
         reader = pa.ipc.open_stream(stream)
         for batch in reader:
             yield batch
 
-    def _decode_one_batch_from_stream(self, in_stream: create_InputStream) -> 
List:
-        self._resettable_io.set_input_bytes(in_stream.read_all(True))
+    def _decode_one_batch_from_stream(self, in_stream: create_InputStream, 
size: int) -> List:
+        self._resettable_io.set_input_bytes(in_stream.read(size))
         # there is only one arrow batch in the underlying input stream
         return arrow_to_pandas(self._timezone, self._field_types, 
[next(self._batch_reader)])
 
     def __repr__(self):
         return 'ArrowCoderImpl[%s]' % self._schema
 
 
+class OverWindowArrowCoderImpl(StreamCoderImpl):
+    def __init__(self, arrow_coder):
+        self._arrow_coder = arrow_coder
+        self._int_coder = IntCoderImpl()
+
+    def encode_to_stream(self, value, stream, nested):
+        self._arrow_coder.encode_to_stream(value, stream, nested)
+
+    def decode_from_stream(self, in_stream, nested):
+        while in_stream.size():
+            all_size = in_stream.read_var_int64()

Review comment:
       ```suggestion
               remaining_size = in_stream.read_var_int64()
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []

Review comment:
       ```suggestion
           self.is_bounded_range_window = []
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []
+        window_types = flink_fn_execution_pb2.OverWindow
+
+        bounded_range_window_nums = 0
+        for i, window in enumerate(self.windows):
+            window_type = window.window_type
+            if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
+                    window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
+                    window_type is window_types.RANGE_SLIDING):
+                self.window_to_bounded_range_window_index[i] = 
bounded_range_window_nums
+                self.window_is_bounded_range_type.append(True)
+                bounded_range_window_nums += 1
+            else:
+                self.window_is_bounded_range_type.append(False)
+
+    def generate_func(self, udfs):
+        user_defined_funcs = []
+        self.window_indexes = []
+        self.mapper = []
+        for udf in udfs:
+            pandas_agg_function, variable_dict, user_defined_func, 
window_index = \
+                operation_utils.extract_over_window_user_defined_function(udf)
+            user_defined_funcs.extend(user_defined_func)
+            self.window_indexes.append(window_index)
+            self.mapper.append(eval('lambda value: %s' % pandas_agg_function, 
variable_dict))
+        return self.wrap_over_window_function, user_defined_funcs
+
+    def wrap_over_window_function(self, it):
+        import pandas as pd
+        window_types = flink_fn_execution_pb2.OverWindow
+        for boundaries_series in it:
+            series = boundaries_series[-1]
+            # the row number of the arrow format data
+            rows_count = len(series[0])

Review comment:
       ```suggestion
               input_cnt = len(series[0])
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]

Review comment:
       ```suggestion
           self.bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []
+        window_types = flink_fn_execution_pb2.OverWindow
+
+        bounded_range_window_nums = 0
+        for i, window in enumerate(self.windows):
+            window_type = window.window_type
+            if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
+                    window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
+                    window_type is window_types.RANGE_SLIDING):
+                self.window_to_bounded_range_window_index[i] = 
bounded_range_window_nums
+                self.window_is_bounded_range_type.append(True)
+                bounded_range_window_nums += 1
+            else:
+                self.window_is_bounded_range_type.append(False)
+
+    def generate_func(self, udfs):
+        user_defined_funcs = []
+        self.window_indexes = []
+        self.mapper = []
+        for udf in udfs:
+            pandas_agg_function, variable_dict, user_defined_func, 
window_index = \
+                operation_utils.extract_over_window_user_defined_function(udf)
+            user_defined_funcs.extend(user_defined_func)
+            self.window_indexes.append(window_index)
+            self.mapper.append(eval('lambda value: %s' % pandas_agg_function, 
variable_dict))
+        return self.wrap_over_window_function, user_defined_funcs
+
+    def wrap_over_window_function(self, it):
+        import pandas as pd
+        window_types = flink_fn_execution_pb2.OverWindow

Review comment:
       ```suggestion
           OverWindow = flink_fn_execution_pb2.OverWindow
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []
+        window_types = flink_fn_execution_pb2.OverWindow
+
+        bounded_range_window_nums = 0
+        for i, window in enumerate(self.windows):
+            window_type = window.window_type
+            if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
+                    window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
+                    window_type is window_types.RANGE_SLIDING):
+                self.window_to_bounded_range_window_index[i] = 
bounded_range_window_nums
+                self.window_is_bounded_range_type.append(True)
+                bounded_range_window_nums += 1
+            else:
+                self.window_is_bounded_range_type.append(False)
+
+    def generate_func(self, udfs):
+        user_defined_funcs = []
+        self.window_indexes = []
+        self.mapper = []
+        for udf in udfs:
+            pandas_agg_function, variable_dict, user_defined_func, 
window_index = \
+                operation_utils.extract_over_window_user_defined_function(udf)
+            user_defined_funcs.extend(user_defined_func)
+            self.window_indexes.append(window_index)
+            self.mapper.append(eval('lambda value: %s' % pandas_agg_function, 
variable_dict))
+        return self.wrap_over_window_function, user_defined_funcs
+
+    def wrap_over_window_function(self, it):

Review comment:
       ```suggestion
       def wrapped_over_window_function(self, it):
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window

Review comment:
       ```suggestion
           # the index among all the bounded range over window
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecOverAggregateRule.scala
##########
@@ -101,17 +103,53 @@ class BatchExecOverAggregateRule
         groupToAggCallToAggFunction.flatMap(_._2).map(_._1))
 
       val providedTraitSet = 
call.getPlanner.emptyTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
-      overWindowAgg = new BatchExecOverAggregate(logicWindow.getCluster, 
call.builder(),
-        providedTraitSet,
-        newInput,
-        outputRowType,
-        newInput.getRowType,
-        groupSet,
-        orderKeyIndexes,
-        orders,
-        nullIsLasts,
-        groupToAggCallToAggFunction,
-        logicWindow)
+      // TODO: split pandas udaf, general python udaf, java/scala udaf into 
different node

Review comment:
       What about adding a separate rule for Python?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []
+        window_types = flink_fn_execution_pb2.OverWindow
+
+        bounded_range_window_nums = 0
+        for i, window in enumerate(self.windows):
+            window_type = window.window_type
+            if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
+                    window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
+                    window_type is window_types.RANGE_SLIDING):
+                self.window_to_bounded_range_window_index[i] = 
bounded_range_window_nums
+                self.window_is_bounded_range_type.append(True)
+                bounded_range_window_nums += 1
+            else:
+                self.window_is_bounded_range_type.append(False)
+
+    def generate_func(self, udfs):
+        user_defined_funcs = []
+        self.window_indexes = []
+        self.mapper = []
+        for udf in udfs:
+            pandas_agg_function, variable_dict, user_defined_func, 
window_index = \
+                operation_utils.extract_over_window_user_defined_function(udf)
+            user_defined_funcs.extend(user_defined_func)
+            self.window_indexes.append(window_index)
+            self.mapper.append(eval('lambda value: %s' % pandas_agg_function, 
variable_dict))
+        return self.wrap_over_window_function, user_defined_funcs
+
+    def wrap_over_window_function(self, it):
+        import pandas as pd
+        window_types = flink_fn_execution_pb2.OverWindow
+        for boundaries_series in it:
+            series = boundaries_series[-1]

Review comment:
       ```suggestion
               input_series = boundaries_series[-1]
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
##########
@@ -192,6 +194,108 @@ cdef class 
PandasAggregateFunctionOperation(BeamStatelessFunctionOperation):
         return generate_func, user_defined_funcs
 
 
+cdef class 
PandasBatchOverWindowAggregateFunctionOperation(BeamStatelessFunctionOperation):

Review comment:
       It seems to me that most code of beam_operations_fast and 
beam_operation_slow are the same. Is it possible to avoid the duplication? 
   
   PS: This is not an issue introduced in this PR. We could also address it in 
a separate if it's possible.

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
##########
@@ -192,6 +194,108 @@ cdef class 
PandasAggregateFunctionOperation(BeamStatelessFunctionOperation):
         return generate_func, user_defined_funcs
 
 
+cdef class 
PandasBatchOverWindowAggregateFunctionOperation(BeamStatelessFunctionOperation):

Review comment:
       It makes sense to create a separate PR for this purpose. Could you 
create a JIRA to track it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to