FLink版本为1.12.0,自定义了一个mqtt的数据源,在flink sql中可以获取数据,通过connector='print'打印出来。但是输出换成JDBC就报错了,一条数据也没有输出到数据库中。
把source换成datagen测试,就可以写入到jdbc中。所以,是mqtt数据源的问题吗?还是因为mqtt数据太大了,数据库形成了反压?可是个人理解,即便是反压,也应该可以写入几条到数据库中。 每次报错都停留在 https://github.com/apache/flink/blob/release-1.12.0/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/AbstractJdbcRowConverter.java#L220 <https://github.com/apache/flink/blob/release-1.12.0/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/AbstractJdbcRowConverter.java#L220> 每次都报错在这里,说statement为null 已经在SQL中对null数据做了处理。 INSERT INTO ods_iot_message SELECT IF(`tag` is null, '', `tag`), IF(`value` is null, '', `value`), IF(`quality` is null, 0, `quality`), IF(`topic` is null, '', `topic`), `ts` FROM source2; 完整异常如下: 2021-05-07 08:58:18,540 ERROR com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction - +I(nxseq1474,0.644,0,nxs48,2021-05-07T09:00:39.064) 2021-05-07 08:58:18,540 ERROR com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction - Could not forward element to next operator org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) at com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction$1.messageArrived(MqttSourceFunction.java:130) at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:519) at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417) at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214) at java.lang.Thread.run(Thread.java:823) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ... 11 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$18.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ... 17 more Caused by: java.io.IOException: Writing records to JDBC failed. at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:159) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ... 23 more Caused by: java.lang.NullPointerException at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$createExternalConverter$57fde215$8(AbstractJdbcRowConverter.java:220) at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter$$Lambda$1220/0000000000000000.serialize(Unknown Source) at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableExternalConverter$2bf50691$1(AbstractJdbcRowConverter.java:194) at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter$$Lambda$1234/0000000000000000.serialize(Unknown Source) at org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:87) at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.addToBatch(TableSimpleStatementExecutor.java:57) at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.addToBatch(TableSimpleStatementExecutor.java:35) at org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor.executeBatch(TableBufferedStatementExecutor.java:62) at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:202) at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:173) at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:156) ... 27 more Best, TonyChen