Flink CDC Issue Import created FLINK-34800:
----------------------------------------------

             Summary: [Bug] Mysql Startup with TIMESTAMP,Restore From 
Checkpoint Failed:One or more fetchers have encountered exception
                 Key: FLINK-34800
                 URL: https://issues.apache.org/jira/browse/FLINK-34800
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues) and found 
nothing similar.


### Flink version

Flink version 1.16.1

### Flink CDC version

Flink CDC version 2.4.0

### Database and its version

Mysql: 8.0.31

### Minimal reproduce step

// 1. Create Source Table
CREATE TABLE user_10 (
id INT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED 
) WITH( 
'connector' = 'mysql-cdc', 
'hostname' = '192.168.2.29', 
'port' = '3306', 
'username' = 'root', 
'password' = '123456', 
'database-name' = 'flink_test', 
'table-name' = 'user_10', 
'scan.startup.mode' = 'timestamp', 
'scan.incremental.snapshot.enabled' = 'true' ,
'scan.startup.timestamp-millis' = '1689736158444'
)

// 2.Create Sink Table
CREATE TABLE user_10_sink_1 (
id INT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED 
) WITH( 
'connector' = 'jdbc', 
'url' = 
'jdbc:mysql://192.168.2.29:3306/flink_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai',
 
'username' = 'root', 
'password' = '123456', 
'driver' = 'com.mysql.cj.jdbc.Driver', 
'table-name' = 'user_10_sink_1' 
)

//Execute Flink SQL
INSERT INTO user_10_sink_1 SELECT id AS id,name AS name,age AS age FROM user_10

After startup, terminate the task after generating a checkpoint. Modify some 
data in the source table. Then, restore the task from the checkpoint


### What did you expect to see?

Successfully restored from checkpoint

### What did you see instead?

2023-07-19 11:50:37
java.lang.RuntimeException: One or more fetchers have encountered exception
        at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
        at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
        at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
        at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
        at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
        at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped.
        at 
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
        at 
com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:85)
        at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1545)
        at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
        at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
        at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
        ... 1 more
Caused by: io.debezium.DebeziumException: bogus data in log event; the first 
event '' at 45092, the last event read from './binlog.000006' at 23504414, the 
last byte read from './binlog.000006' at 23504433. Error code: 1236; SQLSTATE: 
HY000.
        at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1489)
        ... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: bogus data 
in log event; the first event '' at 45092, the last event read from 
'./binlog.000006' at 23504414, the last byte read from './binlog.000006' at 
23504433.
        at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1043)
        ... 3 more


### Anything else?

# Operating environment
1.  Checkpoint Configuration:
env.enableCheckpointing(300000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2147483647, 3000L)];

2.  MySQL does not have GTID enabled

3. Mysql Binlog list:
Log_name File_size Encrypted
binlog.000006   1073756181      No
binlog.000007   1073759694      No
binlog.000008   1160855925      No
binlog.000009   1073763792      No
binlog.000010   731076314       No
binlog.000011   47220   No
binlog.000012   180     No
binlog.000013   157     No

4. Mysql Configuration: 
max_allowed_packet      1073741824
mysqlx_max_allowed_packet       1073741824
replica_max_allowed_packet      1073741824
slave_max_allowed_packet        1073741824

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2319
Created by: [GaoYaokun|https://github.com/GaoYaokun]
Labels: bug, 
Created at: Wed Jul 19 14:27:05 CST 2023
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to