dalongliu created FLINK-29547:
---------------------------------

             Summary: Selecting a[1] of array type from a parquet complex-type table throws ClassCastException
                 Key: FLINK-29547
                 URL: https://issues.apache.org/jira/browse/FLINK-29547
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.16.0
            Reporter: dalongliu
             Fix For: 1.17.0


The following SQL test in HiveTableSourceITCase throws a ClassCastException.
{code:java}
batchTableEnv.executeSql(
        "create table parquet_complex_type_test("
                + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
String[] modules = batchTableEnv.listModules();
// load the hive module so that we can use the array, map, named_struct functions
// for conveniently writing complex data
batchTableEnv.loadModule("hive", new HiveModule());
batchTableEnv.useModules("hive", CoreModuleFactory.IDENTIFIER);

batchTableEnv
        .executeSql(
                "insert into parquet_complex_type_test"
                        + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
                        + " named_struct('f1', 1, 'f2', 2)")
        .await();

Table src = batchTableEnv.sqlQuery("select a[1] from parquet_complex_type_test");
List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());
{code}
The exception stack:
{code:java}
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.Integer;
    at BatchExecCalc$37.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:98)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:92)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:401)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
{code}

 

After debugging the code, I found the root cause: the source operator reads the array data from parquet in a vectorized way and returns a ColumnarArrayData. In the calc operator, this is converted to a GenericArrayData whose backing array is of type Object[] rather than Integer[]. When ArrayObjectArrayConverter#toExternal is called to convert it to Integer[], it still returns an Object[], so the subsequent forced cast to Integer[] throws the exception.
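The underlying Java behavior can be reproduced without Flink: a cast checks the array object's runtime component type, not the types of its elements, so an Object[] that happens to hold only Integers still cannot be cast to Integer[] (a minimal sketch, independent of the Flink code paths above):

{code:java}
import java.util.Arrays;

public class ArrayCastDemo {
    public static void main(String[] args) {
        // Like GenericArrayData here, the array is created as Object[]
        // even though every element is an Integer.
        Object[] boxed = new Object[] {1, 2};

        // The generated calc code effectively performs this cast;
        // it fails because the runtime type of the array is Object[].
        try {
            Integer[] ints = (Integer[]) boxed;
            System.out.println("cast succeeded");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }

        // Copying into a freshly allocated Integer[] works, since each
        // element is individually checked and stored in a typed array.
        Integer[] copy = Arrays.copyOf(boxed, boxed.length, Integer[].class);
        System.out.println(copy[0] + ", " + copy[1]); // prints "1, 2"
    }
}
{code}

This suggests the converter needs to allocate an array of the declared element class (as Arrays.copyOf does) instead of handing back the Object[] it was given.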



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
