[ 
https://issues.apache.org/jira/browse/FLINK-39725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18090095#comment-18090095
 ] 

Swati Gupta edited comment on FLINK-39725 at 6/19/26 7:19 AM:
--------------------------------------------------------------

Hey, I spent some time on this before jumping to a PR.

I ran the attached main.py and swapped exit(99) for a quick file-write so I 
could actually see whether extract_timestamp runs. It doesn't: the job runs 
fine and prints everything, but the marker file never shows up. I tried 
swapping FileSource for NumberSequenceSource too and the same thing happens, 
so it's not just a CSV/FileSource quirk.
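
A minimal sketch of that kind of file-write probe, for anyone who wants to 
repeat the check. The class name MarkerAssigner and the marker path are made 
up for illustration and are not taken from the attachment; the import 
fallback is only there so the class can be tried without pyflink installed.

```python
import os
import tempfile

try:
    from pyflink.common.watermark_strategy import TimestampAssigner
except ImportError:
    # Assumption: fallback base class so the sketch runs without pyflink.
    TimestampAssigner = object

# Hypothetical marker location. An absolute path avoids depending on the
# working directory of whichever process ends up running the assigner.
MARKER_PATH = os.path.join(tempfile.gettempdir(), "extract_timestamp_called.marker")


class MarkerAssigner(TimestampAssigner):
    """Leaves a marker file behind instead of calling exit(99)."""

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        # Append one line per call so the file also shows how often it ran.
        with open(MARKER_PATH, "a") as marker:
            marker.write(f"{value!r}\n")
        # Placeholder: the real job would parse value.date_time here.
        return record_timestamp
```

If the assigner is ever invoked, the marker file exists after the job 
finishes; if it is silently dropped, the file is never created.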

As a sanity check I tried the older style (from_collection + 
assign_timestamps_and_watermarks) and there the assigner works fine.

Digging into the pyflink code, it looks like with_timestamp_assigner() just 
stores your assigner on the Python object and never actually passes it down to 
the Java side. from_source() only forwards the Java watermark strategy, so it 
never even looks for the Python-side assigner. 
assign_timestamps_and_watermarks() does check for it though, which is why that 
path works.
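
To make the difference between the two paths concrete, here is a toy model in 
plain Python. It is not the pyflink source: every class, function, and 
attribute name below is hypothetical and only mirrors the behavior described 
above, as far as I understand it.

```python
class FakeJavaStrategy:
    """Stand-in for the Java-side watermark strategy handle."""

    def __init__(self):
        # Nothing on the Python side ever sets this in the toy model.
        self.timestamp_assigner = None


class ToyWatermarkStrategy:
    def __init__(self):
        self.j_strategy = FakeJavaStrategy()
        self.py_assigner = None

    def with_timestamp_assigner(self, assigner):
        # The assigner is stored on the Python object only.
        self.py_assigner = assigner
        return self


def toy_from_source(strategy, records):
    # Forwards only the Java-side strategy; py_assigner is never consulted.
    j_strategy = strategy.j_strategy
    assign = j_strategy.timestamp_assigner
    return [(r, assign(r) if assign else None) for r in records]


def toy_assign_timestamps_and_watermarks(strategy, records):
    # Checks the Python-side assigner and applies it when present.
    assign = strategy.py_assigner
    return [(r, assign(r) if assign else None) for r in records]


strategy = ToyWatermarkStrategy().with_timestamp_assigner(lambda r: r * 1000)
from_source_result = toy_from_source(strategy, [1, 2])
older_style_result = toy_assign_timestamps_and_watermarks(strategy, [1, 2])
```

In this model from_source_result carries no timestamps while 
older_style_result does, which matches what I saw with the marker file.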

Before I go any further, a couple of things I wanted to ask:
1. Is this expected behavior for some reason I'm not seeing, or does it 
genuinely look like a bug to you too?
2. If it's a bug, any preference on how you'd want it fixed: handled inside 
from_source() directly, or by reusing the same logic 
assign_timestamps_and_watermarks() already has?


> from_source does not use timestamp assigner in pyflink
> ------------------------------------------------------
>
>                 Key: FLINK-39725
>                 URL: https://issues.apache.org/jira/browse/FLINK-39725
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 2.2.0
>            Reporter: Andrew Feng
>            Priority: Major
>         Attachments: from_source.tar.gz
>
>
> See main.py:
>  
> {code:python}
> from datetime import datetime
> from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors.file_system import FileSource
> from pyflink.common import WatermarkStrategy
> from pyflink.common import Row, Duration
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.table.types import DataTypes
> env = StreamExecutionEnvironment.get_execution_environment()
> input_path = "./input.csv"
> class DateTimeAssigner(TimestampAssigner):
>     def extract_timestamp(self, value: Row, record_timestamp: int) -> int:
>         exit(99) # this line does not execute
>         return int(datetime.strptime(value.date_time, "%Y-%m-%d %H:%M:%S").timestamp() * 1000)  # epoch millis
>     
> schema = CsvSchema.builder() \
>     .add_string_column("date_time") \
>     .add_string_column("type") \
>     .add_number_column("value", number_type=DataTypes.DOUBLE()) \
>     .build()
> file_source = (
>     FileSource
>     .for_record_stream_format(CsvReaderFormat.for_schema(schema), input_path)
>     .build()
> )
> ds = env.from_source(
>     file_source,
>     WatermarkStrategy
>     .for_bounded_out_of_orderness(Duration.of_seconds(2))
>     .with_timestamp_assigner(DateTimeAssigner()),
>     "csv_source")
> ds.print()
> env.execute()
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
