autophagy commented on code in PR #28629:
URL: https://github.com/apache/flink/pull/28629#discussion_r3520680940
##########
flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py:
##########
@@ -16,45 +16,60 @@
# limitations under the License.
################################################################################
+import json
+import sys
from typing import Any
-from pyflink.common import Duration
-from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common import Duration, Encoder, Row
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner,
WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer,
FlinkKafkaConsumer
-from pyflink.datastream.formats.json import JsonRowDeserializationSchema
+from pyflink.datastream.connectors.file_system import (FileSink, FileSource,
RollingPolicy,
+ StreamFormat)
from pyflink.datastream.functions import KeyedProcessFunction
from functions import MyKeySelector
-def python_data_stream_example():
+def python_data_stream_example(input_path: str, output_path: str):
env = StreamExecutionEnvironment.get_execution_environment()
- # Set the parallelism to be one to make sure that all data including fired
timer and normal data
- # are processed by the same worker and the collected result would be in
order which is good for
- # assertion.
+ # Process everything on one worker so the timer behavior is deterministic
and the output
+ # lands in a single part file.
env.set_parallelism(1)
+ # No periodic watermarks: current_watermark() stays at Long.MIN_VALUE
until the bounded
+ # source emits MAX_WATERMARK at end of input, making the fired-timer
output deterministic.
+ env.get_config().set_auto_watermark_interval(0)
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount',
'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(),
Types.INT(),
Types.INT()])
- json_row_schema =
JsonRowDeserializationSchema.builder().type_info(type_info).build()
- kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
'pyflink-e2e-source'}
- kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
json_row_schema, kafka_props)
- kafka_producer = FlinkKafkaProducer("timer-stream-sink",
SimpleStringSchema(), kafka_props)
+ source =
FileSource.for_record_stream_format(StreamFormat.text_line_format(),
input_path) \
+ .process_static_file_set().build()
- watermark_strategy =
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5))\
- .with_timestamp_assigner(KafkaRowTimestampAssigner())
+ watermark_strategy =
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)) \
Review Comment:
Does setting the auto-watermark interval to 0 while using a watermark strategy with 5-second
bounded out-of-orderness behave the same as `WatermarkStrategy.no_watermarks()`? If so, that
could be a simplification, and we wouldn't need to set the env config at all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]