autophagy commented on code in PR #28629:
URL: https://github.com/apache/flink/pull/28629#discussion_r3520680940


##########
flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py:
##########
@@ -16,45 +16,60 @@
 # limitations under the License.
 
################################################################################
 
+import json
+import sys
 from typing import Any
 
-from pyflink.common import Duration
-from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common import Duration, Encoder, Row
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import TimestampAssigner, 
WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, 
FlinkKafkaConsumer
-from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+from pyflink.datastream.connectors.file_system import (FileSink, FileSource, 
RollingPolicy,
+                                                       StreamFormat)
 from pyflink.datastream.functions import KeyedProcessFunction
 
 from functions import MyKeySelector
 
 
-def python_data_stream_example():
+def python_data_stream_example(input_path: str, output_path: str):
     env = StreamExecutionEnvironment.get_execution_environment()
-    # Set the parallelism to be one to make sure that all data including fired 
timer and normal data
-    # are processed by the same worker and the collected result would be in 
order which is good for
-    # assertion.
+    # Process everything on one worker so the timer behavior is deterministic 
and the output
+    # lands in a single part file.
     env.set_parallelism(1)
+    # No periodic watermarks: current_watermark() stays at Long.MIN_VALUE 
until the bounded
+    # source emits MAX_WATERMARK at end of input, making the fired-timer 
output deterministic.
+    env.get_config().set_auto_watermark_interval(0)
 
     type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 
'payPlatform', 'provinceId'],
                                 [Types.LONG(), Types.LONG(), Types.DOUBLE(), 
Types.INT(),
                                  Types.INT()])
-    json_row_schema = 
JsonRowDeserializationSchema.builder().type_info(type_info).build()
-    kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 
'pyflink-e2e-source'}
 
-    kafka_consumer = FlinkKafkaConsumer("timer-stream-source", 
json_row_schema, kafka_props)
-    kafka_producer = FlinkKafkaProducer("timer-stream-sink", 
SimpleStringSchema(), kafka_props)
+    source = 
FileSource.for_record_stream_format(StreamFormat.text_line_format(), 
input_path) \
+        .process_static_file_set().build()
 
-    watermark_strategy = 
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5))\
-        .with_timestamp_assigner(KafkaRowTimestampAssigner())
+    watermark_strategy = 
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)) \

Review Comment:
   Does setting the auto watermark interval to 0 (above), combined with a 
watermark strategy that has a 5-second out-of-orderness bound, do the same 
thing as `WatermarkStrategy.no_watermarks()`? If so, that could be a 
simplification, and we wouldn't need to set the env config.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to