Joekwal created FLINK-32040:
-------------------------------
Summary: The WatermarkStrategy defined with the function with_idleness() reports an error
Key: FLINK-32040
URL: https://issues.apache.org/jira/browse/FLINK-32040
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Joekwal
Version: upgraded from PyFlink 1.15.2 to PyFlink 1.16.1
Reported error:
Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). Is the time
characteristic set to 'ProcessingTime', or did you forget to call
'data_stream.assign_timestamps_and_watermarks(...)'?
The same application never reported this error on version 1.15.2.
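For context, this message comes from the event-time window assigner when a record reaches it without a timestamp. The following is a simplified Python model of that check, not PyFlink's actual code. It assumes the tumbling-window start is computed as `timestamp - (timestamp - offset) % size`, mirroring Flink's `TimeWindow.getWindowStartWithOffset`. The names `assign_tumbling_window` and `MIN_LONG` are hypothetical.

```python
# Hypothetical model of an event-time tumbling window assigner; not PyFlink code.
MIN_LONG = -(2 ** 63)  # Java Long.MIN_VALUE, used as the "no timestamp" marker

DAY_MS = 24 * 60 * 60 * 1000
HOUR_MS = 60 * 60 * 1000


def assign_tumbling_window(timestamp: int, size: int, offset: int = 0) -> tuple:
    """Return (start, end) of the tumbling window that contains `timestamp`."""
    if timestamp <= MIN_LONG:
        # A record whose timestamp was never assigned still carries Long.MIN_VALUE.
        raise RuntimeError(
            "Record has Java Long.MIN_VALUE timestamp (= no timestamp marker).")
    # Python's % is non-negative for a positive divisor, so no sign fix-up is needed.
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# Daily windows shifted by -8 hours, as in the job (Time.days(1), Time.hours(-8)).
print(assign_tumbling_window(0, DAY_MS, -8 * HOUR_MS))  # (-28800000, 57600000)
```

A record stamped at epoch 0 lands in the window starting at -28,800,000 ms, which is midnight in UTC+8. A record that still carries `MIN_LONG` fails before any window is computed, which is consistent with the reported symptom if the timestamp assigner never runs.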
Example 1 (reports the error):
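{code:python}
from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream.window import TumblingEventTimeWindows


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp: int) -> int:
        # 'version' holds the event time in epoch milliseconds
        return value['version']


# st_env, CommonKeySelector, WindowFunction and typeInfo are defined elsewhere.
# 'columns' stands for the actual column list.
sql = """
select columns, version from kafka_source
"""
table = st_env.sql_query(sql)
stream = st_env.to_changelog_stream(table)
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
    .with_timestamp_assigner(MyTimestampAssigner())
    .with_idleness(Duration.of_seconds(10)))
stream = stream.key_by(CommonKeySelector()) \
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
    .process(WindowFunction(), typeInfo)
{code}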
While debugging pyflink.datastream.data_stream.DataStream.assign_timestamps_and_watermarks, I found that watermark_strategy._timestamp_assigner is None.
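One way the assigner could end up as None is if `with_idleness()` builds a new strategy object from the wrapped Java strategy, while the timestamp assigner is stored only as a Python attribute on the old object. The sketch below models that pattern with a hypothetical `SketchWatermarkStrategy` class (Java objects replaced by strings). It is an assumption about the cause, not PyFlink's actual source.

```python
class SketchWatermarkStrategy:
    """Hypothetical stand-in for a Python wrapper around a Java WatermarkStrategy."""

    def __init__(self, j_strategy: str):
        self._j_strategy = j_strategy      # stands in for the wrapped Java object
        self._timestamp_assigner = None    # Python-side state only

    def with_timestamp_assigner(self, assigner):
        # Mutates this instance and returns it.
        self._timestamp_assigner = assigner
        return self

    def with_idleness(self, idle_timeout_ms: int):
        # Builds a NEW wrapper from the Java side only; the Python-side
        # assigner set earlier is not copied, so it is lost.
        return SketchWatermarkStrategy(
            f"{self._j_strategy}.withIdleness({idle_timeout_ms})")


assigner = object()  # placeholder for MyTimestampAssigner()

# Order used in Example 1: assigner first, then idleness.
lost = (SketchWatermarkStrategy("forBoundedOutOfOrderness")
        .with_timestamp_assigner(assigner)
        .with_idleness(10_000))
print(lost._timestamp_assigner)  # None

# Reversed order: idleness first, then assigner.
kept = (SketchWatermarkStrategy("forBoundedOutOfOrderness")
        .with_idleness(10_000)
        .with_timestamp_assigner(assigner))
print(kept._timestamp_assigner is assigner)  # True
```

If the real code follows this pattern, calling `with_idleness()` before `with_timestamp_assigner()` might keep both settings, but that ordering has not been verified against PyFlink 1.16.1.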
Workaround: removing the with_idleness(Duration.of_seconds(10)) call avoids the error:
{code:python}
stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
    .with_timestamp_assigner(MyTimestampAssigner()))
{code}
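Rather than dropping idleness, a fix could carry the Python-side assigner over to the new strategy object. A minimal sketch of that idea follows, again using a hypothetical `SketchWatermarkStrategy` class (Java objects replaced by strings) rather than PyFlink's real code.

```python
class SketchWatermarkStrategy:
    """Hypothetical wrapper that preserves Python-side state across builder calls."""

    def __init__(self, j_strategy: str, timestamp_assigner=None):
        self._j_strategy = j_strategy              # stands in for the Java object
        self._timestamp_assigner = timestamp_assigner

    def with_timestamp_assigner(self, assigner):
        self._timestamp_assigner = assigner
        return self

    def with_idleness(self, idle_timeout_ms: int):
        # Copy the assigner into the new wrapper so call order no longer matters.
        return SketchWatermarkStrategy(
            f"{self._j_strategy}.withIdleness({idle_timeout_ms})",
            self._timestamp_assigner,
        )


assigner = object()  # placeholder for MyTimestampAssigner()
strategy = (SketchWatermarkStrategy("forBoundedOutOfOrderness")
            .with_timestamp_assigner(assigner)
            .with_idleness(10_000))
print(strategy._timestamp_assigner is assigner)  # True
```

With this approach, the chain from Example 1 would keep both the timestamp assigner and the idle timeout, so removing with_idleness() would no longer be necessary.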
Is this a bug?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)