This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4df6a398bbe [FLINK-29461][python] Make the test_process_function more 
stable
4df6a398bbe is described below

commit 4df6a398bbe2a9de7c23977176789e54cc0848fa
Author: huangxingbo <h...@apache.org>
AuthorDate: Mon Dec 12 19:33:21 2022 +0800

    [FLINK-29461][python] Make the test_process_function more stable
    
    This closes #21491.
---
 .../pyflink/datastream/tests/test_data_stream.py    | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py 
b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 009c8414ee1..6013c9cf0d8 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -980,9 +980,8 @@ class ProcessDataStreamTests(DataStreamTests):
 
             def process_element(self, value, ctx):
                 current_timestamp = ctx.timestamp()
-                current_watermark = ctx.timer_service().current_watermark()
-                yield "current timestamp: {}, current watermark: {}, 
current_value: {}"\
-                    .format(str(current_timestamp), str(current_watermark), 
str(value))
+                yield "current timestamp: {}, current_value: {}"\
+                    .format(str(current_timestamp), str(value))
 
         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
             .with_timestamp_assigner(SecondColumnTimestampAssigner())
@@ -990,14 +989,14 @@ class ProcessDataStreamTests(DataStreamTests):
             .process(MyProcessFunction(), 
output_type=Types.STRING()).add_sink(self.test_sink)
         self.env.execute('test process function')
         results = self.test_sink.get_results()
-        expected = ["current timestamp: 1603708211000, current watermark: "
-                    "-9223372036854775808, current_value: Row(f0=1, 
f1='1603708211000')",
-                    "current timestamp: 1603708224000, current watermark: "
-                    "-9223372036854775808, current_value: Row(f0=2, 
f1='1603708224000')",
-                    "current timestamp: 1603708226000, current watermark: "
-                    "-9223372036854775808, current_value: Row(f0=3, 
f1='1603708226000')",
-                    "current timestamp: 1603708289000, current watermark: "
-                    "-9223372036854775808, current_value: Row(f0=4, 
f1='1603708289000')"]
+        expected = ["current timestamp: 1603708211000, "
+                    "current_value: Row(f0=1, f1='1603708211000')",
+                    "current timestamp: 1603708224000, "
+                    "current_value: Row(f0=2, f1='1603708224000')",
+                    "current timestamp: 1603708226000, "
+                    "current_value: Row(f0=3, f1='1603708226000')",
+                    "current timestamp: 1603708289000, "
+                    "current_value: Row(f0=4, f1='1603708289000')"]
         self.assert_equals_sorted(expected, results)
 
     def test_process_side_output(self):

Reply via email to