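This is most likely the connection becoming unusable after a long period with no data transfer. Possible causes:
1. The Kafka data is sparse, and the interval between arriving records is longer than "wait_timeout".
2. No record has joined against the MySQL data for a long time.
You can try adjusting the database's wait_timeout and see whether that helps.

PS: in this scenario automatic recovery should work fine, but you still need to identify the root cause, decide whether the behavior is normal or abnormal, and work out how to avoid it.

It is best to enable checkpointing. The Kafka offset is only acked when a checkpoint succeeds, so this record will not be acked automatically and then dropped.
With checkpointing enabled, if the downstream supports upsert or exactly-once semantics, the whole pipeline is exactly-once; otherwise it is at-least-once, meaning records may be duplicated.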
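To make the wait_timeout point concrete, here is a minimal Java sketch rather than anything taken from Flink or the MySQL driver. The class IdleConnectionGuard and its methods are hypothetical names used only for illustration. The 815,816 ms idle time comes from the log quoted below; the 600 s wait_timeout is an assumed value chosen for the example (your server may differ), and 28,800 s is MySQL's documented default. The only real API used is the standard JDBC call Connection.isValid(int).

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Callable;

// Hypothetical helper for illustration only; not part of Flink or the MySQL driver.
final class IdleConnectionGuard {

    private IdleConnectionGuard() {}

    // True when a connection has been idle at least as long as the server's
    // wait_timeout, i.e. the server may already have closed it.
    static boolean idleTooLong(long idleMillis, long waitTimeoutSeconds) {
        return idleMillis >= waitTimeoutSeconds * 1000L;
    }

    // Returns the current connection if it still answers a validity check,
    // otherwise closes it quietly and asks the caller-supplied factory for a new one.
    static Connection validOrReconnect(Connection current,
                                       int checkTimeoutSeconds,
                                       Callable<Connection> reconnect) throws Exception {
        if (current != null) {
            try {
                if (current.isValid(checkTimeoutSeconds)) {
                    return current;
                }
            } catch (SQLException e) {
                // Treat a failing validity check as a dead connection.
            }
            try {
                current.close();
            } catch (SQLException ignored) {
                // The connection is already broken; nothing more to do.
            }
        }
        return reconnect.call();
    }

    public static void main(String[] args) {
        long idleMillis = 815_816L; // idle time reported in the quoted log

        // Assumed wait_timeout of 600 s: the connection would already be stale.
        System.out.println(idleTooLong(idleMillis, 600L));

        // MySQL default wait_timeout of 28,800 s: the connection would still be fine.
        System.out.println(idleTooLong(idleMillis, 28_800L));
    }
}
```

If the server really uses the default wait_timeout, an idle time of about 815 s would not be enough to drop the connection, so it would also be worth checking whether something between Flink and MySQL (a proxy or firewall, for example) closes idle connections earlier.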
At 2020-11-12 13:36:11, "xiexinyuan341" <xiexinyuan...@163.com> wrote:
>The source is Kafka, and JdbcRowDataLookupFunction is used as the dimension table. The exception is shown below; judging from the logs, it occurs roughly once every 10-odd minutes.
>2020-11-12 01:00:09.028 ERROR JdbcRowDataLookupFunction.java:170 JDBC executeBatch error, retry times = 1
>com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
>
>The last packet successfully received from the server was 815,816 milliseconds ago. The last packet sent successfully to the server was 1 milliseconds ago.
>    at sun.reflect.GeneratedConstructorAccessor16.newInstance(Unknown Source)
>    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>    at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
>    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
>    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
>    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
>    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
>    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
>    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
>    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2535)
>    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1911)
>    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2034)
>    at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
>    at LookupFunction$10.flatMap(Unknown Source)
>    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
>    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
>    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>    at StreamExecCalc$7.processElement(Unknown Source)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>    at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:75)
>    at JoinTableFuncCollector$6.collect(Unknown Source)
>    at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:203)
>    at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:162)
>    at LookupFunction$2.flatMap(Unknown Source)
>    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
>    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
>    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
>    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
>    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3332)
>    ... 46 common frames omitted
>2020-11-12 01:00:10.503 ERROR JdbcBatchingOutputFormat.java:175 JDBC executeBatch error, retry times = 1
>java.sql.SQLException: Could not retrieve transation read-only status server
>    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)
>    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937)
>    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926)
>    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872)
>    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:904)
>    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:894)
>    at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3613)
>    at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3582)
>    at com.mysql.jdbc.PreparedStatement.executeBatch(PreparedStatement.java:1249)
>    at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:71)
>    at org.apache.flink.connector.jdbc.internal.executor.BufferReduceStatementExecutor.executeBatch(BufferReduceStatementExecutor.java:98)
>    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:200)
>    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
>    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
>    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>    at java.lang.Thread.run(Thread.java:748)
>Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
>
>The last packet successfully received from the server was 817,119 milliseconds ago. The last packet sent successfully to the server was 0 milliseconds ago.
>    at sun.reflect.GeneratedConstructorAccessor16.newInstance(Unknown Source)
>    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>    at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
>    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
>    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
>    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
>    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
>    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
>    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
>    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2531)
>    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2489)
>    at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1446)
>    at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3607)
>    ... 14 common frames omitted
>Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
>    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
>    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3332)
>    ... 22 common frames omitted
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/