HuangXingBo commented on a change in pull request #15416:
URL: https://github.com/apache/flink/pull/15416#discussion_r603748023



##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
##########
@@ -161,23 +164,26 @@ public void open() throws Exception {
                 
PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
                         runnerOutputTypeInfo);
 
-        InternalTimerService<VoidNamespace> internalTimerService =
-                getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
-        timerService = new SimpleTimerService(internalTimerService);
-        reusableInput = new Row(5);
-        reusableTimerData = new Row(5);
+        TypeSerializer timerServiceNamespaceSerializer =

Review comment:
       We don't need this variable

##########
File path: flink-python/pyflink/fn_execution/utils/output_factory.py
##########
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from enum import Enum
+from io import BytesIO
+
+from typing import List, Tuple, Iterable
+
+from pyflink.common import Row
+from pyflink.common.serializer import TypeSerializer, VoidNamespaceSerializer
+from pyflink.datastream.timerservice import InternalTimer
+from pyflink.fn_execution.timerservice_impl import TimerOperandType
+
+
+class RunnerOutputType(Enum):
+    NORMAL_RECORD = 0
+    TIMER_OPERATION = 1
+
+
+class RowWithTimerOutputFactory(object):
+
+    def __init__(self, namespace_serializer: TypeSerializer = None):

Review comment:
       We can pass the `VoidNamespaceSerializer` directly and then remove 
the following code that checks whether `namespace_serializer` is None.

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -137,9 +138,10 @@ public void open() throws Exception {
         this.runnerOutputHandler =
                 new OutputWithTimerRowHandler(
                         getKeyedStateBackend(),
-                        timerService,
+                        internalTimerService,

Review comment:
       Maybe we can remove `timerService`, which has the type 
`SimpleTimerService`, and use `internalTimerService` for all usages.
   

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
##########
@@ -161,23 +164,26 @@ public void open() throws Exception {
                 
PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
                         runnerOutputTypeInfo);
 
-        InternalTimerService<VoidNamespace> internalTimerService =
-                getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
-        timerService = new SimpleTimerService(internalTimerService);
-        reusableInput = new Row(5);
-        reusableTimerData = new Row(5);
+        TypeSerializer timerServiceNamespaceSerializer =
+                namespaceSerializer == null
+                        ? VoidNamespaceSerializer.INSTANCE
+                        : namespaceSerializer;
+        internalTimerService =
+                getInternalTimerService("user-timers", 
timerServiceNamespaceSerializer, this);

Review comment:
       ```suggestion
                   getInternalTimerService("user-timers", namespaceSerializer, 
this);
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/utils/input/KeyedTwoInputWithTimerRowFactory.java
##########
@@ -21,27 +21,24 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.types.Row;
 
 /**
  * This factory produces runner input row for two input operators which need 
to send the timer
  * trigger event to python side.
  */
-public class KeyedTwoInputWithTimerRowFactory {
-
-    private final Row reuseRunnerInput;
+public class KeyedTwoInputWithTimerRowFactory extends 
KeyedInputWithTimerRowFactory {

Review comment:
       In my opinion, this inheritance structure seems very strange. We may be 
able to extract a base class and have both `KeyedInputWithTimerRowFactory` and 
`KeyedTwoInputWithTimerRowFactory` inherit from it.
   

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
##########
@@ -135,20 +130,28 @@ public PythonKeyedProcessOperator(
             RowTypeInfo inputTypeInfo,
             TypeInformation<OUT> outputTypeInfo,
             DataStreamPythonFunctionInfo pythonFunctionInfo) {
+        this(config, inputTypeInfo, outputTypeInfo, pythonFunctionInfo, null);

Review comment:
       ```suggestion
           this(config, inputTypeInfo, outputTypeInfo, pythonFunctionInfo, 
VoidNamespaceSerializer.INSTANCE);
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/OutputWithTimerRowHandler.java
##########
@@ -20,58 +20,87 @@
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.KeyContext;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.types.Row;
 
+import java.io.IOException;
+
 /** This handler can accepts the runner output which contains timer 
registration event. */
 public class OutputWithTimerRowHandler {
 
     private final KeyedStateBackend<Row> keyedStateBackend;
-    private final TimerService timerService;
+    private final InternalTimerService internalTimerService;
     private final TimestampedCollector collector;
     private final KeyContext keyContext;
+    private final TypeSerializer namespaceSerializer;
+    private final ByteArrayInputStreamWithPos bais;
+    private final DataInputViewStreamWrapper baisWrapper;
 
     public OutputWithTimerRowHandler(
             KeyedStateBackend<Row> keyedStateBackend,
-            TimerService timerService,
+            InternalTimerService internalTimerService,
             TimestampedCollector collector,
-            KeyContext keyContext) {
+            KeyContext keyContext,
+            TypeSerializer namespaceSerializer) {
         this.keyedStateBackend = keyedStateBackend;
-        this.timerService = timerService;
+        this.internalTimerService = internalTimerService;
         this.collector = collector;
         this.keyContext = keyContext;
+        this.namespaceSerializer = namespaceSerializer;
+        this.bais = new ByteArrayInputStreamWithPos();
+        this.baisWrapper = new DataInputViewStreamWrapper(bais);
     }
 
-    public void accept(Row runnerOutput, long timestamp) {
-        if (runnerOutput.getField(0) == null) {
-            // null represents normal data
-            onData(timestamp, runnerOutput.getField(3));
-        } else {
-            TimerOperandType operandType =
-                    TimerOperandType.valueOf((byte) runnerOutput.getField(0));
-            onTimerOperation(
-                    operandType, (long) runnerOutput.getField(1), (Row) 
runnerOutput.getField(2));
+    public void accept(Row runnerOutput, long timestamp) throws IOException {
+        switch (RunnerOutputType.valueOf((byte) runnerOutput.getField(0))) {
+            case NORMAL_RECORD:
+                onData(timestamp, runnerOutput.getField(1));
+                break;
+            case TIMER_OPERATION:
+                Row timerData = (Row) runnerOutput.getField(2);
+                assert timerData != null;
+
+                TimerOperandType operandType =
+                        TimerOperandType.valueOf((byte) timerData.getField(0));
+                Row key = (Row) timerData.getField(1);
+                long time = (long) timerData.getField(2);
+                byte[] encodedNamespace = (byte[]) timerData.getField(3);
+                assert encodedNamespace != null;
+
+                bais.setBuffer(encodedNamespace, 0, encodedNamespace.length);
+                Object namespace;
+                if (namespaceSerializer != null) {
+                    namespace = namespaceSerializer.deserialize(baisWrapper);
+                } else {
+                    namespace = VoidNamespace.INSTANCE;
+                }

Review comment:
       ```suggestion
                   if (namespaceSerializer == VoidNamespaceSerializer.INSTANCE) 
{
                       namespace = VoidNamespace.INSTANCE;
                   } else {
                       namespace = namespaceSerializer.deserialize(baisWrapper);
                   }
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -137,9 +138,10 @@ public void open() throws Exception {
         this.runnerOutputHandler =
                 new OutputWithTimerRowHandler(
                         getKeyedStateBackend(),
-                        timerService,
+                        internalTimerService,
                         new TimestampedCollector<>(output),
-                        this);
+                        this,
+                        null);

Review comment:
       ```suggestion
                           VoidNamespaceSerializer.INSTANCE);
   ```

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1007,7 +1007,7 @@ def to_pandas(self):
                 self.get_schema().to_row_data_type(),
                 timezone)
             import pyarrow as pa
-            table = 
pa.Table.from_batches(serializer.load_from_iterator(batches))
+            table = 
pa.Table.from_batches(serializer.load_from_iterable(batches))

Review comment:
       `batches` is an iterator, so I think this line should not be changed.

##########
File path: flink-python/pyflink/common/serializer.py
##########
@@ -0,0 +1,87 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import abstractmethod, ABC
+from io import BytesIO
+from typing import TypeVar, Generic
+
+T = TypeVar('T')
+
+
+class TypeSerializer(ABC, Generic[T]):
+
+    # Note: our notion of "equality" is that output generated by
+    # equal serializers can be deserialized using the same serializer.
+
+    # This default implementation handles the simple cases;
+    # subclasses should override __eq__ as appropriate.
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __repr__(self):
+        return "%s()" % self.__class__.__name__
+
+    def __hash__(self):
+        return hash(str(self))
+
+    @abstractmethod
+    def serialize(self, element: T, stream: BytesIO) -> None:
+        """
+        Serializes an iterator of objects to the output stream.
+        """
+        pass
+
+    @abstractmethod
+    def deserialize(self, stream: BytesIO) -> T:
+        """
+        Returns an iterator of deserialized objects from the input stream.

Review comment:
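       Ditto: this docstring also needs to be changed (the method returns a 
single deserialized object, not an iterator).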

##########
File path: flink-python/pyflink/fn_execution/utils/input_handler.py
##########
@@ -124,12 +187,16 @@ def on_normal_record(self, unified_input, timestamp: int) 
-> Iterable:
         yield from self._emit_output(output_result)
 
     def on_event_time(
-            self, key, timestamp: int, serialized_namespace: bytes) -> 
Iterable:
-        yield from self._on_timer(TimeDomain.EVENT_TIME, key, timestamp)
+            self, internal_timer: InternalTimer) -> Iterable:
+        yield from self._on_timer(TimeDomain.EVENT_TIME,
+                                  internal_timer.get_key(),
+                                  internal_timer.get_timestamp())
 
     def on_processing_time(
-            self, key, timestamp: int, serialized_namespace: bytes) -> 
Iterable:
-        yield from self._on_timer(TimeDomain.PROCESSING_TIME, key, timestamp)
+            self, internal_timer: InternalTimer) -> Iterable:

Review comment:
       Ditto: put the method signature on one line.

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -916,3 +928,206 @@ def on_timer(self, timestamp: int, ctx: 
'KeyedCoProcessFunction.OnTimerContext')
                     invocation of this method, do not store it.
         """
         pass
+
+
+class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
+    """
+    Base interface for functions that are evaluated over keyed (grouped) 
windows.
+    """
+
+    @abstractmethod
+    def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> 
Iterable[OUT]:
+        """
+        Evaluates the window and outputs none or several elements.
+
+        :param key: The key for which this window is evaluated.
+        :param window: The window that is being evaluated.
+        :param inputs: The elements in the window being evaluated.
+        :param out: A collector for emitting elements.

Review comment:
       remove this line?

##########
File path: flink-python/pyflink/common/serializer.py
##########
@@ -0,0 +1,87 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import abstractmethod, ABC
+from io import BytesIO
+from typing import TypeVar, Generic
+
+T = TypeVar('T')
+
+
+class TypeSerializer(ABC, Generic[T]):
+
+    # Note: our notion of "equality" is that output generated by
+    # equal serializers can be deserialized using the same serializer.
+
+    # This default implementation handles the simple cases;
+    # subclasses should override __eq__ as appropriate.
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __repr__(self):
+        return "%s()" % self.__class__.__name__
+
+    def __hash__(self):
+        return hash(str(self))
+
+    @abstractmethod
+    def serialize(self, element: T, stream: BytesIO) -> None:
+        """
+        Serializes an iterator of objects to the output stream.

Review comment:
       The docstring needs to be changed: this method serializes a single 
element, not an iterator of objects.

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
##########
@@ -789,10 +793,29 @@ public static IterateType fromOrd(byte ord) {
                 BeamFnApi.StateRequest request) throws Exception {
 
             ListState<byte[]> partitionedState = getListState(request);
-            // get values
-            byte[] valueBytes = request.getAppend().getData().toByteArray();
-            partitionedState.add(valueBytes);
-
+            if (request.getStateKey()
+                    .getBagUserState()
+                    .getTransformId()
+                    .equals(MERGE_NAMESPACES_MARK)) {
+                // get namespaces to merge
+                byte[] namespacesBytes = 
request.getAppend().getData().toByteArray();
+                bais.setBuffer(namespacesBytes, 0, namespacesBytes.length);
+                int namespaceCount = baisWrapper.readInt();
+                Set namespaces = new HashSet();

Review comment:
       ```suggestion
                   Set<Object> namespaces = new HashSet<>();
   ```

##########
File path: flink-python/pyflink/fn_execution/utils/input_handler.py
##########
@@ -62,47 +69,103 @@ def process_element(self, operation_input):
 
         self.advance_watermark(watermark)
         if input_type == RunnerInputType.NORMAL_RECORD.value:
-            yield from self.on_normal_record(normal_data, timestamp)
+            yield from self.process_element(normal_data, timestamp)
         elif input_type == RunnerInputType.TRIGGER_TIMER.value:
             timer_type = timer_data[0]
             key = timer_data[1]
             serialized_namespace = timer_data[2]
+            if self._namespace_coder is not None:
+                namespace = 
self._namespace_coder.decode_nested(serialized_namespace)
+            else:
+                namespace = None
+            internal_timer = InternalTimerImpl(timestamp, key, namespace)
             if timer_type == TimerType.EVENT_TIME.value:
-                yield from self.on_event_time(key, timestamp, 
serialized_namespace)
+                yield from self.on_event_time(internal_timer)
             elif timer_type == TimerType.PROCESSING_TIME.value:
-                yield from self.on_processing_time(key, timestamp, 
serialized_namespace)
+                yield from self.on_processing_time(internal_timer)
             else:
                 raise Exception("Unsupported timer type: %d" % timer_type)
         else:
             raise Exception("Unsupported input type: %d" % input_type)
 
 
-class KeyedTwoInputTimerRowHandler(TimerRowInputHandler):
+class OneInputRowWithTimerHandler(RowWithTimerInputHandler):
+
+    def __init__(self,
+                 internal_timer_service: InternalTimerServiceImpl,
+                 state_backend: RemoteKeyedStateBackend,
+                 key_selector_func,
+                 process_element_func,
+                 on_event_time_func,
+                 on_processing_time_func,
+                 output_factory: RowWithTimerOutputFactory):
+        super(OneInputRowWithTimerHandler, 
self).__init__(state_backend._namespace_coder_impl)
+        self._internal_timer_service = internal_timer_service
+        self._keyed_state_backend = state_backend
+        self._key_selector_func = key_selector_func
+        self._process_element_func = process_element_func
+        self._on_event_time_func = on_event_time_func
+        self._on_processing_time_func = on_processing_time_func
+        self._output_factory = output_factory
+
+    def advance_watermark(self, watermark) -> None:
+        self._internal_timer_service.advance_watermark(watermark)
+
+    def process_element(self, normal_data, timestamp: int) -> Iterable:
+        
self._keyed_state_backend.set_current_key(self._key_selector_func(normal_data))
+        output_result = self._process_element_func(normal_data, timestamp)
+        yield from self._emit_output(output_result)
+
+    def on_event_time(
+            self, internal_timer: InternalTimer) -> Iterable:
+        self._keyed_state_backend.set_current_key(internal_timer.get_key())
+        output_result = self._on_event_time_func(internal_timer)
+        yield from self._emit_output(output_result)
+
+    def on_processing_time(
+            self, internal_timer: InternalTimer) -> Iterable:
+        self._keyed_state_backend.set_current_key(internal_timer.get_key())
+        output_result = self._on_processing_time_func(internal_timer)
+        yield from self._emit_output(output_result)
+
+    def _emit_output(self, output_result):
+        if output_result:
+            yield from self._output_factory.from_normal_data(output_result)
+
+        yield from 
self._output_factory.from_timers(self._internal_timer_service.timers)
+
+        self._internal_timer_service.timers.clear()
+
+
+class TwoInputRowWithTimerHandler(RowWithTimerInputHandler):
 
     def __init__(self,
                  context,
                  timer_context,
-                 internal_collector,
+                 timer_service,
                  state_backend,
-                 keyed_co_process_function):
+                 keyed_co_process_function,
+                 output_factory):
         """
         :type context: InternalKeyedProcessFunctionContext
         :type timer_context: InternalKeyedProcessFunctionOnTimerContext
-        :type internal_collector: InternalCollector
+        :type internal_collector: TimerServiceImpl

Review comment:
       ```suggestion
           :type timer_service: TimerServiceImpl
   ```

##########
File path: flink-python/pyflink/fn_execution/utils/input_handler.py
##########
@@ -124,12 +187,16 @@ def on_normal_record(self, unified_input, timestamp: int) 
-> Iterable:
         yield from self._emit_output(output_result)
 
     def on_event_time(
-            self, key, timestamp: int, serialized_namespace: bytes) -> 
Iterable:
-        yield from self._on_timer(TimeDomain.EVENT_TIME, key, timestamp)
+            self, internal_timer: InternalTimer) -> Iterable:

Review comment:
       Put the method signature on one line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to