[ 
https://issues.apache.org/jira/browse/FLINK-39725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18091233#comment-18091233
 ] 

Andrew Feng commented on FLINK-39725:
-------------------------------------

1. I believe it is a bug, which is why I reported it. Someone more 
experienced with PyFlink should probably weigh in, though.
2. I'm not sure what the difference is here. I assume the fix is to make 
from_source apply the timestamp assigner the way 
assign_timestamps_and_watermarks already does?
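
Until that is settled, a possible workaround is to pass no watermark strategy to from_source and attach the real strategy afterwards with assign_timestamps_and_watermarks. The following is a minimal sketch only and has not been verified against 2.2.0. The names to_epoch_millis and build_job are hypothetical helpers introduced for illustration, and parsing the timestamp as UTC is an assumption (the original snippet uses the local timezone).

```python
from datetime import datetime, timezone


def to_epoch_millis(date_time: str) -> int:
    """Parse 'YYYY-MM-DD HH:MM:SS' and return epoch milliseconds.

    Assumption: the CSV timestamps are in UTC.
    """
    parsed = datetime.strptime(date_time, "%Y-%m-%d %H:%M:%S")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def build_job(input_path: str):
    """Hypothetical helper: build the job with the strategy applied after from_source."""
    # PyFlink is imported lazily so to_epoch_millis can be used without it.
    from pyflink.common import Duration, WatermarkStrategy
    from pyflink.common.watermark_strategy import TimestampAssigner
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSource
    from pyflink.datastream.formats.csv import CsvReaderFormat, CsvSchema
    from pyflink.table.types import DataTypes

    class DateTimeAssigner(TimestampAssigner):
        def extract_timestamp(self, value, record_timestamp: int) -> int:
            # Assumes records expose the "date_time" column as an attribute,
            # as in the reported main.py.
            return to_epoch_millis(value.date_time)

    env = StreamExecutionEnvironment.get_execution_environment()

    # Same schema as in the issue description.
    schema = (
        CsvSchema.builder()
        .add_string_column("date_time")
        .add_string_column("type")
        .add_number_column("value", number_type=DataTypes.DOUBLE())
        .build()
    )
    file_source = (
        FileSource
        .for_record_stream_format(CsvReaderFormat.for_schema(schema), input_path)
        .build()
    )

    # Step 1: create the stream without timestamps or watermarks.
    ds = env.from_source(file_source, WatermarkStrategy.no_watermarks(), "csv_source")

    # Step 2: attach the strategy on the DataStream instead of on the source.
    strategy = (
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(2))
        .with_timestamp_assigner(DateTimeAssigner())
    )
    return env, ds.assign_timestamps_and_watermarks(strategy)
```

With this sketch, `env, ds = build_job("./input.csv")` followed by `ds.print()` and `env.execute()` would mirror the original main.py, with the assigner attached after the source rather than inside from_source.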

> from_source does not use timestamp assigner in pyflink
> ------------------------------------------------------
>
>                 Key: FLINK-39725
>                 URL: https://issues.apache.org/jira/browse/FLINK-39725
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 2.2.0
>            Reporter: Andrew Feng
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: from_source.tar.gz
>
>
> See main.py:
>  
> {code:python}
> from datetime import datetime
> from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors.file_system import FileSource
> from pyflink.common import WatermarkStrategy
> from pyflink.common import Row, Duration
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.table.types import DataTypes
> env = StreamExecutionEnvironment.get_execution_environment()
> input_path = "./input.csv"
> class DateTimeAssigner(TimestampAssigner):
>     def extract_timestamp(self, value: Row, record_timestamp: int) -> int:
>         exit(99) # this line does not execute
>         return int(datetime.strptime(value.date_time, "%Y-%m-%d %H:%M:%S").timestamp() * 1000)  # epoch millis
>     
> schema = CsvSchema.builder() \
>     .add_string_column("date_time") \
>     .add_string_column("type") \
>     .add_number_column("value", number_type=DataTypes.DOUBLE()) \
>     .build()
> file_source = (
>     FileSource
>     .for_record_stream_format(CsvReaderFormat.for_schema(schema), input_path)
>     .build()
> )
> ds = env.from_source(
>     file_source,
>     WatermarkStrategy
>     .for_bounded_out_of_orderness(Duration.of_seconds(2))
>     .with_timestamp_assigner(DateTimeAssigner()),
>     "csv_source")
> ds.print()
> env.execute()
> {code}



