[ https://issues.apache.org/jira/browse/FLINK-28552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jane Chan updated FLINK-28552: ------------------------------ Description: Currently, changelog mode cannot support map and multiset as the field type. More specifically, * MULTISET is not supported at all, including append-only mode. ( Caused by: java.lang.UnsupportedOperationException: Unsupported type: MULTISET<TINYINT> at org.apache.flink.table.store.shaded.org.apache.flink.orc.OrcSplitReaderUtil.logicalTypeToOrcType(OrcSplitReaderUtil.java:214) at org.apache.flink.table.store.shaded.org.apache.flink.orc.OrcSplitReaderUtil.logicalTypeToOrcType(OrcSplitReaderUtil.java:210) at org.apache.flink.table.store.format.orc.OrcFileFormat.createWriterFactory(OrcFileFormat.java:94) at org.apache.flink.table.store.file.data.AppendOnlyWriter$RowRollingWriter.lambda$createRollingRowWriter$0(AppendOnlyWriter.java:229) at org.apache.flink.table.store.file.writer.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:73) at org.apache.flink.table.store.file.writer.RollingFileWriter.write(RollingFileWriter.java:61) at org.apache.flink.table.store.file.data.AppendOnlyWriter.write(AppendOnlyWriter.java:108) at org.apache.flink.table.store.file.data.AppendOnlyWriter.write(AppendOnlyWriter.java:56) at org.apache.flink.table.store.table.AppendOnlyFileStoreTable$3.writeSinkRecord(AppendOnlyFileStoreTable.java:119) at org.apache.flink.table.store.table.sink.AbstractTableWrite.write(AbstractTableWrite.java:76) at org.apache.flink.table.store.connector.sink.StoreWriteOperator.processElement(StoreWriteOperator.java:124) ... 13 more) * MAP cannot be pk for key-value mode, and cannot be the fields for value-count mode. Stacktrace java.lang.UnsupportedOperationException at org.apache.flink.table.store.codegen.GenerateUtils$.generateCompare(GenerateUtils.scala:139) at org.apache.flink.table.store.codegen.GenerateUtils$.$anonfun$generateRowCompare$1(GenerateUtils.scala:289) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) at org.apache.flink.table.store.codegen.GenerateUtils$.generateRowCompare(GenerateUtils.scala:263) at org.apache.flink.table.store.codegen.ComparatorCodeGenerator$.gen(ComparatorCodeGenerator.scala:45) at org.apache.flink.table.store.codegen.ComparatorCodeGenerator.gen(ComparatorCodeGenerator.scala) at org.apache.flink.table.store.codegen.CodeGeneratorImpl.generateRecordComparator(CodeGeneratorImpl.java:53) at org.apache.flink.table.store.codegen.CodeGenUtils.generateRecordComparator(CodeGenUtils.java:66) at org.apache.flink.table.store.file.utils.KeyComparatorSupplier.<init>(KeyComparatorSupplier.java:40) at org.apache.flink.table.store.file.KeyValueFileStore.<init>(KeyValueFileStore.java:59) at org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable.<init>(ChangelogValueCountFileStoreTable.java:73) at org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:70) at org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:50) at org.apache.flink.table.store.spark.SimpleTableTestHelper.<init>(SimpleTableTestHelper.java:58) at org.apache.flink.table.store.spark.SparkReadITCase.startMetastoreAndSpark(SparkReadITCase.java:93) was: Currently, changelog mode cannot support map and multiset as the field type. More specifically, they cannot be pk for key-value mode, and cannot be the fields for value-count mode. Stacktrace java.lang.UnsupportedOperationException at org.apache.flink.table.store.codegen.GenerateUtils$.generateCompare(GenerateUtils.scala:139) at org.apache.flink.table.store.codegen.GenerateUtils$.$anonfun$generateRowCompare$1(GenerateUtils.scala:289) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) at org.apache.flink.table.store.codegen.GenerateUtils$.generateRowCompare(GenerateUtils.scala:263) at org.apache.flink.table.store.codegen.ComparatorCodeGenerator$.gen(ComparatorCodeGenerator.scala:45) at org.apache.flink.table.store.codegen.ComparatorCodeGenerator.gen(ComparatorCodeGenerator.scala) at org.apache.flink.table.store.codegen.CodeGeneratorImpl.generateRecordComparator(CodeGeneratorImpl.java:53) at org.apache.flink.table.store.codegen.CodeGenUtils.generateRecordComparator(CodeGenUtils.java:66) at org.apache.flink.table.store.file.utils.KeyComparatorSupplier.<init>(KeyComparatorSupplier.java:40) at org.apache.flink.table.store.file.KeyValueFileStore.<init>(KeyValueFileStore.java:59) at org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable.<init>(ChangelogValueCountFileStoreTable.java:73) at org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:70) at org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:50) at org.apache.flink.table.store.spark.SimpleTableTestHelper.<init>(SimpleTableTestHelper.java:58) at org.apache.flink.table.store.spark.SparkReadITCase.startMetastoreAndSpark(SparkReadITCase.java:93) > GenerateUtils#generateCompare supports MULTISET and MAP > ------------------------------------------------------- > > Key: FLINK-28552 > URL: https://issues.apache.org/jira/browse/FLINK-28552 > Project: Flink > Issue Type: Improvement > Components: Table Store > Affects Versions: table-store-0.2.0 > Reporter: Jane Chan > Priority: Minor > Fix For: table-store-0.2.0 > > > Currently, changelog mode cannot support map and multiset as the field type. > More specifically, > * MULTISET is not supported at all, including append-only mode. ( > Caused by: java.lang.UnsupportedOperationException: Unsupported type: > MULTISET<TINYINT> > at > org.apache.flink.table.store.shaded.org.apache.flink.orc.OrcSplitReaderUtil.logicalTypeToOrcType(OrcSplitReaderUtil.java:214) > at > org.apache.flink.table.store.shaded.org.apache.flink.orc.OrcSplitReaderUtil.logicalTypeToOrcType(OrcSplitReaderUtil.java:210) > at > org.apache.flink.table.store.format.orc.OrcFileFormat.createWriterFactory(OrcFileFormat.java:94) > at > org.apache.flink.table.store.file.data.AppendOnlyWriter$RowRollingWriter.lambda$createRollingRowWriter$0(AppendOnlyWriter.java:229) > at > org.apache.flink.table.store.file.writer.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:73) > at > org.apache.flink.table.store.file.writer.RollingFileWriter.write(RollingFileWriter.java:61) > at > org.apache.flink.table.store.file.data.AppendOnlyWriter.write(AppendOnlyWriter.java:108) > at > org.apache.flink.table.store.file.data.AppendOnlyWriter.write(AppendOnlyWriter.java:56) > at > org.apache.flink.table.store.table.AppendOnlyFileStoreTable$3.writeSinkRecord(AppendOnlyFileStoreTable.java:119) > at > org.apache.flink.table.store.table.sink.AbstractTableWrite.write(AbstractTableWrite.java:76) > at > org.apache.flink.table.store.connector.sink.StoreWriteOperator.processElement(StoreWriteOperator.java:124) > ... 13 more) > * MAP cannot be pk for key-value mode, and cannot be the fields for > value-count mode. > > Stacktrace > java.lang.UnsupportedOperationException > at > org.apache.flink.table.store.codegen.GenerateUtils$.generateCompare(GenerateUtils.scala:139) > at > org.apache.flink.table.store.codegen.GenerateUtils$.$anonfun$generateRowCompare$1(GenerateUtils.scala:289) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) > at > org.apache.flink.table.store.codegen.GenerateUtils$.generateRowCompare(GenerateUtils.scala:263) > at > org.apache.flink.table.store.codegen.ComparatorCodeGenerator$.gen(ComparatorCodeGenerator.scala:45) > at > org.apache.flink.table.store.codegen.ComparatorCodeGenerator.gen(ComparatorCodeGenerator.scala) > at > org.apache.flink.table.store.codegen.CodeGeneratorImpl.generateRecordComparator(CodeGeneratorImpl.java:53) > at > org.apache.flink.table.store.codegen.CodeGenUtils.generateRecordComparator(CodeGenUtils.java:66) > at > org.apache.flink.table.store.file.utils.KeyComparatorSupplier.<init>(KeyComparatorSupplier.java:40) > at > org.apache.flink.table.store.file.KeyValueFileStore.<init>(KeyValueFileStore.java:59) > at > org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable.<init>(ChangelogValueCountFileStoreTable.java:73) > at > org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:70) > at > org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:50) > at > org.apache.flink.table.store.spark.SimpleTableTestHelper.<init>(SimpleTableTestHelper.java:58) > at > org.apache.flink.table.store.spark.SparkReadITCase.startMetastoreAndSpark(SparkReadITCase.java:93) -- This message was sent by Atlassian Jira (v8.20.10#820010)