[ 
https://issues.apache.org/jira/browse/FLINK-20181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianxun Ye updated FLINK-20181:
-------------------------------
    Description: 
I want to emit CDC data using my own StreamOperator.

Flink version: 1.11.2, Blink planner.
{code:java}
//代码占位符
    getTableEnv().registerTableSource(
        "source",
        new StreamTableSource<RowData>() {
          TableSchema tableSchema = TableSchema.builder()
              .field("id", new AtomicDataType(new IntType(false)))
              .field("name", DataTypes.STRING())
              .field("type", DataTypes.STRING())
              .primaryKey("id")
              .build();          @Override
          public DataStream<RowData> getDataStream(StreamExecutionEnvironment 
execEnv) {
            return execEnv.addSource(new 
DebugSourceFunction(tableSchema.toRowDataType()));
          }          @Override
          public TableSchema getTableSchema() {
            return tableSchema;
          }          @Override
          public DataType getProducedDataType() {
            return getTableSchema().toRowDataType().bridgedTo(RowData.class);
          }
        }
    );
    sql("insert into Test.testdb.animal "
            + " SELECT id, name, type, '2020' as da, '11' as hr"
            + " from source"
    );  

class DebugSourceFunction extends RichParallelSourceFunction<RowData> 
implements ResultTypeQueryable<RowData> {           
DataType dataType;    
    public DebugSourceFunction(DataType dataType) {
      this.dataType = dataType;
    }    
    @Override
    public TypeInformation<RowData> getProducedType() {
      return (TypeInformation<RowData>) createTypeInformation(dataType);
    }    
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
      ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, 
StringData.fromString("monkey"), StringData.fromString("small")));
    }    
    @Override
    public void cancel() {    }    public TypeInformation<?> 
createTypeInformation(DataType producedDataType) {
      final DataType internalDataType = DataTypeUtils.transform(
          producedDataType,
          TypeTransformations.TO_INTERNAL_CLASS);
      return fromDataTypeToTypeInfo(internalDataType);
    }
  }  

public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, 
OverwritableTableSink, PartitionableTableSink {
     @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
RowData>> dataStream) {
      
      DataStream<Void> returnStream = dataStream
          .map(
              (MapFunction<Tuple2<Boolean, RowData>, RowData>)
                  value -> value.f1
          )
          ......      
      return returnStream
          .addSink(new DiscardingSink<>())
          .setParallelism(1);
    }
  }
{code}
When I execute the SQL statement `insert into ...`, a ClassCastException occurs:
{code:java}
//代码占位符
Caused by: java.lang.ClassCastException: 
org.apache.flink.table.data.GenericRowData cannot be cast to 
org.apache.flink.api.java.tuple.Tuple2
Caused by: java.lang.ClassCastException: 
org.apache.flink.table.data.GenericRowData cannot be cast to 
org.apache.flink.api.java.tuple.Tuple2
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at StreamExecCalc$8.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
{code}

  was:
I want to emit CDC data using my own StreamOperator.

Flink version: 1.11.2, Blink planner.
{code:java}

//代码占位符
  getTableEnv().registerTableSource(
        "source",
        new StreamTableSource<RowData>() {
          TableSchema tableSchema = TableSchema.builder()
              .field("id", new AtomicDataType(new IntType(false)))
              .field("name", DataTypes.STRING())
              .field("type", DataTypes.STRING())
              .primaryKey("id")
              .build();          @Override
          public DataStream<RowData> getDataStream(StreamExecutionEnvironment 
execEnv) {
            return execEnv.addSource(new 
DebugSourceFunction(tableSchema.toRowDataType()));
          }          @Override
          public TableSchema getTableSchema() {
            return tableSchema;
          }          @Override
          public DataType getProducedDataType() {
            return getTableSchema().toRowDataType().bridgedTo(RowData.class);
          }
        }
    );
    sql("insert into Test.testdb.animal "
            + " SELECT id, name, type, '2020' as da, '11' as hr"
            + " from source"
    );  

class DebugSourceFunction extends RichParallelSourceFunction<RowData> 
implements ResultTypeQueryable<RowData> {    DataType dataType;    public 
DebugSourceFunction(DataType dataType) {
      this.dataType = dataType;
    }    @Override
    public TypeInformation<RowData> getProducedType() {
      return (TypeInformation<RowData>) createTypeInformation(dataType);
    }    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
      ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, 
StringData.fromString("monkey"), StringData.fromString("small")));
    }    @Override
    public void cancel() {    }    public TypeInformation<?> 
createTypeInformation(DataType producedDataType) {
      final DataType internalDataType = DataTypeUtils.transform(
          producedDataType,
          TypeTransformations.TO_INTERNAL_CLASS);
      return fromDataTypeToTypeInfo(internalDataType);
    }
  }  

public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, 
OverwritableTableSink, PartitionableTableSink {
     @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
RowData>> dataStream) {
      
      DataStream<Void> returnStream = dataStream
          .map(
              (MapFunction<Tuple2<Boolean, RowData>, RowData>)
                  value -> value.f1
          )
          ......      return returnStream
          .addSink(new DiscardingSink<>())
          .setParallelism(1);
    }
  }
{code}
When I execute the SQL statement `insert into ...`, a ClassCastException occurs:
{code:java}
//代码占位符
Caused by: java.lang.ClassCastException: 
org.apache.flink.table.data.GenericRowData cannot be cast to 
org.apache.flink.api.java.tuple.Tuple2
Caused by: java.lang.ClassCastException: 
org.apache.flink.table.data.GenericRowData cannot be cast to 
org.apache.flink.api.java.tuple.Tuple2
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at StreamExecCalc$8.processElement(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
{code}


> RowData cannot cast to Tuple2
> -----------------------------
>
>                 Key: FLINK-20181
>                 URL: https://issues.apache.org/jira/browse/FLINK-20181
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Xianxun Ye
>            Priority: Major
>
> I want to emit CDC data using my own StreamOperator.
> Flink version: 1.11.2, Blink planner.
> {code:java}
> //代码占位符
>     getTableEnv().registerTableSource(
>         "source",
>         new StreamTableSource<RowData>() {
>           TableSchema tableSchema = TableSchema.builder()
>               .field("id", new AtomicDataType(new IntType(false)))
>               .field("name", DataTypes.STRING())
>               .field("type", DataTypes.STRING())
>               .primaryKey("id")
>               .build();          @Override
>           public DataStream<RowData> getDataStream(StreamExecutionEnvironment 
> execEnv) {
>             return execEnv.addSource(new 
> DebugSourceFunction(tableSchema.toRowDataType()));
>           }          @Override
>           public TableSchema getTableSchema() {
>             return tableSchema;
>           }          @Override
>           public DataType getProducedDataType() {
>             return getTableSchema().toRowDataType().bridgedTo(RowData.class);
>           }
>         }
>     );
>     sql("insert into Test.testdb.animal "
>             + " SELECT id, name, type, '2020' as da, '11' as hr"
>             + " from source"
>     );  
> class DebugSourceFunction extends RichParallelSourceFunction<RowData> 
> implements ResultTypeQueryable<RowData> {           
> DataType dataType;    
>     public DebugSourceFunction(DataType dataType) {
>       this.dataType = dataType;
>     }    
>     @Override
>     public TypeInformation<RowData> getProducedType() {
>       return (TypeInformation<RowData>) createTypeInformation(dataType);
>     }    
>     @Override
>     public void run(SourceContext<RowData> ctx) throws Exception {
>       ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, 
> StringData.fromString("monkey"), StringData.fromString("small")));
>     }    
>     @Override
>     public void cancel() {    }    public TypeInformation<?> 
> createTypeInformation(DataType producedDataType) {
>       final DataType internalDataType = DataTypeUtils.transform(
>           producedDataType,
>           TypeTransformations.TO_INTERNAL_CLASS);
>       return fromDataTypeToTypeInfo(internalDataType);
>     }
>   }  
> public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, 
> OverwritableTableSink, PartitionableTableSink {
>      @Override
>     public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
> RowData>> dataStream) {
>       
>       DataStream<Void> returnStream = dataStream
>           .map(
>               (MapFunction<Tuple2<Boolean, RowData>, RowData>)
>                   value -> value.f1
>           )
>           ......      
>       return returnStream
>           .addSink(new DiscardingSink<>())
>           .setParallelism(1);
>     }
>   }
> {code}
> When I execute the SQL statement `insert into ...`, a ClassCastException occurs:
> {code:java}
> //代码占位符
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.table.data.GenericRowData cannot be cast to 
> org.apache.flink.api.java.tuple.Tuple2
> Caused by: 
> java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData 
> cannot be cast to org.apache.flink.api.java.tuple.Tuple2
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>  at StreamExecCalc$8.processElement(Unknown Source)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>  at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to