dianfu commented on a change in pull request #13475:
URL: https://github.com/apache/flink/pull/13475#discussion_r494733674



##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -560,23 +560,52 @@ def encode_to_stream(self, iter_cols, out_stream, nested):
 
     def decode_from_stream(self, in_stream, nested):
         while in_stream.size() > 0:
-            yield self._decode_one_batch_from_stream(in_stream)
+            yield self._decode_one_batch_from_stream(in_stream, 
in_stream.read_var_int64())
 
     @staticmethod
     def _load_from_stream(stream):
         reader = pa.ipc.open_stream(stream)
         for batch in reader:
             yield batch
 
-    def _decode_one_batch_from_stream(self, in_stream: create_InputStream) -> 
List:
-        self._resettable_io.set_input_bytes(in_stream.read_all(True))
+    def _decode_one_batch_from_stream(self, in_stream: create_InputStream, 
size: int) -> List:
+        self._resettable_io.set_input_bytes(in_stream.read(size))
         # there is only one arrow batch in the underlying input stream
         return arrow_to_pandas(self._timezone, self._field_types, 
[next(self._batch_reader)])
 
     def __repr__(self):
         return 'ArrowCoderImpl[%s]' % self._schema
 
 
+class OverWindowArrowCoderImpl(StreamCoderImpl):
+    def __init__(self, arrow_coder):
+        self._arrow_coder = arrow_coder
+        self._int_coder = IntCoderImpl()
+
+    def encode_to_stream(self, value, stream, nested):
+        self._arrow_coder.encode_to_stream(value, stream, nested)
+
+    def decode_from_stream(self, in_stream, nested):
+        while in_stream.size():
+            all_size = in_stream.read_var_int64()

Review comment:
       ```suggestion
               remaining_size = in_stream.read_var_int64()
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []

Review comment:
       ```suggestion
           self.is_bounded_range_window = []
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []
+        window_types = flink_fn_execution_pb2.OverWindow
+
+        bounded_range_window_nums = 0
+        for i, window in enumerate(self.windows):
+            window_type = window.window_type
+            if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
+                    window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
+                    window_type is window_types.RANGE_SLIDING):
+                self.window_to_bounded_range_window_index[i] = 
bounded_range_window_nums
+                self.window_is_bounded_range_type.append(True)
+                bounded_range_window_nums += 1
+            else:
+                self.window_is_bounded_range_type.append(False)
+
+    def generate_func(self, udfs):
+        user_defined_funcs = []
+        self.window_indexes = []
+        self.mapper = []
+        for udf in udfs:
+            pandas_agg_function, variable_dict, user_defined_func, 
window_index = \
+                operation_utils.extract_over_window_user_defined_function(udf)
+            user_defined_funcs.extend(user_defined_func)
+            self.window_indexes.append(window_index)
+            self.mapper.append(eval('lambda value: %s' % pandas_agg_function, 
variable_dict))
+        return self.wrap_over_window_function, user_defined_funcs
+
+    def wrap_over_window_function(self, it):
+        import pandas as pd
+        window_types = flink_fn_execution_pb2.OverWindow
+        for boundaries_series in it:
+            series = boundaries_series[-1]
+            # the row number of the arrow format data
+            rows_count = len(series[0])

Review comment:
       ```suggestion
               input_cnt = len(series[0])
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]

Review comment:
       ```suggestion
           self.bounded_range_window_index = [-1 for _ in range(len(self.windows))]
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []
+        window_types = flink_fn_execution_pb2.OverWindow
+
+        bounded_range_window_nums = 0
+        for i, window in enumerate(self.windows):
+            window_type = window.window_type
+            if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
+                    window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
+                    window_type is window_types.RANGE_SLIDING):
+                self.window_to_bounded_range_window_index[i] = 
bounded_range_window_nums
+                self.window_is_bounded_range_type.append(True)
+                bounded_range_window_nums += 1
+            else:
+                self.window_is_bounded_range_type.append(False)
+
+    def generate_func(self, udfs):
+        user_defined_funcs = []
+        self.window_indexes = []
+        self.mapper = []
+        for udf in udfs:
+            pandas_agg_function, variable_dict, user_defined_func, 
window_index = \
+                operation_utils.extract_over_window_user_defined_function(udf)
+            user_defined_funcs.extend(user_defined_func)
+            self.window_indexes.append(window_index)
+            self.mapper.append(eval('lambda value: %s' % pandas_agg_function, 
variable_dict))
+        return self.wrap_over_window_function, user_defined_funcs
+
+    def wrap_over_window_function(self, it):
+        import pandas as pd
+        window_types = flink_fn_execution_pb2.OverWindow

Review comment:
       ```suggestion
           OverWindow = flink_fn_execution_pb2.OverWindow
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []
+        window_types = flink_fn_execution_pb2.OverWindow
+
+        bounded_range_window_nums = 0
+        for i, window in enumerate(self.windows):
+            window_type = window.window_type
+            if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
+                    window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
+                    window_type is window_types.RANGE_SLIDING):
+                self.window_to_bounded_range_window_index[i] = 
bounded_range_window_nums
+                self.window_is_bounded_range_type.append(True)
+                bounded_range_window_nums += 1
+            else:
+                self.window_is_bounded_range_type.append(False)
+
+    def generate_func(self, udfs):
+        user_defined_funcs = []
+        self.window_indexes = []
+        self.mapper = []
+        for udf in udfs:
+            pandas_agg_function, variable_dict, user_defined_func, 
window_index = \
+                operation_utils.extract_over_window_user_defined_function(udf)
+            user_defined_funcs.extend(user_defined_func)
+            self.window_indexes.append(window_index)
+            self.mapper.append(eval('lambda value: %s' % pandas_agg_function, 
variable_dict))
+        return self.wrap_over_window_function, user_defined_funcs
+
+    def wrap_over_window_function(self, it):

Review comment:
       ```suggestion
       def wrapped_over_window_function(self, it):
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window

Review comment:
       ```suggestion
           # the index among all the bounded range over windows
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecOverAggregateRule.scala
##########
@@ -101,17 +103,53 @@ class BatchExecOverAggregateRule
         groupToAggCallToAggFunction.flatMap(_._2).map(_._1))
 
       val providedTraitSet = 
call.getPlanner.emptyTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
-      overWindowAgg = new BatchExecOverAggregate(logicWindow.getCluster, 
call.builder(),
-        providedTraitSet,
-        newInput,
-        outputRowType,
-        newInput.getRowType,
-        groupSet,
-        orderKeyIndexes,
-        orders,
-        nullIsLasts,
-        groupToAggCallToAggFunction,
-        logicWindow)
+      // TODO: split pandas udaf, general python udaf, java/scala udaf into 
different node

Review comment:
       What about adding a separate rule for Python?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -172,6 +172,113 @@ def generate_func(self, udfs):
         return lambda it: map(mapper, it), user_defined_funcs
 
 
+class 
PandasBatchOverWindowAggregateFunctionOperation(StatelessFunctionOperation):
+    def __init__(self, name, spec, counter_factory, sampler, consumers):
+        super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(
+            name, spec, counter_factory, sampler, consumers)
+        self.windows = [window for window in self.spec.serialized_fn.windows]
+        # Set a serial number for each over window to indicate which bounded 
range over window
+        # it is.
+        self.window_to_bounded_range_window_index = [-1 for _ in 
range(len(self.windows))]
+        # Whether the specified position window is a bounded range window.
+        self.window_is_bounded_range_type = []
+        window_types = flink_fn_execution_pb2.OverWindow
+
+        bounded_range_window_nums = 0
+        for i, window in enumerate(self.windows):
+            window_type = window.window_type
+            if (window_type is window_types.RANGE_UNBOUNDED_PRECEDING) or (
+                    window_type is window_types.RANGE_UNBOUNDED_FOLLOWING) or (
+                    window_type is window_types.RANGE_SLIDING):
+                self.window_to_bounded_range_window_index[i] = 
bounded_range_window_nums
+                self.window_is_bounded_range_type.append(True)
+                bounded_range_window_nums += 1
+            else:
+                self.window_is_bounded_range_type.append(False)
+
+    def generate_func(self, udfs):
+        user_defined_funcs = []
+        self.window_indexes = []
+        self.mapper = []
+        for udf in udfs:
+            pandas_agg_function, variable_dict, user_defined_func, 
window_index = \
+                operation_utils.extract_over_window_user_defined_function(udf)
+            user_defined_funcs.extend(user_defined_func)
+            self.window_indexes.append(window_index)
+            self.mapper.append(eval('lambda value: %s' % pandas_agg_function, 
variable_dict))
+        return self.wrap_over_window_function, user_defined_funcs
+
+    def wrap_over_window_function(self, it):
+        import pandas as pd
+        window_types = flink_fn_execution_pb2.OverWindow
+        for boundaries_series in it:
+            series = boundaries_series[-1]

Review comment:
       ```suggestion
               input_series = boundaries_series[-1]
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to