Hi,

版本:Flink-1.15.1 
模式:单机


s1为kafka的source表,已定义,
g_name为s1表的字段,string类型


select `g_name` from s1;
这个运行正常,有数据输出


然后加上 IF 语句,则会报错:
Flink SQL> select if(`g_name` is null, 'no', `g_name`) from s1;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
运行报错


堆栈里面的Unknown Source,感觉有问题。
at StreamExecCalc$1247.processElement_split196(Unknown Source) ~[?:?]


报错堆栈:
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
 ~[flink-connector-files-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_302]
Caused by: java.lang.NullPointerException
at StreamExecCalc$1247.processElement_split196(Unknown Source) ~[?:?]
at StreamExecCalc$1247.processElement(Unknown Source) ~[?:?]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
 ~[flink-connector-kafka-1.15.1.jar:1.15.1]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
 ~[flink-connector-files-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 ~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_302]


Thanks

Reply via email to