Flink CDC Issue Import created FLINK-34759:
----------------------------------------------

             Summary: Can not start FlinkCDC 2.3 with startupOptions = 
specificOffset
                 Key: FLINK-34759
                 URL: https://issues.apache.org/jira/browse/FLINK-34759
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


I just upgraded Flink CDC in my job from 2.1.1 to 2.3.0 to use the new 
"Scan Newly Added Tables" feature (the table was not in the tableList 
originally). But the job failed with the following "Name is null" exception:
```
java.lang.NullPointerException: Name is null
        at java.lang.Enum.valueOf(Enum.java:236) ~[?:1.8.0_312]
        at com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind.valueOf(BinlogOffsetKind.java:26) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
        at com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.getOffsetKind(BinlogOffset.java:136) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
        at com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition(SerializerUtils.java:73) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
        at com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition(SerializerUtils.java:59) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
        at com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserializeSplit(MySqlSplitSerializer.java:153) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
        at com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserialize(MySqlSplitSerializer.java:122) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
        at com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserialize(MySqlSplitSerializer.java:46) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
        at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:165) ~[flink-core-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
        at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_312]
        at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:95) ~[flink-core-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:251) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
```
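
The top frames show `BinlogOffsetKind.valueOf` receiving a null name while `SourceOperator.open` deserializes splits from restored state. Below is a minimal sketch of that failure pattern. It assumes, without confirmation from the trace, that the restored offset simply carries no kind entry. `OffsetKind`, `KIND_KEY`, and the map layout are hypothetical stand-ins, not the connector's actual classes or state format:

```java
import java.util.HashMap;
import java.util.Map;

public class NullEnumNameDemo {
    // Hypothetical stand-in for the connector's offset-kind enum.
    enum OffsetKind { EARLIEST, LATEST, SPECIFIC }

    // Hypothetical key under which a newer writer would store the kind.
    static final String KIND_KEY = "kind";

    // Failing pattern: look up a possibly missing key, then call valueOf on the result.
    static OffsetKind strictKind(Map<String, String> offset) {
        return OffsetKind.valueOf(offset.get(KIND_KEY)); // NPE when the key is absent
    }

    // Defensive variant: fall back to SPECIFIC when no kind was stored.
    // The fallback value is an assumption made for this illustration only.
    static OffsetKind lenientKind(Map<String, String> offset) {
        String name = offset.get(KIND_KEY);
        return name == null ? OffsetKind.SPECIFIC : OffsetKind.valueOf(name);
    }

    public static void main(String[] args) {
        // Offset as an older writer might have stored it: file and position only.
        Map<String, String> oldState = new HashMap<>();
        oldState.put("file", "binlog.000017");
        oldState.put("pos", "499573306");

        try {
            strictKind(oldState);
        } catch (NullPointerException e) {
            System.out.println("strict lookup failed: " + e.getMessage());
        }
        System.out.println("lenient lookup: " + lenientKind(oldState));
    }
}
```

If this is what happens, the failure would come from the restored state rather than from the new source configuration, which may be worth checking by starting the 2.3.0 job once without the old state.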
I could not find a solution to this issue, so I turned to another new 
feature: scan.startup.mode = specific-offset. This time I got the exception 
"No TableMapEventData has been found for table id". The related issues 
(https://github.com/ververica/flink-cdc-connectors/issues/276, 
https://github.com/ververica/flink-cdc-connectors/issues/411) were closed, but 
the problem is still being reported there, and I am seeing it here as well. 
Any help would be appreciated.

**Environment :**
 - Flink version: 1.13.5
 - Flink CDC version: 2.3.0
 - Database and version: MySQL 8.0.25

**To Reproduce**
Steps to reproduce the behavior:
1. The test data :
2. The test code :
```
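// Imports for Flink CDC 2.3.0 (com.ververica packages).
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;

// datasource, tableRegex, serverId and properties come from the job's own configuration;
// ChangJsonDeserializationSchema is a custom deserializer defined in the job.
MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
        .hostname(datasource.getConnect().getHost())
        .port(Integer.parseInt(datasource.getConnect().getPort()))
        .username(datasource.getConnect().getUsername())
        .password(datasource.getConnect().getPassword())
        .databaseList(datasource.getDatabase())
        .tableList(tableRegex)
        .scanNewlyAddedTableEnabled(true)
        // Start reading from this binlog file and byte position.
        .startupOptions(StartupOptions.specificOffset("binlog.000017", 499573306))
        .includeSchemaChanges(true)
        .deserializer(new ChangJsonDeserializationSchema(true))
        .serverId(serverId)
        .debeziumProperties(properties);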
```
3. The error :
```
2022-12-05 16:02:06,396 INFO  [blc-mysql-sit.deepq.tech:3306] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1185] - Connected to MySQL binlog at mysql-sit.deepq.tech:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=binlog.000017, currentBinlogPosition=499573306, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], partition={server=mysql_binlog_source}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=binlog.000017, restartBinlogPosition=499573306, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2022-12-05 16:02:06,399 INFO  [debezium-reader-0] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [917] - Waiting for keepalive thread to start
2022-12-05 16:02:06,399 INFO  [blc-mysql-sit.deepq.tech:3306] io.debezium.util.Threads [287] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-12-05 16:02:06,400 INFO  [debezium-reader-0] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [924] - Keepalive thread is running
2022-12-05 16:02:06,405 ERROR [blc-mysql-sit.deepq.tech:3306] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1054] - Error during binlog processing. Last offset stored = null, binlog reader near position = binlog.000017/499573306
2022-12-05 16:02:06,430 ERROR [blc-mysql-sit.deepq.tech:3306] io.debezium.pipeline.ErrorHandler [31] - Producer failure
io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1670226722000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=499573356, flags=0}
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1154) ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1207) [debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:958) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1670226722000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=499573356, flags=0}
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:309) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230) ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        ... 3 more
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:905. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:109) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserializeRows(UpdateRowsEventDataDeserializer.java:71) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:58) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:33) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230) ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
        ... 3 more
```
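
The header fields in the log allow the failing event's own start position to be recomputed, assuming headerLength plus dataLength is the full event length: 499573356 - 19 - 31 = 499573306, which is exactly the offset passed to `specificOffset`. So the reader appears to have started directly on an EXT_UPDATE_ROWS event, and the TABLE_MAP event that precedes it in the binlog was never read. The toy sketch below illustrates that check on a simplified in-memory event list. The `Event` class, the use of a QUERY event as the group start, and every position other than the one taken from the log are made up for illustration and belong to no library:

```java
import java.util.Arrays;
import java.util.List;

public class BinlogOffsetCheck {
    // Simplified, hypothetical model of a binlog event: its type and start position.
    static final class Event {
        final String type;
        final long position;

        Event(String type, long position) {
            this.type = type;
            this.position = position;
        }
    }

    // Start of an event = position of the next event minus this event's total length.
    static long eventStart(long nextPosition, long headerLength, long dataLength) {
        return nextPosition - headerLength - dataLength;
    }

    // True if the event starting at this offset is a rows event,
    // which cannot be decoded without an earlier TABLE_MAP event.
    static boolean startsOnRowsEvent(List<Event> events, long offset) {
        for (Event e : events) {
            if (e.position == offset) {
                return e.type.endsWith("_ROWS");
            }
        }
        return false; // offset is not the start of any known event
    }

    // Closest group start (modelled here as a QUERY event) at or before the offset,
    // or -1 if there is none. Expects events sorted by ascending position.
    static long safeStart(List<Event> events, long offset) {
        long candidate = -1;
        for (Event e : events) {
            if (e.position > offset) {
                break;
            }
            if (e.type.equals("QUERY")) {
                candidate = e.position;
            }
        }
        return candidate;
    }

    public static void main(String[] args) {
        long start = eventStart(499573356L, 19, 31);
        // Only the EXT_UPDATE_ROWS position comes from the log; the rest is invented.
        List<Event> events = Arrays.asList(
                new Event("QUERY", 499573150L),
                new Event("TABLE_MAP", 499573240L),
                new Event("EXT_UPDATE_ROWS", start),
                new Event("XID", 499573356L));
        System.out.println("event start: " + start);
        System.out.println("starts on rows event: " + startsOnRowsEvent(events, start));
        System.out.println("safe start: " + safeStart(events, start));
    }
}
```

On a real server, the actual event boundaries around the offset would have to be read from the binlog itself; the list above only mimics that layout.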

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/1795
Created by: [ldwnt|https://github.com/ldwnt]
Labels: bug
Created at: Mon Dec 05 18:40:41 CST 2022
State: open
