Xianxun Ye created FLINK-20181:
----------------------------------
Summary: RowData cannot cast to Tuple2
Key: FLINK-20181
URL: https://issues.apache.org/jira/browse/FLINK-20181
Project: Flink
Issue Type: Bug
Reporter: Xianxun Ye
I want to emit CDC data with my own StreamOperator.
Flink version: 1.11.2, Blink planner.
{code:java}
// Register an anonymous table source under the name "source" (columns: id, name, type;
// primary key on id), then run an INSERT that selects from it.
getTableEnv().registerTableSource(
    "source",
    new StreamTableSource<RowData>() {
        TableSchema tableSchema = TableSchema.builder()
            .field("id", new AtomicDataType(new IntType(false)))
            .field("name", DataTypes.STRING())
            .field("type", DataTypes.STRING())
            .primaryKey("id")
            .build();

        @Override
        public TableSchema getTableSchema() {
            return tableSchema;
        }

        // The produced type is the schema's row type bridged to RowData.
        @Override
        public DataType getProducedDataType() {
            DataType rowType = getTableSchema().toRowDataType();
            return rowType.bridgedTo(RowData.class);
        }

        // The stream is fed by DebugSourceFunction, built from the schema's row type.
        @Override
        public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
            DataType rowType = tableSchema.toRowDataType();
            return execEnv.addSource(new DebugSourceFunction(rowType));
        }
    }
);

// Copy the source rows into the target table, adding two constant columns (da, hr).
sql(
    "insert into Test.testdb.animal "
        + " SELECT id, name, type, '2020' as da, '11' as hr"
        + " from source"
);
/**
 * Debug source for the reproduction: {@link #run} emits one hard-coded INSERT row and returns.
 * The produced type information is derived from the {@code DataType} given at construction.
 */
class DebugSourceFunction extends RichParallelSourceFunction<RowData>
        implements ResultTypeQueryable<RowData> {

    DataType dataType;

    public DebugSourceFunction(DataType dataType) {
        this.dataType = dataType;
    }

    /** Returns the type information built by {@link #createTypeInformation} for {@code dataType}. */
    @Override
    public TypeInformation<RowData> getProducedType() {
        TypeInformation<?> producedType = createTypeInformation(dataType);
        return (TypeInformation<RowData>) producedType;
    }

    /** Emits a single INSERT row (1, "monkey", "small") to the given context. */
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        StringData animalName = StringData.fromString("monkey");
        StringData animalType = StringData.fromString("small");
        ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, animalName, animalType));
    }

    @Override
    public void cancel() {
        // Intentionally empty: run() has no loop to stop.
    }

    /**
     * Transforms the given data type with {@code TypeTransformations.TO_INTERNAL_CLASS} and
     * converts the result to type information.
     */
    public TypeInformation<?> createTypeInformation(DataType producedDataType) {
        return fromDataTypeToTypeInfo(
                DataTypeUtils.transform(
                        producedDataType,
                        TypeTransformations.TO_INTERNAL_CLASS));
    }
}
// Test sink from the report. Only consumeDataStream is shown here; any other
// methods required by the implemented interfaces are not part of this snippet.
public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>,
OverwritableTableSink, PartitionableTableSink {
// Consumes a stream declared as Tuple2<Boolean, RowData>, maps each element to
// its RowData field (f1), and finally attaches a DiscardingSink with parallelism 1.
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean,
RowData>> dataStream) {
DataStream<Void> returnStream = dataStream
.map(
// Unwraps the tuple. NOTE(review): the reported ClassCastException comes from a
// StreamMap operator, presumably this map receiving a bare GenericRowData - confirm.
(MapFunction<Tuple2<Boolean, RowData>, RowData>)
value -> value.f1
)
// The "......" below marks pipeline steps the reporter elided from the snippet.
...... return returnStream
.addSink(new DiscardingSink<>())
.setParallelism(1);
}
}
{code}
When I execute the SQL statement `insert into ...`, a ClassCastException is thrown:
{code:java}
// code placeholder
Caused by: java.lang.ClassCastException:
org.apache.flink.table.data.GenericRowData cannot be cast to
org.apache.flink.api.java.tuple.Tuple2
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$8.processElement(Unknown Source) at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)