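Hi,

Is the MySQL dimension table you are using the JDBC connector built into Flink SQL?

If so, its internal cache is only a read cache and is not persisted. It becomes invalid when the job restarts or when the configured cache eviction conditions are met.

If it is a dimension table you developed yourself, I suggest adding logging around the data loading so that you can confirm whether anything goes wrong during failover.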
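To make the cache point concrete, here is a toy Python model of an in-memory read cache with a row limit and a TTL. It is only a sketch and not Flink code: the class name `ReadThroughLookupCache`, the `loader` callback, and the `max_rows` / `ttl_seconds` parameters are all made up for illustration. It also logs every load, which is the kind of logging suggested above for a custom dimension table.

```python
import logging
import time
from collections import OrderedDict

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("dim_lookup")


class ReadThroughLookupCache:
    """Toy model of an in-memory read cache (hypothetical, not Flink code)."""

    def __init__(self, loader, max_rows=1000, ttl_seconds=60.0, clock=time.monotonic):
        self._loader = loader          # hypothetical callback: key -> row or None
        self._max_rows = max_rows      # evict the oldest entries beyond this size
        self._ttl = ttl_seconds        # entries older than this are reloaded
        self._clock = clock
        self._entries = OrderedDict()  # key -> (row, load_time); memory only
        self.loads = 0                 # number of reads that went to the table

    def lookup(self, key):
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[1] < self._ttl:
            self._entries.move_to_end(key)  # mark as most recently used
            return hit[0]
        # Miss or expired entry: read from the dimension table and log it.
        row = self._loader(key)
        self.loads += 1
        log.info("loaded key=%r from dimension table, found=%s", key, row is not None)
        self._entries[key] = (row, now)
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_rows:
            self._entries.popitem(last=False)  # drop the least recently used entry
        return row


if __name__ == "__main__":
    dim_table = {"site-1": "North"}  # stands in for the MySQL table
    cache = ReadThroughLookupCache(dim_table.get, max_rows=2, ttl_seconds=10.0)
    cache.lookup("site-1")  # first lookup goes to the table
    cache.lookup("site-1")  # second lookup is served from memory
    # A "restart" is modelled by a new instance, which starts out empty.
    restarted = ReadThroughLookupCache(dim_table.get, max_rows=2, ttl_seconds=10.0)
    restarted.lookup("site-1")  # goes to the table again
```

In this model nothing is loaded until the first lookup arrives, and a new instance starts empty. So a load count of 0 right after a restart says nothing by itself; the load log lines show whether lookups reach the table at all.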
Best,
Lincoln Lee

On Fri, Jun 24, 2022 at 17:15, Xuchao <amber_...@qq.com.invalid> wrote:

> Hello!
> I have run into a problem while using Flink.
> flink-1.14.4
> sqlserver-cdc-2.2.1
> yarn-per-job
>
> I have a job that first does a two-stream join and then a lookup join against a MySQL dimension table, with incremental checkpoints enabled.
> sqlserver-cdc failed briefly; the job failed and recovered automatically, but the task for the lookup join no longer emits data.
> On inspection, the number of dimension table rows loaded is 0, i.e. the job did not load the full dimension table once when it recovered.
>
> Given the above, what could be the problem, and how should I solve it?
>
> Looking forward to your reply!
> best wishes!
>
> Log attached:
> 2022-06-24 14:55:45,950 ERROR com.ververica.cdc.debezium.internal.Handover [] - Reporting error:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
>     at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
>     at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
>     at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
>     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_301]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_ ... .
>     at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
>     at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
>     at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
>     at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
>     at io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
>     at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
>     at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
>     at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
>     at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
>     ... 7 more
> 2022-06-24 14:55:45,953 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine
> 2022-06-24 14:55:45,954 INFO org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Source: TableSourceScan(table=[[default_catalog, default_database, carflow]], fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, (create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 discarding 0 drained requests
> 2022-06-24 14:55:45,955 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine
> 2022-06-24 14:55:45,957 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, carflow]], fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, (create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02) switched from RUNNING to FAILED with failure cause: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
>     at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
>     at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
>     at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
>     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_ ... .
>     at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
>     at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
>     at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
>     at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
>     at io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
>     at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
>     at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
>     at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
>     at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
>     ... 7 more
>
> 2022-06-24 14:55:45,957 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, carflow]], fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, (create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02).
> 2022-06-24 15:03:57,819 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, sitecar]], fields=[car_number, site_id, first_oil_time, is_tq_car, relation_id]) -> WatermarkAssigner(rowtime=[first_oil_time], watermark=[first_oil_time]) (1/1)#1 9ebcb0fc15ced6db3f2a579510e415ee.
> 2022-06-24 15:06:35,005 INFO io.debezium.connector.common.BaseSourceTask [] - 23591 records sent during previous 00:05:19.499, last recorded offset: {transaction_id=null, event_serial_no=1, commit_lsn=0000162c:00016205:00d0, change_lsn=0000162c:00016205:00c7}
>
> amber_...@qq.com