Andrew Feng created FLINK-39725:
-----------------------------------
Summary: from_source does not use timestamp assigner in pyflink
Key: FLINK-39725
URL: https://issues.apache.org/jira/browse/FLINK-39725
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 2.2.0
Reporter: Andrew Feng
Attachments: from_source.tar.gz
See main.py:
{code:python}
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.common import WatermarkStrategy
from pyflink.common import Row, Duration
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.table.types import DataTypes
# Path of the CSV file read by the file source below (relative to the working directory).
input_path = "./input.csv"
# Execution environment on which the source is registered and the job is executed.
env = StreamExecutionEnvironment.get_execution_environment()
class DateTimeAssigner(TimestampAssigner):
    """Timestamp assigner that derives event time from a row's ``date_time`` field."""

    def extract_timestamp(self, value: Row, record_timestamp: int) -> int:
        """Return the event timestamp of ``value`` in epoch milliseconds.

        :param value: The record; its ``date_time`` attribute is parsed with
            the format ``%Y-%m-%d %H:%M:%S``.
        :param record_timestamp: The timestamp previously attached to the
            record; not used here.
        :return: The parsed ``date_time`` as integer epoch milliseconds.
        """
        # Imported locally because the script's import section does not bring
        # ``datetime`` into scope.
        from datetime import datetime

        # Deliberate probe kept from the bug report: the process would exit
        # with status 99 if this method were ever invoked.
        exit(99)  # this line does not execute
        # NOTE(review): strptime yields a naive datetime, so timestamp()
        # interprets it in the local time zone -- confirm this is intended.
        parsed = datetime.strptime(value.date_time, "%Y-%m-%d %H:%M:%S")
        return int(parsed.timestamp() * 1000)  # epoch millis
# CSV layout: two string columns followed by one DOUBLE number column.
schema = (
    CsvSchema.builder()
    .add_string_column("date_time")
    .add_string_column("type")
    .add_number_column("value", number_type=DataTypes.DOUBLE())
    .build()
)

# File source that reads ``input_path`` using the CSV schema above.
csv_format = CsvReaderFormat.for_schema(schema)
file_source = FileSource.for_record_stream_format(csv_format, input_path).build()

# Bounded out-of-orderness of 2 seconds, with the custom timestamp assigner
# attached; the bug report states the assigner is never invoked by from_source.
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(2))
    .with_timestamp_assigner(DateTimeAssigner())
)

ds = env.from_source(file_source, watermark_strategy, "csv_source")
ds.print()
env.execute()
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)