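I am on Flink 1.12.0 with a custom MQTT source. In Flink SQL the source delivers data, and I can print it with connector='print'. But when I switch the sink to JDBC, the job fails and not a single row is written to the database.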

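When I replace the source with datagen as a test, rows are written through JDBC without problems. So is the MQTT source at fault? Or is the MQTT data volume so large that the database causes backpressure? As I understand it, though, even under backpressure at least a few rows should still reach the database.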

Every failure stops at the same line, where the error says that statement is null:
https://github.com/apache/flink/blob/release-1.12.0/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/AbstractJdbcRowConverter.java#L220

NULL values are already handled in the SQL:
INSERT INTO ods_iot_message
SELECT IF(`tag` is null, '', `tag`),
       IF(`value` is null, '', `value`),
       IF(`quality` is null, 0, `quality`),
       IF(`topic` is null, '', `topic`),
       `ts`
FROM source2;


The full exception is as follows:
2021-05-07 08:58:18,540 ERROR com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction  - +I(nxseq1474,0.644,0,nxs48,2021-05-07T09:00:39.064)
2021-05-07 08:58:18,540 ERROR com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction  - Could not forward element to next operator
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
  at com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction$1.messageArrived(MqttSourceFunction.java:130)
  at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:519)
  at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417)
  at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214)
  at java.lang.Thread.run(Thread.java:823)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
  at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  ... 11 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
  at StreamExecCalc$18.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  ... 17 more
Caused by: java.io.IOException: Writing records to JDBC failed.
  at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:159)
  at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
  at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
  at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  ... 23 more
Caused by: java.lang.NullPointerException
  at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$createExternalConverter$57fde215$8(AbstractJdbcRowConverter.java:220)
  at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter$$Lambda$1220/0000000000000000.serialize(Unknown Source)
  at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableExternalConverter$2bf50691$1(AbstractJdbcRowConverter.java:194)
  at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter$$Lambda$1234/0000000000000000.serialize(Unknown Source)
  at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:87)
  at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.addToBatch(TableSimpleStatementExecutor.java:57)
  at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.addToBatch(TableSimpleStatementExecutor.java:35)
  at org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor.executeBatch(TableBufferedStatementExecutor.java:62)
  at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:202)
  at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:173)
  at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:156)
  ... 27 more
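
One detail visible in the trace: collect() is invoked directly on the Paho callback thread (CommsCallback.run, then messageArrived), not on the source's own run() thread. Whether this has anything to do with the NullPointerException is not established here. For comparison, the following is a minimal plain-Java sketch of the commonly used alternative, in which the callback only enqueues and a single loop emits downstream. It uses no Flink or Paho API. All names (QueueHandoffSketch, onMessage, run, relay, emitLock) are hypothetical, and emitLock merely stands in for the lock a real SourceFunction would obtain from SourceContext.getCheckpointLock().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a source that hands messages from a client callback thread to its own loop. */
public class QueueHandoffSketch {
    // Bounded buffer: when it is full, put() blocks the callback thread instead of growing without limit.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(1024);
    // Stand-in for Flink's checkpoint lock (an assumption of this sketch, not Flink code).
    private final Object emitLock = new Object();
    private volatile boolean running = true;

    /** Runs on the client's callback thread; it only enqueues and never emits downstream. */
    public void onMessage(String payload) throws InterruptedException {
        buffer.put(payload);
    }

    /** Runs on the source's own thread; this is the only place that emits downstream. */
    public void run(List<String> downstream, int expected) throws InterruptedException {
        while (running && downstream.size() < expected) {
            String next = buffer.poll(100, TimeUnit.MILLISECONDS);
            if (next == null) {
                continue; // nothing arrived yet; re-check the running flag
            }
            synchronized (emitLock) {
                downstream.add(next);
            }
        }
    }

    public void cancel() {
        running = false;
    }

    /** Demo helper: feeds messages from a separate thread and returns what the run loop emitted. */
    public static List<String> relay(List<String> messages) {
        QueueHandoffSketch source = new QueueHandoffSketch();
        List<String> emitted = new ArrayList<>();
        Thread callbackThread = new Thread(() -> {
            try {
                for (String m : messages) {
                    source.onMessage(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "mqtt-callback");
        callbackThread.start();
        try {
            source.run(emitted, messages.size());
            callbackThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while relaying", e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(relay(Arrays.asList("m1", "m2", "m3")));
    }
}
```

With one producer and one consumer on a FIFO queue, the messages reach the downstream list in arrival order, and every downstream call happens on a single thread.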


Best,
TonyChen
