[ https://issues.apache.org/jira/browse/FLINK-34759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828681#comment-17828681 ]

Flink CDC Issue Import commented on FLINK-34759:
------------------------------------------------

Date: Wed Dec 07 21:46:51 CST 2022, Author: [ldwnt|https://github.com/ldwnt]

> Thanks for reporting. It is a duplicate of 
> ([#1757|https://github.com/apache/flink-cdc/issues/1757] | 
> [FLINK-1757|https://issues.apache.org/jira/browse/FLINK-1757]) and has been 
> fixed by ([#1758|https://github.com/apache/flink-cdc/issues/1758] | 
> [FLINK-1758|https://issues.apache.org/jira/browse/FLINK-1758]) on the master 
> branch.

I built the CDC jar from the source code at commit 
https://github.com/ververica/flink-cdc-connectors/commit/c1a049ed1bb38d0ab336681b8ac34e4ef34a6fc2
 and repeated the scenario as follows:
1) start a job A with CDC 2.1.1 (snapshot finished and binlog being read)
2) stop job A and update some records in the source MySQL
3) replace the jars and restart job A with CDC 2.3

The updated records were correctly read in job A, but the checkpoint could not 
be completed:
```
2022-12-07 18:44:56,860 INFO  [jobmanager-future-thread-1] 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [303] - Restoring 
SplitEnumerator of source Source: @s -> @p -> (Sink: @ds, Sink: @ls) from 
checkpoint.
2022-12-07 18:44:56,901 ERROR [jobmanager-future-thread-1] 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator
 [384] - Failed to reset the coordinator to checkpoint and start.
java.lang.IllegalStateException: Invalid status code 16777216,the valid code 
range is [0, 4]
        at 
com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.fromStatusCode(AssignerStatus.java:164)
 ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
        at 
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:258)
 ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
        at 
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:322)
 ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
        at 
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializePendingSplitsState(PendingSplitsStateSerializer.java:143)
 ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
        at 
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:112)
 ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
        at 
com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:50)
 ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:414)
 ~[flink-runtime_2.11-1.13.5.jar:?]
        at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:309)
 ~[flink-runtime_2.11-1.13.5.jar:?]
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:377)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:136)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) 
~[?:1.8.0_312]
        at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731) 
~[?:1.8.0_312]
        at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) 
~[?:1.8.0_312]
        at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.resetToCheckpoint(RecreateOnResetOperatorCoordinator.java:131)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.resetToCheckpoint(OperatorCoordinatorHolder.java:273)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreStateToCoordinators(CheckpointCoordinator.java:1815)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1577)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1642)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) 
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
 ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
 ~[flink-core-1.13.5.jar:1.13.5]
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 [?:1.8.0_312]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_312]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_312]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_312]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_312]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_312]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_312]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
...
2022-12-07 18:45:16,608 INFO  [Checkpoint Timer] 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [742] - Triggering 
checkpoint 4 (type=CHECKPOINT) @ 1670409916573 for job 
bce7153aa983463f434472bc58f25a0d.
2022-12-07 19:45:16,608 INFO  [Checkpoint Timer] 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [1985] - Checkpoint 4 
of job bce7153aa983463f434472bc58f25a0d expired before completing.
```
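One observation on the exception above: the reported status code 16777216 is exactly 1 << 24 (0x01000000), which suggests the 2.3 deserializer is reading a big-endian int from bytes that the 2.1.1 state layout wrote for a different field. A minimal sketch of the effect (illustrative only; the field layout is assumed, not taken from the connector source):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class StatusCodeSketch {
    public static void main(String[] args) throws IOException {
        // If the reader is mis-aligned against the older state layout and the
        // four bytes under the cursor are 0x01 0x00 0x00 0x00, the big-endian
        // readInt() yields 1 << 24 instead of a code in the valid range [0, 4].
        byte[] misaligned = {0x01, 0x00, 0x00, 0x00};
        int code = new DataInputStream(new ByteArrayInputStream(misaligned)).readInt();
        System.out.println(code);            // 16777216
        System.out.println(code == 1 << 24); // true
    }
}
```

In other words, the bytes themselves look like plausible 2.1.x state; it is the reader's notion of where the assigner status lives that appears to have moved between versions.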

> Can not start FlinkCDC 2.3 with startupOptions = specificOffset
> ---------------------------------------------------------------
>
>                 Key: FLINK-34759
>                 URL: https://issues.apache.org/jira/browse/FLINK-34759
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Flink CDC Issue Import
>            Priority: Major
>              Labels: github-import
>
> I just upgraded Flink CDC in my job from 2.1.1 to 2.3.0 to leverage the new 
> "Scan Newly Added Tables" feature (the table was not in the tableList in the 
> first place), but I ended up with a "Name is null" exception:
> ```
> java.lang.NullPointerException: Name is null
>       at java.lang.Enum.valueOf(Enum.java:236) ~[?:1.8.0_312]
>       at 
> com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind.valueOf(BinlogOffsetKind.java:26)
>  ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
>       at 
> com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.getOffsetKind(BinlogOffset.java:136)
>  ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
>       at 
> com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition(SerializerUtils.java:73)
>  ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
>       at 
> com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition(SerializerUtils.java:59)
>  ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
>       at 
> com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserializeSplit(MySqlSplitSerializer.java:153)
>  ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
>       at 
> com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserialize(MySqlSplitSerializer.java:122)
>  ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
>       at 
> com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserialize(MySqlSplitSerializer.java:46)
>  ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
>       at 
> org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:165)
>  ~[flink-core-1.13.5.jar:1.13.5]
>       at 
> org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
>  ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
>       at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_312]
>       at 
> org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:95) 
> ~[flink-core-1.13.5.jar:1.13.5]
>       at 
> org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:251)
>  ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
>  ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
>  ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
>  ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>  ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
>  ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) 
> ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
>       at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> ```
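The NullPointerException above is the one `Enum.valueOf` throws when handed a null constant name (Enum.java:236). A plausible reading is that the offset map serialized by 2.1.1 has no entry for the offset-kind field that 2.3 reads back. A stand-in sketch (the enum constants and field names here are illustrative, not the connector's actual state layout):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetKindSketch {
    // Stand-in for the connector's BinlogOffsetKind enum (illustrative).
    enum OffsetKind { EARLIEST, LATEST, SPECIFIC, TIMESTAMP }

    public static void main(String[] args) {
        // An offset map as 2.1.1 might have written it: file and position
        // only, with no "kind" entry (field names assumed for illustration).
        Map<String, String> oldOffset = new HashMap<>();
        oldOffset.put("file", "binlog.000017");
        oldOffset.put("pos", "499573306");

        try {
            // oldOffset.get("kind") is null, and Enum.valueOf(Class, null)
            // throws NullPointerException("Name is null").
            OffsetKind kind = Enum.valueOf(OffsetKind.class, oldOffset.get("kind"));
            System.out.println(kind);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // Name is null
        }
    }
}
```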
> I could not find a solution to this issue, so I turned to another new 
> feature: scan.startup.mode = specific-offset. This time I got the exception 
> "No TableMapEventData has been found for table id". Issues 
> (https://github.com/ververica/flink-cdc-connectors/issues/276, 
> https://github.com/ververica/flink-cdc-connectors/issues/411) were closed, 
> but the problem is still reported to exist, which matches what I observed 
> here. I am hoping to get some help.
> **Environment :**
>  - Flink version :  1.13.5
>  - Flink CDC version: 2.3.0
>  - Database and version: mysql 8.0.25
> **To Reproduce**
> Steps to reproduce the behavior:
> 1. The test data:
> 2. The test code:
> ```
>     MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
>         .hostname(datasource.getConnect().getHost())
>         .port(Integer.parseInt(datasource.getConnect().getPort()))
>         .username(datasource.getConnect().getUsername())
>         .password(datasource.getConnect().getPassword())
>         .databaseList(datasource.getDatabase())
>         .tableList(tableRegex)
>         .scanNewlyAddedTableEnabled(true)
>         .startupOptions(StartupOptions.specificOffset("binlog.000017", 499573306))
>         .includeSchemaChanges(true)
>         .deserializer(new ChangJsonDeserializationSchema(true))
>         .serverId(serverId)
>         .debeziumProperties(properties);
> ```
> 3. The error:
> ```
> 2022-12-05 16:02:06,396 INFO  [blc-mysql-sit.deepq.tech:3306] 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1185] - 
> Connected to MySQL binlog at mysql-sit.deepq.tech:3306, starting at 
> MySqlOffsetContext 
> [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, 
> sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=binlog.000017, 
> currentBinlogPosition=499573306, currentRowNumber=0, serverId=0, 
> sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], 
> databaseName=null], partition={server=mysql_binlog_source}, 
> snapshotCompleted=false, transactionContext=TransactionContext 
> [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], 
> restartGtidSet=null, currentGtidSet=null, 
> restartBinlogFilename=binlog.000017, restartBinlogPosition=499573306, 
> restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, 
> inTransaction=false, transactionId=null, incrementalSnapshotContext 
> =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, 
> dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
> 2022-12-05 16:02:06,399 INFO  [debezium-reader-0] 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource [917] - Waiting 
> for keepalive thread to start
> 2022-12-05 16:02:06,399 INFO  [blc-mysql-sit.deepq.tech:3306] 
> io.debezium.util.Threads [287] - Creating thread 
> debezium-mysqlconnector-mysql_binlog_source-binlog-client
> 2022-12-05 16:02:06,400 INFO  [debezium-reader-0] 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource [924] - Keepalive 
> thread is running
> 2022-12-05 16:02:06,405 ERROR [blc-mysql-sit.deepq.tech:3306] 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1054] - Error 
> during binlog processing. Last offset stored = null, binlog reader near 
> position = binlog.000017/499573306
> 2022-12-05 16:02:06,430 ERROR [blc-mysql-sit.deepq.tech:3306] 
> io.debezium.pipeline.ErrorHandler [31] - Producer failure
> io.debezium.DebeziumException: Failed to deserialize data of 
> EventHeaderV4{timestamp=1670226722000, eventType=EXT_UPDATE_ROWS, serverId=1, 
> headerLength=19, dataLength=31, nextPosition=499573356, flags=0}
>       at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1154)
>  ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
>       at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1207)
>  [debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
>       at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:958)
>  [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
>  [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
>  [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
> Caused by: 
> com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
>  Failed to deserialize data of EventHeaderV4{timestamp=1670226722000, 
> eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, 
> nextPosition=499573356, flags=0}
>       at 
> com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:309)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230)
>  ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
>       at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       ... 3 more
> Caused by: 
> com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException:
>  No TableMapEventData has been found for table id:905. Usually that means 
> that you have started reading binary log 'within the logical event group' 
> (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
>       at 
> com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:109)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserializeRows(UpdateRowsEventDataDeserializer.java:71)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:58)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:33)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230)
>  ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
>       at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
>  ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
>       ... 3 more
> ```
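The MissingTableMapEventException is consistent with its own hint: a ROWS event carries only a numeric table id, and the mapping from that id to a table schema is established by the TABLE_MAP event that opens the event group. Starting the reader at an offset inside the group skips that TABLE_MAP, so the id cannot be resolved. A toy model of the lookup (names are illustrative; this is not the mysql-binlog-connector-java API):

```java
import java.util.HashMap;
import java.util.Map;

public class TableMapSketch {
    // The reader keeps a table-id cache: TABLE_MAP events fill it,
    // ROWS events consult it (simplified model).
    static final Map<Long, String> tableMapCache = new HashMap<>();

    static String resolveRowsEvent(long tableId) {
        String table = tableMapCache.get(tableId);
        if (table == null) {
            // The situation behind MissingTableMapEventException.
            throw new IllegalStateException(
                    "No TableMapEventData has been found for table id:" + tableId);
        }
        return table;
    }

    public static void main(String[] args) {
        // Starting at an event-group boundary: the TABLE_MAP arrives first.
        tableMapCache.put(906L, "db.some_table");
        System.out.println(resolveRowsEvent(906L)); // db.some_table

        // Starting "within the logical event group": no TABLE_MAP was seen
        // for id 905, so the ROWS event cannot be decoded.
        try {
            resolveRowsEvent(905L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why a specific-offset position has to sit on an event-group boundary; an arbitrary byte offset can land mid-group and reproduce the error above.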
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/1795
> Created by: [ldwnt|https://github.com/ldwnt]
> Labels: bug, 
> Created at: Mon Dec 05 18:40:41 CST 2022
> State: open



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
