MartijnVisser commented on code in PR #28629:
URL: https://github.com/apache/flink/pull/28629#discussion_r3520734042
##########
flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py:
##########
@@ -16,45 +16,60 @@
# limitations under the License.
################################################################################
+import json
+import sys
from typing import Any
-from pyflink.common import Duration
-from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common import Duration, Encoder, Row
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner,
WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer,
FlinkKafkaConsumer
-from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+from pyflink.datastream.connectors.file_system import (FileSink, FileSource,
RollingPolicy,
+ StreamFormat)
from pyflink.datastream.functions import KeyedProcessFunction
from functions import MyKeySelector
-def python_data_stream_example():
+def python_data_stream_example(input_path: str, output_path: str):
env = StreamExecutionEnvironment.get_execution_environment()
- # Set the parallelism to be one to make sure that all data including fired
timer and normal data
- # are processed by the same worker and the collected result would be in
order which is good for
- # assertion.
+ # Process everything on one worker so the timer behavior is deterministic
and the output
+ # lands in a single part file.
env.set_parallelism(1)
+ # No periodic watermarks: current_watermark() stays at Long.MIN_VALUE
until the bounded
+ # source emits MAX_WATERMARK at end of input, making the fired-timer
output deterministic.
+ env.get_config().set_auto_watermark_interval(0)
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount',
'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(),
Types.INT(),
Types.INT()])
- json_row_schema =
JsonRowDeserializationSchema.builder().type_info(type_info).build()
- kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
'pyflink-e2e-source'}
- kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
json_row_schema, kafka_props)
- kafka_producer = FlinkKafkaProducer("timer-stream-sink",
SimpleStringSchema(), kafka_props)
+ source =
FileSource.for_record_stream_format(StreamFormat.text_line_format(),
input_path) \
+ .process_static_file_set().build()
- watermark_strategy =
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5))\
- .with_timestamp_assigner(KafkaRowTimestampAssigner())
+ watermark_strategy =
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)) \
Review Comment:
Almost: with the interval above 0 the two strategies are not quite equivalent,
because the bounded-out-of-orderness generator emits watermarks periodically,
so on a slow read the watermark could advance mid-stream and change the
timestamps at which the timers are registered. But you are right that
`no_watermarks()` is the cleaner way to get exactly the interval-0 behavior: it
never emits a watermark during processing, the timestamp assigner still chains
onto it (needed for the `ctx.timestamp()` assertions), and the end-of-input
`MAX_WATERMARK` is forwarded by `TimestampsAndWatermarksOperator` regardless of
the strategy, so the timers still fire. Applied in fe0a7c4c337 and re-verified
locally (same 16 lines, identical across runs).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]