dalongliu created FLINK-29547:
---------------------------------
Summary: Select a[1] which is array type for parquet complex type throw ClassCastException
Key: FLINK-29547
URL: https://issues.apache.org/jira/browse/FLINK-29547
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.16.0
Reporter: dalongliu
Fix For: 1.17.0
The following SQL test in HiveTableSourceITCase throws a ClassCastException:
{code:java}
batchTableEnv.executeSql(
        "create table parquet_complex_type_test("
                + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)"
                + " stored as parquet");
String[] modules = batchTableEnv.listModules();
// load the hive module so that the array, map and named_struct functions
// are available for conveniently writing complex data
batchTableEnv.loadModule("hive", new HiveModule());
batchTableEnv.useModules("hive", CoreModuleFactory.IDENTIFIER);
batchTableEnv
        .executeSql(
                "insert into parquet_complex_type_test"
                        + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
                        + " named_struct('f1', 1, 'f2', 2)")
        .await();
// selecting a single element of the array column fails at runtime
Table src = batchTableEnv.sqlQuery("select a[1] from parquet_complex_type_test");
List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());{code}
The exception stack trace:
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.Integer;
    at BatchExecCalc$37.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:98)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:92)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:401)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
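The type names in the exception message are JVM binary names for array classes: "[" marks one array dimension and "L...;" encloses the element class name. The minimal sketch below uses only the JDK (no Flink classes; the class name ArrayTypeNames is illustrative) to print these names with Class#getName() and to show the array covariance that lets such a cast compile but fail at runtime.

{code:java}
// Plain-JDK sketch: decodes the array type names seen in the ClassCastException.
public class ArrayTypeNames {
    public static void main(String[] args) {
        // Class#getName() of an array class returns its JVM binary name.
        System.out.println(Object[].class.getName());  // prints [Ljava.lang.Object;
        System.out.println(Integer[].class.getName()); // prints [Ljava.lang.Integer;
        System.out.println(int[].class.getName());     // prints [I

        // Arrays are covariant: an Integer[] is an Object[], so the downcast
        // (Integer[]) someObjectArray compiles, but an array created as
        // Object[] is never an Integer[], so that cast fails at runtime.
        System.out.println(Object[].class.isAssignableFrom(Integer[].class)); // prints true
        System.out.println(Integer[].class.isAssignableFrom(Object[].class)); // prints false
    }
}
{code}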
After debugging the code, I found the root cause. The source operator reads array data from Parquet in a vectorized way and returns ColumnarArrayData. The calc operator then converts it to GenericArrayData, whose backing object array is of type Object[] rather than Integer[]. As a result, calling ArrayObjectArrayConverter#toExternal to convert it to Integer[] still returns an Object[], and forcibly casting that array to Integer[] throws the exception above.
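The failure mode can be reproduced in plain Java: an array created as Object[] cannot be cast to Integer[] even when every element is an Integer, so a correctly typed array has to be allocated and filled element by element. The sketch below assumes only the JDK; the class ObjectArrayCastDemo and the helper toTypedArray are hypothetical illustrations, not Flink's ArrayObjectArrayConverter or the actual fix for this issue.

{code:java}
import java.lang.reflect.Array;
import java.util.Arrays;

// Plain-JDK sketch of the failure and of one possible workaround.
// toTypedArray is a hypothetical helper, not part of Flink.
public class ObjectArrayCastDemo {

    // Copies the elements of an untyped Object[] into a newly allocated array
    // whose runtime component type is elementClass (reference types only).
    static <T> T[] toTypedArray(Object[] source, Class<T> elementClass) {
        @SuppressWarnings("unchecked")
        T[] target = (T[]) Array.newInstance(elementClass, source.length);
        for (int i = 0; i < source.length; i++) {
            target[i] = elementClass.cast(source[i]);
        }
        return target;
    }

    public static void main(String[] args) {
        // Models the backing array of the GenericArrayData: every element is an
        // Integer, but the array's runtime type is Object[].
        Object[] untyped = new Object[] {1, 2};

        try {
            Integer[] forced = (Integer[]) untyped; // compiles, fails at runtime
            System.out.println(forced.length);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }

        // Allocating an Integer[] and copying the elements avoids the failure.
        Integer[] typed = toTypedArray(untyped, Integer.class);
        System.out.println(Arrays.toString(typed) + " " + typed.getClass().getName());
        // prints [1, 2] [Ljava.lang.Integer;
    }
}
{code}

The copy costs one pass over the elements; the exact ClassCastException message printed by the first branch differs between JDK versions.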
--
This message was sent by Atlassian Jira
(v8.20.10#820010)