Hi, folks

I’m using flink 1.16.0, and I would like to read Parquet file (attached), that 
has schema [1].

I could read this file with Spark, but when I try to read it with Flink 1.16.0 
(program attached) using schema [2]

I got IndexOutOfBoundsException [3]

My code, and parquet file are attached. Is it:

·         the problem, described in 
FLINK-28867<https://issues.apache.org/jira/browse/FLINK-28867> or

·         something new, that deserve a separate Jira, or

·         something wrong with my code?


[1]: Parquet Schema

root
|-- amount: decimal(38,9) (nullable = true)
|-- connectionAccountId: string (nullable = true)
|-- sourceEntity: struct (nullable = true)
|    |-- extendedProperties: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- key: string (nullable = true)
|    |    |    |-- value: string (nullable = true)
|    |-- sourceAccountId: string (nullable = true)
|    |-- sourceEntityId: string (nullable = true)
|    |-- sourceEntityType: string (nullable = true)
|    |-- sourceSystem: string (nullable = true)


[2]: Schema used in Flink:

    static RowType getSchema()
    {
        RowType elementType = RowType.of(
            new LogicalType[] {
                new VarCharType(VarCharType.MAX_LENGTH),
                new VarCharType(VarCharType.MAX_LENGTH)
            },
            new String[] {
                "key",
                "value"
            }
        );

        RowType element = RowType.of(
            new LogicalType[] { elementType },
            new String[] { "element" }
        );

        RowType sourceEntity = RowType.of(
            new LogicalType[] {
                new ArrayType(element),
                new VarCharType(),
                new VarCharType(),
                new VarCharType(),
                new VarCharType(),
            },
            new String[] {
                "extendedProperties",
                "sourceAccountId",
                "sourceEntityId",
                "sourceEntityType",
                "sourceSystem"
            }
        );

        return  RowType.of(
            new LogicalType[] {
                new DecimalType(),
                new VarCharType(),
                sourceEntity
            },
            new String[] {
                "amount",
                "connectionAccountId",
                "sourceEntity",
        });
    }

[3]:  Execution Exception:

2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received 
uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception 
while polling the records
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    ...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for 
length 1
    at 
java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
    at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
    at 
java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
    at java.base/java.util.Objects.checkIndex(Objects.java:372)
    at java.base/java.util.ArrayList.get(ArrayList.java:459)
    at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
    at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
    at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
    at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
    at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
    at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
    at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
    at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
    at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
    at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 common frames omitted

Thanks

Attachment: part-00121.parquet
Description: part-00121.parquet

Attachment: ReadParquetArray1.java
Description: ReadParquetArray1.java

Reply via email to