[
https://issues.apache.org/jira/browse/FLINK-34759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828681#comment-17828681
]
Flink CDC Issue Import commented on FLINK-34759:
------------------------------------------------
Date: Wed Dec 07 21:46:51 CST 2022, Author: [ldwnt|https://github.com/ldwnt]
> Thanks for reporting. This is a duplicate of
> ([#1757|https://github.com/apache/flink-cdc/issues/1757] |
> [FLINK-1757|https://issues.apache.org/jira/browse/FLINK-1757]). It has been
> fixed in ([#1758|https://github.com/apache/flink-cdc/issues/1758] |
> [FLINK-1758|https://issues.apache.org/jira/browse/FLINK-1758]) on the master
> branch.
I built the CDC jar from source at commit
https://github.com/ververica/flink-cdc-connectors/commit/c1a049ed1bb38d0ab336681b8ac34e4ef34a6fc2,
and repeated the scenario as follows:
1) start job A with CDC 2.1.1 (snapshot phase finished, binlog being read)
2) stop job A, then update some records in the source MySQL database
3) replace the jars and restart job A with CDC 2.3
The updated records were read correctly in job A, but the checkpoint could not
be completed:
```
2022-12-07 18:44:56,860 INFO [jobmanager-future-thread-1]
org.apache.flink.runtime.source.coordinator.SourceCoordinator [303] - Restoring
SplitEnumerator of source Source: @s -> @p -> (Sink: @ds, Sink: @ls) from
checkpoint.
2022-12-07 18:44:56,901 ERROR [jobmanager-future-thread-1]
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator
[384] - Failed to reset the coordinator to checkpoint and start.
java.lang.IllegalStateException: Invalid status code 16777216,the valid code
range is [0, 4]
at
com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.fromStatusCode(AssignerStatus.java:164)
~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
at
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:258)
~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
at
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:322)
~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
at
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializePendingSplitsState(PendingSplitsStateSerializer.java:143)
~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
at
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:112)
~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
at
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:50)
~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:414)
~[flink-runtime_2.11-1.13.5.jar:?]
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:309)
~[flink-runtime_2.11-1.13.5.jar:?]
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:377)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:136)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_312]
at
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
~[?:1.8.0_312]
at
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
~[?:1.8.0_312]
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.resetToCheckpoint(RecreateOnResetOperatorCoordinator.java:131)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.resetToCheckpoint(OperatorCoordinatorHolder.java:273)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreStateToCoordinators(CheckpointCoordinator.java:1815)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1577)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1642)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
~[flink-core-1.13.5.jar:1.13.5]
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
[?:1.8.0_312]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_312]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_312]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_312]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_312]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_312]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_312]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
...
2022-12-07 18:45:16,608 INFO [Checkpoint Timer]
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [742] - Triggering
checkpoint 4 (type=CHECKPOINT) @ 1670409916573 for job
bce7153aa983463f434472bc58f25a0d.
2022-12-07 19:45:16,608 INFO [Checkpoint Timer]
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [1985] - Checkpoint 4
of job bce7153aa983463f434472bc58f25a0d expired before completing.
```
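A note on the status code in the trace above: 16777216 is 0x01000000, which is the value a big-endian four-byte read returns when the first byte is 0x01 and the next three bytes are 0x00. The sketch below is an illustration under an assumption, not a confirmed diagnosis: it assumes the state written by the older connector holds a one-byte boolean at the position where the newer deserializer reads a four-byte int status code. The class and method names are hypothetical and are not part of flink-cdc.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo class; it only uses java.io and does not touch flink-cdc.
public class StatusCodeMisread {

    // Writes a boolean followed by a zero int (standing in for whatever zero
    // bytes follow in the real state), then reads the first four bytes back
    // as a single int, as a reader expecting an int status code would.
    public static int readBooleanAsInt(boolean flag) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(flag); // one byte: 0x01 for true, 0x00 for false
            out.writeInt(0);        // assumed trailing zero bytes
        }
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readInt();    // big-endian read of bytes 01 00 00 00
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readBooleanAsInt(true)); // prints 16777216
    }
}
```

If the assumption holds, the deserializer would need to branch on the serialized state version before reading this field, so that state written by 2.1.1 is read with the old layout.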
> Can not start FlinkCDC 2.3 with startupOptions = specificOffset
> ---------------------------------------------------------------
>
> Key: FLINK-34759
> URL: https://issues.apache.org/jira/browse/FLINK-34759
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Flink CDC Issue Import
> Priority: Major
> Labels: github-import
>
> I just upgraded Flink CDC in my job from 2.1.1 to 2.3.0 to use the new
> "Scan Newly Added Tables" feature (the table was not in the tableList
> originally), but I ended up with a "Name is null" exception:
> ```
> java.lang.NullPointerException: Name is null
> at java.lang.Enum.valueOf(Enum.java:236) ~[?:1.8.0_312]
> at
> com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind.valueOf(BinlogOffsetKind.java:26)
> ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
> at
> com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.getOffsetKind(BinlogOffset.java:136)
> ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
> at
> com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition(SerializerUtils.java:73)
> ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
> at
> com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition(SerializerUtils.java:59)
> ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
> at
> com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserializeSplit(MySqlSplitSerializer.java:153)
> ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
> at
> com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserialize(MySqlSplitSerializer.java:122)
> ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
> at
> com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserialize(MySqlSplitSerializer.java:46)
> ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
> at
> org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:165)
> ~[flink-core-1.13.5.jar:1.13.5]
> at
> org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
> ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
> at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_312]
> at
> org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:95)
> ~[flink-core-1.13.5.jar:1.13.5]
> at
> org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:251)
> ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
> ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
> ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
> ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
> ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
> ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
> ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> ```
> I could not find a solution to this issue here, so I turned to another new
> feature: scan.startup.mode = specific-offset. This time I got the exception
> "No TableMapEventData has been found for table id". The related issues
> (https://github.com/ververica/flink-cdc-connectors/issues/276,
> https://github.com/ververica/flink-cdc-connectors/issues/411) were closed,
> but the problem is still being reported there and matches what I observed.
> I would appreciate any help.
> **Environment :**
> - Flink version : 1.13.5
> - Flink CDC version: 2.3.0
> - Database and version: mysql 8.0.25
> **To Reproduce**
> Steps to reproduce the behavior:
> 1. The test data :
> 2. The test code :
> ```
> MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
> .hostname(datasource.getConnect().getHost())
> .port(Integer.parseInt(datasource.getConnect().getPort()))
> .username(datasource.getConnect().getUsername())
> .password(datasource.getConnect().getPassword())
> .databaseList(datasource.getDatabase())
> .tableList(tableRegex)
> .scanNewlyAddedTableEnabled(true)
> .startupOptions(StartupOptions.specificOffset("binlog.000017", 499573306))
> .includeSchemaChanges(true)
> .deserializer(new ChangJsonDeserializationSchema(true))
> .serverId(serverId)
> .debeziumProperties(properties);
> ```
> 3. The error :
> ```
> 2022-12-05 16:02:06,396 INFO [blc-mysql-sit.deepq.tech:3306]
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1185] -
> Connected to MySQL binlog at mysql-sit.deepq.tech:3306, starting at
> MySqlOffsetContext
> [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT},
> sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=binlog.000017,
> currentBinlogPosition=499573306, currentRowNumber=0, serverId=0,
> sourceTime=null, threadId=-1, currentQuery=null, tableIds=[],
> databaseName=null], partition={server=mysql_binlog_source},
> snapshotCompleted=false, transactionContext=TransactionContext
> [currentTransactionId=null, perTableEventCount={}, totalEventCount=0],
> restartGtidSet=null, currentGtidSet=null,
> restartBinlogFilename=binlog.000017, restartBinlogPosition=499573306,
> restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0,
> inTransaction=false, transactionId=null, incrementalSnapshotContext
> =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null,
> dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
> 2022-12-05 16:02:06,399 INFO [debezium-reader-0]
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource [917] - Waiting
> for keepalive thread to start
> 2022-12-05 16:02:06,399 INFO [blc-mysql-sit.deepq.tech:3306]
> io.debezium.util.Threads [287] - Creating thread
> debezium-mysqlconnector-mysql_binlog_source-binlog-client
> 2022-12-05 16:02:06,400 INFO [debezium-reader-0]
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource [924] - Keepalive
> thread is running
> 2022-12-05 16:02:06,405 ERROR [blc-mysql-sit.deepq.tech:3306]
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1054] - Error
> during binlog processing. Last offset stored = null, binlog reader near
> position = binlog.000017/499573306
> 2022-12-05 16:02:06,430 ERROR [blc-mysql-sit.deepq.tech:3306]
> io.debezium.pipeline.ErrorHandler [31] - Producer failure
> io.debezium.DebeziumException: Failed to deserialize data of
> EventHeaderV4{timestamp=1670226722000, eventType=EXT_UPDATE_ROWS, serverId=1,
> headerLength=19, dataLength=31, nextPosition=499573356, flags=0}
> at
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1154)
> ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
> at
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1207)
> [debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:958)
> [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
> [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
> [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
> Caused by:
> com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
> Failed to deserialize data of EventHeaderV4{timestamp=1670226722000,
> eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31,
> nextPosition=499573356, flags=0}
> at
> com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:309)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230)
> ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> ... 3 more
> Caused by:
> com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException:
> No TableMapEventData has been found for table id:905. Usually that means
> that you have started reading binary log 'within the logical event group'
> (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
> at
> com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:109)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserializeRows(UpdateRowsEventDataDeserializer.java:71)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:58)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:33)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> at
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230)
> ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
> ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
> ... 3 more
> ```
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/1795
> Created by: [ldwnt|https://github.com/ldwnt]
> Labels: bug,
> Created at: Mon Dec 05 18:40:41 CST 2022
> State: open
--
This message was sent by Atlassian Jira
(v8.20.10#820010)