dianfu commented on a change in pull request #13986:
URL: https://github.com/apache/flink/pull/13986#discussion_r519516510



##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -601,6 +602,50 @@ def test_basic_array_type_info(self):
         expected.sort()
         self.assertEqual(expected, results)
 
+    def test_timestamp_assigner_and_watermark_strategy(self):
+        self.env.set_parallelism(1)
+        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+        data_stream = self.env.from_collection([(1, '1603708211000'),
+                                                (2, '1603708224000'),
+                                                (3, '1603708226000'),
+                                                (4, '1603708289000')],
+                                               
type_info=Types.ROW([Types.INT(), Types.STRING()]))
+
+        class MyTimestampAssigner(TimestampAssigner):
+
+            def extract_timestamp(self, value, previous) -> int:
+                return int(value[1])
+
+        class MyProcessFunction(ProcessFunction):
+
+            def process_element(self, value, ctx, out):
+                current_timestamp = ctx.timestamp()
+                current_watermark = ctx.timer_service().current_watermark()
+                out.collect("current timestamp: {}, current watermark: {}, 
current_value: {}"
+                            .format(str(current_timestamp), 
str(current_watermark), str(value)))
+
+            def on_timer(self, timestamp, ctx, out):
+                pass
+
+        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
+            .with_timestamp_assigner(MyTimestampAssigner())
+        data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
+            .key_by(lambda x: x[0], key_type_info=Types.INT()) \
+            .process(MyProcessFunction(), 
output_type=Types.STRING()).add_sink(self.test_sink)
+        self.env.execute('test time stamp assigner')
+        result = self.test_sink.get_results()
+        expeected_result = ["current timestamp: None, current watermark: 
9223372036854775807, "

Review comment:
       ```suggestion
           expected_result = ["current timestamp: None, current watermark: 
9223372036854775807, "
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks by user specify TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>

Review comment:
       What about renaming it to PythonTimestampsAndWatermarksOperator?

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks by user specify TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>
+       implements ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<IN> watermarkStrategy;
+
+       private final TypeInformation runnerInputTypeInfo;
+
+       private final TypeInformation runnerOutputTypeInfo;
+
+       private transient TypeSerializer runnerInputSerializer;
+
+       private transient TypeSerializer runnerOutputSerializer;
+
+       private transient WatermarkGenerator<IN> watermarkGenerator;
+
+       private transient WatermarkOutput watermarkOutput;
+
+       private transient long watermarkInterval;
+
+       private transient Row resuableInput;
+
+       private transient StreamRecord<IN> reusableStreamRecord;
+
+       public TimestampsAndWatermarksOperator(
+               Configuration config,
+               TypeInformation<IN> inputTypeInfo,
+               DataStreamPythonFunctionInfo pythonFunctionInfo,
+               WatermarkStrategy<IN> watermarkStrategy) {
+               super(config, inputTypeInfo, inputTypeInfo, pythonFunctionInfo);
+               this.watermarkStrategy = watermarkStrategy;
+               this.chainingStrategy = ChainingStrategy.ALWAYS;
+               this.runnerInputTypeInfo = Types.ROW(Types.LONG, inputTypeInfo);
+               this.runnerOutputTypeInfo = Types.ROW(Types.LONG, 
inputTypeInfo);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               runnerInputSerializer = 
PythonTypeUtils.TypeInfoToSerializerConverter
+                       .typeInfoSerializerConverter(runnerInputTypeInfo);
+               runnerOutputSerializer = 
PythonTypeUtils.TypeInfoToSerializerConverter
+                       .typeInfoSerializerConverter(runnerOutputTypeInfo);
+               resuableInput = new Row(2);
+               this.reusableStreamRecord = new StreamRecord<>(null);
+
+               watermarkGenerator = 
watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
+               watermarkOutput = new WatermarkEmitter(output,
+                       getContainingTask().getStreamStatusMaintainer());
+
+               watermarkInterval = 
getExecutionConfig().getAutoWatermarkInterval();
+               if (watermarkInterval > 0) {
+                       final long now = 
getProcessingTimeService().getCurrentProcessingTime();
+                       getProcessingTimeService().registerTimer(now + 
watermarkInterval, this);
+               }
+       }
+
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               IN value = element.getValue();
+               final long previousTimestamp = element.hasTimestamp() ?
+                       element.getTimestamp() : Long.MIN_VALUE;
+
+               resuableInput.setField(0, previousTimestamp);
+               resuableInput.setField(1, value);
+
+               runnerInputSerializer.serialize(resuableInput, baosWrapper);
+               pythonFunctionRunner.process(baos.toByteArray());
+               baos.reset();
+               elementCount++;
+               checkInvokeFinishBundleByCount();
+               emitResults();
+       }
+
+       @Override
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] rawResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(rawResult, 0, length);
+               Row runnerOutput = (Row) 
runnerOutputSerializer.deserialize(baisWrapper);
+               long newTimestamp = (Long) runnerOutput.getField(0);
+               IN originalData = (IN) runnerOutput.getField(1);
+               reusableStreamRecord.replace(originalData, newTimestamp);
+               output.collect(reusableStreamRecord);
+               watermarkGenerator.onEvent(originalData, newTimestamp, 
watermarkOutput);
+

Review comment:
       Unnecessary empty line.

##########
File path: flink-python/pyflink/common/time.py
##########
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+
+class Duration(object):

Review comment:
       Please export it from the `pyflink.common` package.

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks by user specify TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>
+       implements ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<IN> watermarkStrategy;
+
+       private final TypeInformation runnerInputTypeInfo;
+
+       private final TypeInformation runnerOutputTypeInfo;
+
+       private transient TypeSerializer runnerInputSerializer;
+
+       private transient TypeSerializer runnerOutputSerializer;
+
+       private transient WatermarkGenerator<IN> watermarkGenerator;
+
+       private transient WatermarkOutput watermarkOutput;
+
+       private transient long watermarkInterval;
+
+       private transient Row resuableInput;
+
+       private transient StreamRecord<IN> reusableStreamRecord;
+
+       public TimestampsAndWatermarksOperator(
+               Configuration config,
+               TypeInformation<IN> inputTypeInfo,
+               DataStreamPythonFunctionInfo pythonFunctionInfo,
+               WatermarkStrategy<IN> watermarkStrategy) {
+               super(config, inputTypeInfo, inputTypeInfo, pythonFunctionInfo);
+               this.watermarkStrategy = watermarkStrategy;
+               this.chainingStrategy = ChainingStrategy.ALWAYS;
+               this.runnerInputTypeInfo = Types.ROW(Types.LONG, inputTypeInfo);

Review comment:
       We could move the initialization of runnerInputTypeInfo and 
runnerOutputTypeInfo into the open method and then make both fields transient.

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks by user specify TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>
+       implements ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<IN> watermarkStrategy;

Review comment:
       Please add documentation for this field.

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks by user specify TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>
+       implements ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<IN> watermarkStrategy;
+
+       private final TypeInformation runnerInputTypeInfo;
+
+       private final TypeInformation runnerOutputTypeInfo;
+
+       private transient TypeSerializer runnerInputSerializer;
+
+       private transient TypeSerializer runnerOutputSerializer;
+
+       private transient WatermarkGenerator<IN> watermarkGenerator;
+
+       private transient WatermarkOutput watermarkOutput;
+
+       private transient long watermarkInterval;
+
+       private transient Row resuableInput;
+
+       private transient StreamRecord<IN> reusableStreamRecord;
+
+       public TimestampsAndWatermarksOperator(
+               Configuration config,
+               TypeInformation<IN> inputTypeInfo,
+               DataStreamPythonFunctionInfo pythonFunctionInfo,
+               WatermarkStrategy<IN> watermarkStrategy) {
+               super(config, inputTypeInfo, inputTypeInfo, pythonFunctionInfo);
+               this.watermarkStrategy = watermarkStrategy;
+               this.chainingStrategy = ChainingStrategy.ALWAYS;

Review comment:
       This could be removed, as it's already set in the superclass.

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks by user specify TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>
+       implements ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<IN> watermarkStrategy;
+
+       private final TypeInformation runnerInputTypeInfo;
+
+       private final TypeInformation runnerOutputTypeInfo;
+
+       private transient TypeSerializer runnerInputSerializer;
+
+       private transient TypeSerializer runnerOutputSerializer;
+
+       private transient WatermarkGenerator<IN> watermarkGenerator;
+
+       private transient WatermarkOutput watermarkOutput;
+
+       private transient long watermarkInterval;
+
+       private transient Row resuableInput;

Review comment:
       Typo
   ```suggestion
        private transient Row reusableInput;
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks by user specify TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>
+       implements ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<IN> watermarkStrategy;
+
+       private final TypeInformation runnerInputTypeInfo;
+
+       private final TypeInformation runnerOutputTypeInfo;
+
+       private transient TypeSerializer runnerInputSerializer;
+
+       private transient TypeSerializer runnerOutputSerializer;
+
+       private transient WatermarkGenerator<IN> watermarkGenerator;
+
+       private transient WatermarkOutput watermarkOutput;
+
+       private transient long watermarkInterval;
+
+       private transient Row resuableInput;
+
+       private transient StreamRecord<IN> reusableStreamRecord;
+
+       public TimestampsAndWatermarksOperator(
+               Configuration config,
+               TypeInformation<IN> inputTypeInfo,
+               DataStreamPythonFunctionInfo pythonFunctionInfo,
+               WatermarkStrategy<IN> watermarkStrategy) {
+               super(config, inputTypeInfo, inputTypeInfo, pythonFunctionInfo);
+               this.watermarkStrategy = watermarkStrategy;
+               this.chainingStrategy = ChainingStrategy.ALWAYS;
+               this.runnerInputTypeInfo = Types.ROW(Types.LONG, inputTypeInfo);
+               this.runnerOutputTypeInfo = Types.ROW(Types.LONG, 
inputTypeInfo);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               runnerInputSerializer = 
PythonTypeUtils.TypeInfoToSerializerConverter
+                       .typeInfoSerializerConverter(runnerInputTypeInfo);
+               runnerOutputSerializer = 
PythonTypeUtils.TypeInfoToSerializerConverter
+                       .typeInfoSerializerConverter(runnerOutputTypeInfo);
+               resuableInput = new Row(2);
+               this.reusableStreamRecord = new StreamRecord<>(null);

Review comment:
       ```suggestion
                reusableStreamRecord = new StreamRecord<>(null);
   ```
   Drop the `this.` prefix so this assignment matches the style of the other field assignments in `open()`.

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TimestampsAndWatermarksOperator.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+/**
+ * A stream operator that may do one or both of the following: extract 
timestamps from
+ * events and generate watermarks by user specify TimestampAssigner and 
WatermarkStrategy.
+ *
+ * <p>These two responsibilities run in the same operator rather than in two 
different ones,
+ * because the implementation of the timestamp assigner and the watermark 
generator is
+ * frequently in the same class (and should be run in the same instance), even 
though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <IN> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<IN> extends 
StatelessOneInputPythonFunctionOperator<IN, IN>
+       implements ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       private final WatermarkStrategy<IN> watermarkStrategy;
+
+       private final TypeInformation runnerInputTypeInfo;
+
+       private final TypeInformation runnerOutputTypeInfo;
+
+       private transient TypeSerializer runnerInputSerializer;
+
+       private transient TypeSerializer runnerOutputSerializer;
+
+       private transient WatermarkGenerator<IN> watermarkGenerator;
+
+       private transient WatermarkOutput watermarkOutput;
+
+       private transient long watermarkInterval;
+
+       private transient Row resuableInput;
+
+       private transient StreamRecord<IN> reusableStreamRecord;
+
+       public TimestampsAndWatermarksOperator(
+               Configuration config,
+               TypeInformation<IN> inputTypeInfo,
+               DataStreamPythonFunctionInfo pythonFunctionInfo,
+               WatermarkStrategy<IN> watermarkStrategy) {
+               super(config, inputTypeInfo, inputTypeInfo, pythonFunctionInfo);
+               this.watermarkStrategy = watermarkStrategy;
+               this.chainingStrategy = ChainingStrategy.ALWAYS;
+               this.runnerInputTypeInfo = Types.ROW(Types.LONG, inputTypeInfo);
+               this.runnerOutputTypeInfo = Types.ROW(Types.LONG, 
inputTypeInfo);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               runnerInputSerializer = 
PythonTypeUtils.TypeInfoToSerializerConverter
+                       .typeInfoSerializerConverter(runnerInputTypeInfo);
+               runnerOutputSerializer = 
PythonTypeUtils.TypeInfoToSerializerConverter
+                       .typeInfoSerializerConverter(runnerOutputTypeInfo);
+               resuableInput = new Row(2);
+               this.reusableStreamRecord = new StreamRecord<>(null);
+
+               watermarkGenerator = 
watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
+               watermarkOutput = new WatermarkEmitter(output,
+                       getContainingTask().getStreamStatusMaintainer());
+
+               watermarkInterval = 
getExecutionConfig().getAutoWatermarkInterval();
+               if (watermarkInterval > 0) {
+                       final long now = 
getProcessingTimeService().getCurrentProcessingTime();
+                       getProcessingTimeService().registerTimer(now + 
watermarkInterval, this);
+               }
+       }
+
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               IN value = element.getValue();
+               final long previousTimestamp = element.hasTimestamp() ?
+                       element.getTimestamp() : Long.MIN_VALUE;
+
+               resuableInput.setField(0, previousTimestamp);
+               resuableInput.setField(1, value);
+
+               runnerInputSerializer.serialize(resuableInput, baosWrapper);
+               pythonFunctionRunner.process(baos.toByteArray());
+               baos.reset();
+               elementCount++;
+               checkInvokeFinishBundleByCount();
+               emitResults();
+       }
+
+       @Override
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] rawResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(rawResult, 0, length);
+               Row runnerOutput = (Row) 
runnerOutputSerializer.deserialize(baisWrapper);
+               long newTimestamp = (Long) runnerOutput.getField(0);
+               IN originalData = (IN) runnerOutput.getField(1);
+               reusableStreamRecord.replace(originalData, newTimestamp);
+               output.collect(reusableStreamRecord);
+               watermarkGenerator.onEvent(originalData, newTimestamp, 
watermarkOutput);
+
+       }
+
+       @Override
+       public PythonFunctionRunner createPythonFunctionRunner() throws 
Exception {
+               return new BeamDataStreamPythonFunctionRunner(
+                       getRuntimeContext().getTaskName(),
+                       createPythonEnvironmentManager(),
+                       runnerInputTypeInfo,
+                       runnerOutputTypeInfo,
+                       DATA_STREAM_STATELESS_PYTHON_FUNCTION_URN,
+                       
PythonOperatorUtils.getUserDefinedDataStreamFunctionProto(pythonFunctionInfo, 
getRuntimeContext(), Collections.EMPTY_MAP),
+                       DATA_STREAM_MAP_FUNCTION_CODER_URN,
+                       jobOptions,
+                       getFlinkMetricContainer(),
+                       null,
+                       null,
+                       getContainingTask().getEnvironment().getMemoryManager(),
+                       
getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(
+                               ManagedMemoryUseCase.PYTHON,
+                               
getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
+                               
getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader())
+               );
+       }
+
+       @Override
+       public void onProcessingTime(long timestamp) {
+               watermarkGenerator.onPeriodicEmit(watermarkOutput);
+               final long now = 
getProcessingTimeService().getCurrentProcessingTime();
+               getProcessingTimeService().registerTimer(now + 
watermarkInterval, this);
+       }
+
+       @Override
+       public void 
processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
+               if (mark.getTimestamp() == Long.MAX_VALUE) {
+                       watermarkOutput.emitWatermark(Watermark.MAX_WATERMARK);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               watermarkGenerator.onPeriodicEmit(watermarkOutput);
+       }
+
+       private static final class WatermarkEmitter implements WatermarkOutput {

Review comment:
       Could we reuse the existing WatermarkEmitter class in
org.apache.flink.streaming.runtime.operators.WatermarkEmitter instead of defining a new one here?

##########
File path: flink-python/pyflink/common/watermark_strategy.py
##########
@@ -0,0 +1,112 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+
+from pyflink.common.time import Duration
+from pyflink.java_gateway import get_gateway
+
+
+class WatermarkStrategy(object):
+    """
+    The WatermarkStrategy defines how to generate Watermarks in the stream 
sources. The
+    WatermarkStrategy is a builder/factory for the WatermarkGenerator that 
generates the watermarks
+    and the TimestampAssigner which assigns the internal timestamp of a record.
+
+    The convenience methods, for example forBoundedOutOfOrderness(Duration), 
create a
+    WatermarkStrategy for common built in strategies.
+    """
+    def __init__(self, j_watermark_strategy):
+        self._j_watermark_strategy = j_watermark_strategy
+        self._timestamp_assigner = None
+
+    def with_timestamp_assigner(self, timestamp_assigner: 'TimestampAssigner'):

Review comment:
       Add a return type hint (`-> 'WatermarkStrategy'`), as `with_idleness` already has.

##########
File path: flink-python/pyflink/common/watermark_strategy.py
##########
@@ -0,0 +1,112 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+
+from pyflink.common.time import Duration
+from pyflink.java_gateway import get_gateway
+
+
+class WatermarkStrategy(object):

Review comment:
       Export it from the `pyflink.common` package so users can import it directly.

##########
File path: flink-python/pyflink/common/watermark_strategy.py
##########
@@ -0,0 +1,112 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+
+from pyflink.common.time import Duration
+from pyflink.java_gateway import get_gateway
+
+
+class WatermarkStrategy(object):
+    """
+    The WatermarkStrategy defines how to generate Watermarks in the stream 
sources. The
+    WatermarkStrategy is a builder/factory for the WatermarkGenerator that 
generates the watermarks
+    and the TimestampAssigner which assigns the internal timestamp of a record.
+
+    The convenience methods, for example forBoundedOutOfOrderness(Duration), 
create a
+    WatermarkStrategy for common built in strategies.
+    """
+    def __init__(self, j_watermark_strategy):
+        self._j_watermark_strategy = j_watermark_strategy
+        self._timestamp_assigner = None
+
+    def with_timestamp_assigner(self, timestamp_assigner: 'TimestampAssigner'):
+        """
+        Creates a new WatermarkStrategy that wraps this strategy but instead 
uses the given a
+        TimestampAssigner by implementing TimestampAssigner interface.
+        ::
+            >>> watermark_strategy = 
WatermarkStrategy.for_monotonous_timestamps()
+            >>>   .with_timestamp_assigner(MyTimestampAssigner())
+
+        :param timestamp_assigner: The given TimestampAssigner.
+        :return: A WaterMarkStrategy that wraps a TimestampAssigner.
+        """
+        self._timestamp_assigner = timestamp_assigner
+        return self
+
+    def with_idleness(self, idle_timeout: Duration) -> 'WatermarkStrategy':
+        """
+        Creates a new enriched WatermarkStrategy that also does idleness 
detection in the created
+        WatermarkGenerator.
+
+        :param idle_timeout: The idle timeout.
+        :return: A new WatermarkStrategy with idle detection configured.
+        """
+        return 
WatermarkStrategy(self._j_watermark_strategy.withIdleness(idle_timeout._j_duration))

Review comment:
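       `self._timestamp_assigner` is not carried over to the new WatermarkStrategy returned here, so a previously set timestamp assigner is lost.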

##########
File path: flink-python/pyflink/common/watermark_strategy.py
##########
@@ -0,0 +1,112 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+
+from pyflink.common.time import Duration
+from pyflink.java_gateway import get_gateway
+
+
+class WatermarkStrategy(object):
+    """
+    The WatermarkStrategy defines how to generate Watermarks in the stream 
sources. The
+    WatermarkStrategy is a builder/factory for the WatermarkGenerator that 
generates the watermarks
+    and the TimestampAssigner which assigns the internal timestamp of a record.
+
+    The convenience methods, for example forBoundedOutOfOrderness(Duration), 
create a
+    WatermarkStrategy for common built in strategies.
+    """
+    def __init__(self, j_watermark_strategy):
+        self._j_watermark_strategy = j_watermark_strategy
+        self._timestamp_assigner = None
+
+    def with_timestamp_assigner(self, timestamp_assigner: 'TimestampAssigner'):
+        """
+        Creates a new WatermarkStrategy that wraps this strategy but instead 
uses the given a
+        TimestampAssigner by implementing TimestampAssigner interface.
+        ::
+            >>> watermark_strategy = 
WatermarkStrategy.for_monotonous_timestamps()
+            >>>   .with_timestamp_assigner(MyTimestampAssigner())
+
+        :param timestamp_assigner: The given TimestampAssigner.
+        :return: A WaterMarkStrategy that wraps a TimestampAssigner.
+        """
+        self._timestamp_assigner = timestamp_assigner
+        return self
+
+    def with_idleness(self, idle_timeout: Duration) -> 'WatermarkStrategy':
+        """
+        Creates a new enriched WatermarkStrategy that also does idleness 
detection in the created
+        WatermarkGenerator.
+
+        :param idle_timeout: The idle timeout.
+        :return: A new WatermarkStrategy with idle detection configured.
+        """
+        return 
WatermarkStrategy(self._j_watermark_strategy.withIdleness(idle_timeout._j_duration))
+
+    @staticmethod
+    def for_monotonous_timestamps():
+        """
+        Creates a watermark strategy for situations with monotonously 
ascending timestamps.
+
+        The watermarks are generated periodically and tightly follow the 
latest timestamp in the
+        data. The delay introduced by this strategy is mainly the periodic 
interval in which the
+        watermarks are generated.
+        """
+        JWaterMarkStrategy = get_gateway().jvm\
+            .org.apache.flink.api.common.eventtime.WatermarkStrategy
+        return WatermarkStrategy(JWaterMarkStrategy.forMonotonousTimestamps())
+
+    @staticmethod
+    def for_bounded_out_of_orderness(max_out_of_orderness: Duration):
+        """
+        Creates a watermark strategy for situations where records are out of 
order, but you can
+        place an upper bound on how far the events are out of order. An 
out-of-order bound B means
+        that once the an event with timestamp T was encountered, no events 
older than (T - B) will
+        follow any more.
+        """
+        JWaterMarkStrategy = get_gateway().jvm \
+            .org.apache.flink.api.common.eventtime.WatermarkStrategy
+        return WatermarkStrategy(
+            
JWaterMarkStrategy.forBoundedOutOfOrderness(max_out_of_orderness._j_duration))
+
+
+class TimestampAssigner(abc.ABC):
+    """
+    A TimestampAssigner assigns event time timestamps to elements. These 
timestamps are used by all
+    functions that operate on event time, for example event time windows.
+
+    Timestamps can be an arbitrary int value, but all built-in implementations 
represent it as the
+    milliseconds since the Epoch (midnight, January 1, 1970 UTC), the same way 
as time.time() does
+    it.
+    """
+    @abc.abstractmethod
+    def extract_timestamp(self, value, previous) -> int:

Review comment:
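       Add type hints for the `value` and `previous` parameters; `previous` is the previous timestamp and should be annotated as `int`.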




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to