Hi,
   请问使用的 mysql 维表是 flink sql 内置的 jdbc connector 吗? 如果是的话,对应内部的 cache 只是读缓存,
并不会持久化, 任务重启或者到达设定的缓存淘汰条件就失效了
   如果是自己开发的维表,建议增加相应的数据加载日志, 以便确认 failover 时的处理是否有异常

Best,
Lincoln Lee


Xuchao <amber_...@qq.com.invalid> 于2022年6月24日周五 17:15写道:

> 您好!
>     我在使用flink时遇到一些问题。
> flink-1.14.4
> sqlserver-cdc-2.2.1
> yarn-per-job
>
> 我有一个任务,先是双流join,再与mysql维表lookup join,开启增量检查点;
> sqlsever-cdc短暂故障,任务失败,自动恢复,但是lookup join对应task不再输出数据;
> 检查发现,加载维表数据为0,即任务恢复时未加载一次全量维表数据;
>
> 以上,可能是什么问题,应该如何解决呢?
>
> 期待回复!
> best wishes!
>
> 附日志:
> 2022-06-24 14:55:45,950 ERROR
> com.ververica.cdc.debezium.internal.Handover                 [] - Reporting
> error:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the change event producer. This connector will be
> stopped.
> at
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_301]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_301]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数
> cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
> at
> com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> ... 7 more
> 2022-06-24 14:55:45,953 INFO  io.debezium.embedded.EmbeddedEngine
>                 [] - Stopping the embedded engine
> 2022-06-24 14:55:45,954 INFO
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
> [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> carflow]], fields=[id, plate_license, site_id, create_time, flow_type,
> circle_id]) -> Calc(select=[id, plate_license, site_id, create_time,
> (create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id])
> -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0
> discarding 0 drained requests
> 2022-06-24 14:55:45,955 INFO  io.debezium.embedded.EmbeddedEngine
>                 [] - Stopping the embedded engine
> 2022-06-24 14:55:45,957 WARN  org.apache.flink.runtime.taskmanager.Task
>                 [] - Source: TableSourceScan(table=[[default_catalog,
> default_database, carflow]], fields=[id, plate_license, site_id,
> create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license,
> site_id, create_time, (create_time + -28800000:INTERVAL HOUR) AS c_time,
> flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time],
> watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02) switched
> from RUNNING to FAILED with failure cause:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the change event producer. This connector will be
> stopped.
> at
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数
> cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
> at
> com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
> at
> io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
> at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
> at
> io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
> ... 7 more
>
> 2022-06-24 14:55:45,957 INFO  org.apache.flink.runtime.taskmanager.Task
>                 [] - Freeing task resources for Source:
> TableSourceScan(table=[[default_catalog, default_database, carflow]],
> fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) ->
> Calc(select=[id, plate_license, site_id, create_time, (create_time +
> -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) ->
> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0
> (71206ba8149ac20bb39d8169ff3d2f02).
> 2022-06-24 15:03:57,819 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] -
> Un-registering task and sending final execution state FINISHED to
> JobManager for task Source: TableSourceScan(table=[[default_catalog,
> default_database, sitecar]], fields=[car_number, site_id, first_oil_time,
> is_tq_car, relation_id]) -> WatermarkAssigner(rowtime=[first_oil_time],
> watermark=[first_oil_time]) (1/1)#1 9ebcb0fc15ced6db3f2a579510e415ee.
> 2022-06-24 15:06:35,005 INFO  io.debezium.connector.common.BaseSourceTask
>                 [] - 23591 records sent during previous 00:05:19.499, last
> recorded offset: {transaction_id=null, event_serial_no=1,
> commit_lsn=0000162c:00016205:00d0, change_lsn=0000162c:00016205:00c7}
>
>
>
>
>
> amber_...@qq.com
>

回复