[
https://issues.apache.org/jira/browse/FLINK-39724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andrew Feng updated FLINK-39724:
--------------------------------
Description:
See the attached good.py and bad.py.
The only difference between them is that good.py passes the stream through an
identity map. That this step is needed is highly counter-intuitive and seems to
be undocumented. The v2.2 documentation shows:
{code:python}
schema = CsvSchema.builder() \
    .add_number_column('id', number_type=DataTypes.BIGINT()) \
    .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
    .set_column_separator(',') \
    .build()
source = FileSource.for_record_stream_format(
    CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
# the type of record will be
# Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
{code}
Here's bad.py for convenience:
{code:python}
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.table.types import DataTypes
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.common import WatermarkStrategy
env = StreamExecutionEnvironment.get_execution_environment()
input_path = "./input.csv"
schema = CsvSchema.builder() \
    .add_string_column("type") \
    .add_number_column("value", number_type=DataTypes.DOUBLE()) \
    .set_column_separator(',') \
    .build()
file_source = (
    FileSource
    .for_record_stream_format(CsvReaderFormat.for_schema(schema), input_path)
    .build()
)
ds = env.from_source(
    source=file_source,
    watermark_strategy=WatermarkStrategy.no_watermarks(),
    source_name="csv_source",
)
ds.get_type()  # this fails
# Traceback (most recent call last):
#   File "/home/andrew/Projects/bug_reports/flink/CsvReaderFormat/./main.py", line 29, in <module>
#     ds.get_type() # this fails
#     ^^^^^^^^^^^^^
#   File "/home/andrew/Projects/bug_reports/flink/CsvReaderFormat/.venv/lib/python3.12/site-packages/pyflink/datastream/data_stream.py", line 169, in get_type
#     return typeinfo._from_java_type(self._j_data_stream.getType())
#            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#   File "/home/andrew/Projects/bug_reports/flink/CsvReaderFormat/.venv/lib/python3.12/site-packages/pyflink/common/typeinfo.py", line 1118, in _from_java_type
#     raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)
# TypeError: The java type info: ROW<`type` STRING, `value` DOUBLE>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer) is not supported in PyFlink currently.
#
# Note: assign_timestamps_and_watermarks (data_stream.py:692) calls get_type(),
# so calling ds.assign_timestamps_and_watermarks(...) triggers the same error.
ds.print()
env.execute()
{code}
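For reference, the identity-map step that distinguishes good.py can be sketched roughly as below. good.py itself is only in the attachment and is not reproduced in this message, so the exact form of its map call is an assumption. The helper names through_identity_map and type_or_none are hypothetical and exist only for this illustration.

```python
# Sketch only: good.py is attached to the issue but not shown in this message,
# so the exact form of its identity map is an assumption.


def through_identity_map(stream, output_type=None):
    """Pass `stream` through a map that returns every record unchanged.

    `stream` is expected to be a pyflink DataStream, but any object with a
    compatible map(func, output_type=...) method works.
    """
    def identity(record):
        return record

    if output_type is None:
        return stream.map(identity)
    return stream.map(identity, output_type=output_type)


def type_or_none(stream):
    """Return stream.get_type(), or None if it raises the TypeError
    shown in the bad.py traceback."""
    try:
        return stream.get_type()
    except TypeError:
        return None
```

In bad.py this would amount to something like ds = through_identity_map(ds) right after env.from_source(...) and before any call to ds.get_type() or ds.assign_timestamps_and_watermarks(...). Whether an explicit output_type is needed is not stated in the report.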
> CsvReaderFormat produces a stream which fails get_type() and
> assign_timestamps_and_watermarks()
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-39724
> URL: https://issues.apache.org/jira/browse/FLINK-39724
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 2.2.0
> Reporter: Andrew Feng
> Priority: Major
> Attachments: CsvFormatReader.tar.gz