[ https://issues.apache.org/jira/browse/FLINK-20181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xianxun Ye updated FLINK-20181: ------------------------------- Description: I want to emit CDC data by my own StreamOperator. flink version :1.11.2, blink planner. {code:java} //代码占位符 getTableEnv().registerTableSource( "source", new StreamTableSource<RowData>() { TableSchema tableSchema = TableSchema.builder() .field("id", new AtomicDataType(new IntType(false))) .field("name", DataTypes.STRING()) .field("type", DataTypes.STRING()) .primaryKey("id") .build(); @Override public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) { return execEnv.addSource(new DebugSourceFunction(tableSchema.toRowDataType())); } @Override public TableSchema getTableSchema() { return tableSchema; } @Override public DataType getProducedDataType() { return getTableSchema().toRowDataType().bridgedTo(RowData.class); } } ); sql("insert into Test.testdb.animal " + " SELECT id, name, type, '2020' as da, '11' as hr" + " from source" ); class DebugSourceFunction extends RichParallelSourceFunction<RowData> implements ResultTypeQueryable<RowData> { DataType dataType; public DebugSourceFunction(DataType dataType) { this.dataType = dataType; } @Override public TypeInformation<RowData> getProducedType() { return (TypeInformation<RowData>) createTypeInformation(dataType); } @Override public void run(SourceContext<RowData> ctx) throws Exception { ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("monkey"), StringData.fromString("small"))); } @Override public void cancel() { } public TypeInformation<?> createTypeInformation(DataType producedDataType) { final DataType internalDataType = DataTypeUtils.transform( producedDataType, TypeTransformations.TO_INTERNAL_CLASS); return fromDataTypeToTypeInfo(internalDataType); } } public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink { @Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) { DataStream<Void> 
returnStream = dataStream .map( (MapFunction<Tuple2<Boolean, RowData>, RowData>) value -> value.f1 ) ...... return returnStream .addSink(new DiscardingSink<>()) .setParallelism(1); } } {code} When I execute the `insert into ...` SQL statement, a ClassCastException occurs: {code:java} //代码占位符 Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2 Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$8.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) {code} was: I want to emit CDC data by my own StreamOperator. flink version :1.11.2, blink planner. 
{code:java} //代码占位符 getTableEnv().registerTableSource( "source", new StreamTableSource<RowData>() { TableSchema tableSchema = TableSchema.builder() .field("id", new AtomicDataType(new IntType(false))) .field("name", DataTypes.STRING()) .field("type", DataTypes.STRING()) .primaryKey("id") .build(); @Override public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) { return execEnv.addSource(new DebugSourceFunction(tableSchema.toRowDataType())); } @Override public TableSchema getTableSchema() { return tableSchema; } @Override public DataType getProducedDataType() { return getTableSchema().toRowDataType().bridgedTo(RowData.class); } } ); sql("insert into Test.testdb.animal " + " SELECT id, name, type, '2020' as da, '11' as hr" + " from source" ); class DebugSourceFunction extends RichParallelSourceFunction<RowData> implements ResultTypeQueryable<RowData> { DataType dataType; public DebugSourceFunction(DataType dataType) { this.dataType = dataType; } @Override public TypeInformation<RowData> getProducedType() { return (TypeInformation<RowData>) createTypeInformation(dataType); } @Override public void run(SourceContext<RowData> ctx) throws Exception { ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("monkey"), StringData.fromString("small"))); } @Override public void cancel() { } public TypeInformation<?> createTypeInformation(DataType producedDataType) { final DataType internalDataType = DataTypeUtils.transform( producedDataType, TypeTransformations.TO_INTERNAL_CLASS); return fromDataTypeToTypeInfo(internalDataType); } } public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink { @Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) { DataStream<Void> returnStream = dataStream .map( (MapFunction<Tuple2<Boolean, RowData>, RowData>) value -> value.f1 ) ...... 
return returnStream .addSink(new DiscardingSink<>()) .setParallelism(1); } } {code} when I execute sql with `insert into ...`, occurs class cast fail exception: {code:java} //代码占位符 Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$8.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) {code} > RowData cannot cast to Tuple2 > ----------------------------- > > Key: FLINK-20181 > URL: https://issues.apache.org/jira/browse/FLINK-20181 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > 
Reporter: Xianxun Ye > Priority: Major > > I want to emit CDC data by my own StreamOperator. > flink version :1.11.2, blink planner. > {code:java} > //代码占位符 > getTableEnv().registerTableSource( > "source", > new StreamTableSource<RowData>() { > TableSchema tableSchema = TableSchema.builder() > .field("id", new AtomicDataType(new IntType(false))) > .field("name", DataTypes.STRING()) > .field("type", DataTypes.STRING()) > .primaryKey("id") > .build(); @Override > public DataStream<RowData> getDataStream(StreamExecutionEnvironment > execEnv) { > return execEnv.addSource(new > DebugSourceFunction(tableSchema.toRowDataType())); > } @Override > public TableSchema getTableSchema() { > return tableSchema; > } @Override > public DataType getProducedDataType() { > return getTableSchema().toRowDataType().bridgedTo(RowData.class); > } > } > ); > sql("insert into Test.testdb.animal " > + " SELECT id, name, type, '2020' as da, '11' as hr" > + " from source" > ); > class DebugSourceFunction extends RichParallelSourceFunction<RowData> > implements ResultTypeQueryable<RowData> { > DataType dataType; > public DebugSourceFunction(DataType dataType) { > this.dataType = dataType; > } > @Override > public TypeInformation<RowData> getProducedType() { > return (TypeInformation<RowData>) createTypeInformation(dataType); > } > @Override > public void run(SourceContext<RowData> ctx) throws Exception { > ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, > StringData.fromString("monkey"), StringData.fromString("small"))); > } > @Override > public void cancel() { } public TypeInformation<?> > createTypeInformation(DataType producedDataType) { > final DataType internalDataType = DataTypeUtils.transform( > producedDataType, > TypeTransformations.TO_INTERNAL_CLASS); > return fromDataTypeToTypeInfo(internalDataType); > } > } > public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, > OverwritableTableSink, PartitionableTableSink { > @Override > public DataStreamSink<?> 
consumeDataStream(DataStream<Tuple2<Boolean, > RowData>> dataStream) { > > DataStream<Void> returnStream = dataStream > .map( > (MapFunction<Tuple2<Boolean, RowData>, RowData>) > value -> value.f1 > ) > ...... > return returnStream > .addSink(new DiscardingSink<>()) > .setParallelism(1); > } > } > {code} > when I execute sql with `insert into ...`, occurs class cast fail exception: > {code:java} > //代码占位符 > Caused by: java.lang.ClassCastException: > org.apache.flink.table.data.GenericRowData cannot be cast to > org.apache.flink.api.java.tuple.Tuple2Caused by: > java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData > cannot be cast to org.apache.flink.api.java.tuple.Tuple2 at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at StreamExecCalc$8.processElement(Unknown Source) at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at > 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)