MartijnVisser commented on code in PR #28629:
URL: https://github.com/apache/flink/pull/28629#discussion_r3520734042


##########
flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py:
##########
@@ -16,45 +16,60 @@
 # limitations under the License.
 
################################################################################
 
+import json
+import sys
 from typing import Any
 
-from pyflink.common import Duration
-from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common import Duration, Encoder, Row
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import TimestampAssigner, 
WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, 
FlinkKafkaConsumer
-from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+from pyflink.datastream.connectors.file_system import (FileSink, FileSource, 
RollingPolicy,
+                                                       StreamFormat)
 from pyflink.datastream.functions import KeyedProcessFunction
 
 from functions import MyKeySelector
 
 
-def python_data_stream_example():
+def python_data_stream_example(input_path: str, output_path: str):
     env = StreamExecutionEnvironment.get_execution_environment()
-    # Set the parallelism to be one to make sure that all data including fired 
timer and normal data
-    # are processed by the same worker and the collected result would be in 
order which is good for
-    # assertion.
+    # Process everything on one worker so the timer behavior is deterministic 
and the output
+    # lands in a single part file.
     env.set_parallelism(1)
+    # No periodic watermarks: current_watermark() stays at Long.MIN_VALUE 
until the bounded
+    # source emits MAX_WATERMARK at end of input, making the fired-timer 
output deterministic.
+    env.get_config().set_auto_watermark_interval(0)
 
     type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 
'payPlatform', 'provinceId'],
                                 [Types.LONG(), Types.LONG(), Types.DOUBLE(), 
Types.INT(),
                                  Types.INT()])
-    json_row_schema = 
JsonRowDeserializationSchema.builder().type_info(type_info).build()
-    kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 
'pyflink-e2e-source'}
 
-    kafka_consumer = FlinkKafkaConsumer("timer-stream-source", 
json_row_schema, kafka_props)
-    kafka_producer = FlinkKafkaProducer("timer-stream-sink", 
SimpleStringSchema(), kafka_props)
+    source = 
FileSource.for_record_stream_format(StreamFormat.text_line_format(), 
input_path) \
+        .process_static_file_set().build()
 
-    watermark_strategy = 
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5))\
-        .with_timestamp_assigner(KafkaRowTimestampAssigner())
+    watermark_strategy = 
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)) \

Review Comment:
   Almost: with an auto-watermark interval above 0 they are not quite 
equivalent, because the bounded-out-of-orderness generator emits periodically, 
so on a slow read the watermark could advance mid-stream and change the 
timestamps at which the timers are registered. But you are right that 
`no_watermarks()` is the cleaner way to get exactly the interval-0 behavior: it 
never emits during processing, the timestamp assigner still chains onto it 
(needed for the `ctx.timestamp()` assertions), and the end-of-input 
`MAX_WATERMARK` is forwarded by `TimestampsAndWatermarksOperator` regardless of 
the strategy, so the timers still fire. Applied in fe0a7c4c337 and re-verified 
locally (same 16 lines, identical across runs).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to