Flink??1.15.2

I am now going to change the data stream 
from&nbsp;DataStream<String&gt;&nbsp;to&nbsp;DataStream<ROW&gt;


Already implemented (insert only works fine), but when DataStream<String&gt; 
contains&nbsp;update&nbsp;information


The error is:
Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input 
conversion. Conversion expects insert-only records but DataStream API record 
contains: UPDATE_BEFORE
at 
org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180)
at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160)
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
at 
org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118)
at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)


kafkaflink.java:179-180 lines of code
Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE, beforeObject, 
rowTypeInfo);
collector. collect(before);The before data output is -U[1, test, 123-456-789]


I would like to know : How to convert the stream 
containing&nbsp;update&nbsp;data 
from&nbsp;DataStream<String&gt;&nbsp;to&nbsp;DataStream<ROW&gt;

Reply via email to