Andrey Gaskov created FLINK-35698:
-------------------------------------

             Summary: Parquet connector fails to load ROW<x decimal(5, 2)> after save
                 Key: FLINK-35698
                 URL: https://issues.apache.org/jira/browse/FLINK-35698
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / API
    Affects Versions: 1.19.0, 1.18.0, 1.17.0
            Reporter: Andrey Gaskov


The bug can be reproduced with the following test, added to ParquetFileSystemITCase.java:

{code:java}
@TestTemplate
void testRowColumnType() throws IOException, ExecutionException, InterruptedException {
    String path = TempDirUtils.newFolder(super.fileTempFolder()).toURI().getPath();
    // Source: 10 generated rows with a single nested decimal field.
    super.tableEnv()
            .executeSql(
                    "create table t_in("
                            + "grp ROW<x decimal(5, 2)>"
                            + ") with ("
                            + "'connector' = 'datagen',"
                            + "'number-of-rows' = '10'"
                            + ")");
    // Sink: the same schema, written as Parquet files under the temp folder.
    super.tableEnv()
            .executeSql(
                    "create table t_out("
                            + "grp ROW<x decimal(5, 2)>"
                            + ") with ("
                            + "'connector' = 'filesystem',"
                            + "'path' = '"
                            + path
                            + "',"
                            + "'format' = 'parquet'"
                            + ")");
    // Write the rows, then read them back; the failure happens on the read.
    super.tableEnv().executeSql("insert into t_out select * from t_in").await();
    List<Row> rows =
            CollectionUtil.iteratorToList(
                    super.tableEnv().executeSql("select * from t_out limit 10").collect());
    assertThat(rows).hasSize(10);
} {code}
The test hangs for 40 seconds and then fails with this root exception:

{code:java}
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.columnar.vector.heap.HeapIntVector cannot be cast to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
    at org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
    at org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:207)
    at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeRow(AbstractBinaryWriter.java:147)
    at org.apache.flink.table.data.writer.BinaryRowWriter.writeRow(BinaryRowWriter.java:27)
    at org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:155)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:173)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:44)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:152)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
    at org.apache.flink.table.runtime.operators.sort.LimitOperator.processElement(LimitOperator.java:47)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:109)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:78)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:103)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750) {code}
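One possible reading of the trace (an assumption, not confirmed by this report): the nested decimal field reaches VectorizedColumnBatch.getDecimal as a raw HeapIntVector, and the unchecked cast to DecimalColumnVector fails. The self-contained sketch below reproduces that failure mode without Flink. ColumnVector, IntColumnVector, DecimalColumnVector, HeapIntVector and IntBackedDecimalVector here are simplified, hypothetical stand-ins, not the real Flink classes.

{code:java}
import java.math.BigDecimal;

public class DecimalCastSketch {

    // Hypothetical, simplified stand-ins for Flink's column vector interfaces.
    interface ColumnVector {}

    interface IntColumnVector extends ColumnVector {
        int getInt(int i);
    }

    interface DecimalColumnVector extends ColumnVector {
        BigDecimal getDecimal(int i, int precision, int scale);
    }

    // Plays the role of HeapIntVector: raw INT32 values as read from Parquet.
    static final class HeapIntVector implements IntColumnVector {
        private final int[] values;

        HeapIntVector(int... values) {
            this.values = values;
        }

        @Override
        public int getInt(int i) {
            return values[i];
        }
    }

    // Hypothetical wrapper that reads each INT32 as an unscaled decimal value.
    static final class IntBackedDecimalVector implements DecimalColumnVector {
        private final IntColumnVector delegate;

        IntBackedDecimalVector(IntColumnVector delegate) {
            this.delegate = delegate;
        }

        @Override
        public BigDecimal getDecimal(int i, int precision, int scale) {
            // precision is unused here; the scale turns 12345 into 123.45.
            return BigDecimal.valueOf(delegate.getInt(i), scale);
        }
    }

    // Mirrors the unchecked cast seen at VectorizedColumnBatch.getDecimal.
    static BigDecimal readDecimal(ColumnVector column, int row, int precision, int scale) {
        return ((DecimalColumnVector) column).getDecimal(row, precision, scale);
    }

    public static void main(String[] args) {
        try {
            // A bare int vector, as apparently happens for the nested field.
            readDecimal(new HeapIntVector(12345), 0, 5, 2);
        } catch (ClassCastException e) {
            System.out.println("unwrapped: " + e.getClass().getSimpleName());
        }
        DecimalColumnVector wrapped = new IntBackedDecimalVector(new HeapIntVector(12345));
        System.out.println("wrapped: " + readDecimal(wrapped, 0, 5, 2));
    }
}
{code}

Running main prints "unwrapped: ClassCastException" followed by "wrapped: 123.45". The wrapper only illustrates the kind of adaptation an INT32-backed decimal needs on read; where Flink performs it, and why the nested ROW path apparently skips it, is not established here.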
If "grp ROW<x decimal(5, 2)>" is changed to "grp ROW<x int>", the test runs 
successfully in a few seconds.
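A likely reason the int variant behaves differently (hedged, not verified against Flink's writer): the Parquet format specification allows a DECIMAL with precision up to 9 to be stored in an INT32 column and up to 18 in INT64, so decimal(5, 2) can share the physical type of a plain int. Assuming the writer picks the narrowest allowed type, the reader sees INT32 data in both cases, but only the decimal needs the stored unscaled value reinterpreted with the schema's scale. The sketch below shows that mapping and the unscaled round trip; the class DecimalPhysicalType and its methods are hypothetical.

{code:java}
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalPhysicalType {

    enum PhysicalType { INT32, INT64, FIXED_LEN_BYTE_ARRAY }

    // Narrowest physical type the Parquet spec permits for a DECIMAL of this precision.
    // Assumption: the writer always picks the narrowest option (BINARY is also legal above 18).
    static PhysicalType physicalTypeFor(int precision) {
        if (precision < 1) {
            throw new IllegalArgumentException("precision must be >= 1");
        }
        if (precision <= 9) {
            return PhysicalType.INT32;
        }
        if (precision <= 18) {
            return PhysicalType.INT64;
        }
        return PhysicalType.FIXED_LEN_BYTE_ARRAY;
    }

    // Encode: the stored INT32 is the unscaled value, e.g. 123.45 at scale 2 becomes 12345.
    static int toUnscaledInt(BigDecimal value, int scale) {
        return value.setScale(scale, RoundingMode.UNNECESSARY).unscaledValue().intValueExact();
    }

    // Decode: reapply the scale from the schema to recover the decimal value.
    static BigDecimal fromUnscaledInt(int unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);
    }

    public static void main(String[] args) {
        System.out.println(physicalTypeFor(5)); // INT32, the same physical type as a plain INT
        int stored = toUnscaledInt(new BigDecimal("123.45"), 2);
        System.out.println(stored); // 12345
        System.out.println(fromUnscaledInt(stored, 2)); // 123.45
    }
}
{code}

Under this assumption an int column can be returned from the INT32 vector as is, while decimal(5, 2) only comes back correctly if something applies the decode step, which is consistent with the ClassCastException above.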

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
