Mate Czagany created FLINK-30899:
------------------------------------
Summary: FileSystemTableSource with CSV format incorrectly selects
fields if filtering for partition
Key: FLINK-30899
URL: https://issues.apache.org/jira/browse/FLINK-30899
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.17.0
Reporter: Mate Czagany
In my testing it only affected csv and testcsv formats.
I think it's caused by `FileSystemTableSource` calling
`DeserializationFormatFactory#createRuntimeDecoder` with the wrong
`physicalDataType`. The files do not contain the partition field values, but
after a projection pushdown (which can happen during the planning phase if we
filter the partition field by a constant value) the final `physicalDataType`
passed to the deserializer by `FileSystemTableSource` contains the partition
fields as well. As described in `DecodingFormat`, every field in the
`physicalDataType` parameter has to be present in the serialized record.
Example:
{code:java}
CREATE TABLE test_table (
f0 INT,
f1 INT,
f2 INT,
f3 INT
) PARTITIONED BY (f0,f1) WITH (
'connector' = 'filesystem',
'path' = 'file:///path/to/whatever',
'format' = 'csv'
);
SELECT * FROM test_table WHERE f0 = 1;
-- !!!! should be 1,4,7,10 !!!!
+-------------+-------------+-------------+-------------+
| f0 | f1 | f2 | f3 |
+-------------+-------------+-------------+-------------+
| 1 | 4 | 10 | 0 |
+-------------+-------------+-------------+-------------+
SELECT * FROM test_table;
+-------------+-------------+-------------+-------------+
| f0 | f1 | f2 | f3 |
+-------------+-------------+-------------+-------------+
| 2 | 5 | 8 | 11 |
| 1 | 4 | 7 | 10 |
| 3 | 6 | 9 | 12 |
+-------------+-------------+-------------+-------------+
SELECT * FROM test_table WHERE f0>0;
+-------------+-------------+-------------+-------------+
| f0 | f1 | f2 | f3 |
+-------------+-------------+-------------+-------------+
| 1 | 4 | 7 | 10 |
| 3 | 6 | 9 | 12 |
| 2 | 5 | 8 | 11 |
+-------------+-------------+-------------+-------------+
SELECT * FROM test_table WHERE f0 = 1 AND f1 = 4;
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4
at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:49)
at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:27)
at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:101)
at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:92)
at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:42)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
... {code}
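The mismatch behind these results can be illustrated with a toy decoder that
has no Flink dependency. This is only a sketch: `CsvFieldMismatchSketch` and
`decode` are hypothetical names, the line "7,10" is the assumed content of the
file under f0=1/f1=4, and the strict length check is an illustrative choice
(the real csv and testcsv formats do not fail this cleanly, as the output
above shows).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a row deserializer; not Flink code.
final class CsvFieldMismatchSketch {

    /** Parses one comma-separated line, expecting exactly one value per declared field. */
    static int[] decode(String line, List<String> declaredFields) {
        String[] tokens = line.split(",");
        if (tokens.length != declaredFields.size()) {
            throw new IllegalArgumentException(
                    "record has " + tokens.length + " values but the declared type has "
                            + declaredFields.size() + " fields " + declaredFields);
        }
        int[] row = new int[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            row[i] = Integer.parseInt(tokens[i].trim());
        }
        return row;
    }

    public static void main(String[] args) {
        // Assumed file content: only the non-partition columns f2 and f3 are stored.
        String line = "7,10";

        // Declared type matches the file: decoding succeeds.
        System.out.println(Arrays.toString(decode(line, List.of("f2", "f3"))));

        // Declared type also lists the partition fields: the record cannot satisfy it.
        try {
            decode(line, List.of("f0", "f1", "f2", "f3"));
        } catch (IllegalArgumentException e) {
            System.out.println("mismatch: " + e.getMessage());
        }
    }
}
```

A decoder that is handed four fields for a two-value record has no correct
answer to give, which is consistent with both the wrong values and the
exception shown above.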
At
[https://github.com/apache/flink/blob/b1e70aebd3e248d68cf41a43db385ec9c9b6235a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L147]
the `physicalRowDataType` contains the partition fields as well, but
`partitionKeysToExtract` does not contain them because `producedDataType` has
been modified in the `applyProjection` method, so the result is an empty
projection. Then on line 154 we construct the final `physicalDataType`, but
since `partitionKeysProjections` is empty, it keeps the same value as before,
which still contains the partition fields.
By changing
{code:java}
final Projection partitionKeysProjections =
Projection.fromFieldNames(physicalDataType, partitionKeysToExtract);{code}
to
{code:java}
final Projection partitionKeysProjections =
Projection.fromFieldNames(physicalDataType, partitionKeys);{code}
the issue can be solved. I have verified this solution with 1 and 2 partition
keys, with and without metadata columns, with and without virtual columns. But
I still need to test this change with other formats.
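The difference between the two variants can be modelled with plain field-name
lists. This is a simplified sketch, not the actual `FileSystemTableSource`
code: `PartitionProjectionSketch` and its methods are hypothetical, the
`Projection` arithmetic is reduced to list filtering, and `producedFields`
being [f2, f3] is an assumption about what remains after projection pushdown
for `WHERE f0 = 1 AND f1 = 4`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the field selection; not Flink code.
final class PartitionProjectionSketch {

    /** Keeps only the partition keys that still appear in the produced fields. */
    static List<String> partitionKeysToExtract(
            List<String> producedFields, List<String> partitionKeys) {
        return producedFields.stream()
                .filter(partitionKeys::contains)
                .collect(Collectors.toList());
    }

    /** Removes the given keys from the physical fields, preserving field order. */
    static List<String> withoutKeys(List<String> physicalFields, List<String> keysToRemove) {
        return physicalFields.stream()
                .filter(field -> !keysToRemove.contains(field))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> physicalFields = List.of("f0", "f1", "f2", "f3");
        List<String> partitionKeys = List.of("f0", "f1");
        // Assumption: the pushed-down projection no longer lists the partition fields.
        List<String> producedFields = List.of("f2", "f3");

        // Current behaviour: nothing to extract, so the partition fields stay in the type.
        List<String> toExtract = partitionKeysToExtract(producedFields, partitionKeys);
        System.out.println("current:  " + withoutKeys(physicalFields, toExtract));
        // prints "current:  [f0, f1, f2, f3]"

        // Proposed behaviour: always remove every partition key.
        System.out.println("proposed: " + withoutKeys(physicalFields, partitionKeys));
        // prints "proposed: [f2, f3]"
    }
}
```

Under these assumptions only the proposed variant yields a type whose fields
are all present in the file.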
If this solution seems correct and a committer could assign me to the JIRA, I
can start working on it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)