Hello,

I have to convert a Table to a DataStream and tried to do it with
toAppendStream (I just saw that it is deprecated), but I have not been
able to get the conversion working yet (see the attached code).
Also, my final sink should be Kafka with the format ObjectNode/JSON,
so I will eventually need a serializer as well.

What am I doing wrong? Can I convert to an ObjectNode directly with a
serializer? Both toAppendStream and toDataStream fail with the same error.
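For context, this is roughly the sink wiring I have in mind once the
conversion works (a sketch against the Flink 1.14 KafkaSink; the topic
name, bootstrap servers, and the Jackson-based serializer are placeholders
of mine, not tested code):

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.types.Row;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Serialize each Row to a JSON ObjectNode, then to bytes for Kafka.
SerializationSchema<Row> jsonSchema = new SerializationSchema<Row>() {
    private transient ObjectMapper mapper;

    @Override
    public void open(InitializationContext context) {
        mapper = new ObjectMapper();
    }

    @Override
    public byte[] serialize(Row row) {
        try {
            ObjectNode node = mapper.createObjectNode();
            // Field positions match the query's select list.
            node.put("transactionId", (String) row.getField(0));
            node.put("originalEvent", (String) row.getField(1));
            node.put("handlingTime", (Long) row.getField(2));
            node.put("elapsedTime", (Long) row.getField(3));
            return mapper.writeValueAsBytes(node);
        } catch (Exception e) {
            throw new RuntimeException("JSON serialization failed", e);
        }
    }
};

KafkaSink<Row> sink = KafkaSink.<Row>builder()
        .setBootstrapServers("localhost:9092")            // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")                 // placeholder
                .setValueSerializationSchema(jsonSchema)
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

// resultStream.sinkTo(sink);
```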

It fails with (shortened stack trace):
java.io.IOException: Failed to deserialize consumer record due to
        at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54)
~[?:?]
....
Caused by: java.io.IOException: Failed to deserialize consumer record
ConsumerRecord(topic = cag-verwerkingstijden-input, partition = 18,
leaderEpoch = 0, offset = 27070, CreateTime = 1645105052689, serialized key
size = -1, serialized value size = 3587, headers = RecordHeaders(headers =
[], isReadOnly = false), key = null, value = [B@7fcb6863).
...
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
...
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
        at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
..
Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input
conversion from external DataStream API to internal Table API data
structures. Make sure that the provided data types that configure the
converters are correctly declared in the schema. Affected record:

Table result = tableEnv.sqlQuery("select transactionId" +
        ", originalEvent" +
        ", handlingTime" +
        ", handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by eventTime), handlingTime) as elapsedTime" +
        " from " + tupled4DsTable + " order by eventTime");

result.printSchema();


TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType =
        new TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG());
DataStream<Tuple4<String, String, Long, Long>> dsRow =
        tableEnv.toAppendStream(result, tupleType);      // <-- deprecated and fails

DataStream<Row> xx = tableEnv.toDataStream(result);      // <-- fails with the same error

Regards, Hans

Attachment: code.java