SteNicholas commented on a change in pull request #13986:
URL: https://github.com/apache/flink/pull/13986#discussion_r519525532



##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks using a user-specified TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>
+       implements ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * A user specified watermarkStrategy.
+        */
+       private final WatermarkStrategy<IN> watermarkStrategy;
+
+       /**
+        * The TypeInformation of python worker input data.
+        */
+       private final TypeInformation runnerInputTypeInfo;
+
+       /**
+        * The TypeInformation of python worker output data.
+        */
+       private final TypeInformation runnerOutputTypeInfo;
+
+       /**
+        * Serializer to serialize input data for python worker.
+        */
+       private transient TypeSerializer runnerInputSerializer;
+
+       /**
+        * Serializer to deserialize output data from python worker.
+        */
+       private transient TypeSerializer runnerOutputSerializer;
+
+       /** The watermark generator, initialized during runtime. */
+       private transient WatermarkGenerator<IN> watermarkGenerator;
+
+       /** The watermark output gateway, initialized during runtime. */
+       private transient WatermarkOutput watermarkOutput;
+
+       /** The interval (in milliseconds) for periodic watermark probes. 
Initialized during runtime. */
+       private transient long watermarkInterval;
+
+       /**
+        * Reusable row for normal data runner inputs.
+        */
+       private transient Row resuableInput;
+
+       /**
+        * Reusable StreamRecord for data with new timestamp calculated in 
TimestampAssigner.
+        */
+       private transient StreamRecord<IN> reusableStreamRecord;
+

Review comment:
       This is missing the `emitProgressiveWatermarks` flag, which controls whether to 
emit intermediate watermarks or only one final watermark at the end of input.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to