Hi everybody,
I am trying to replicate a MySQL database into StarRocks using a Flink CDC
pipeline. The Flink job starts correctly, begins the incremental snapshot
and transfers multiple GB (sometimes even several hundred GB) into the
sink DB. Then, after an arbitrary amount of time (anywhere from ~10 min to
~12 h, depending on the run), the job fails with a 'Communications
link failure' (see the error message below). Since the job is still in the
snapshot phase, it just restarts the snapshot over and over. Unfortunately,
I have been unable to debug this issue for over a week now and I am getting
a bit desperate... Any help or pointers on where to look next would be
greatly appreciated!
Relevant observations:
- This job used to run fine in a past setup, where the MySQL DB, Flink
  and StarRocks were all containers on the same server. The issue appeared
  after migrating the MySQL DB onto a separate server.
- The CPU, RAM and NICs on both machines seem to be far from their
  max capacity when the job fails.
- Flink/StarRocks server:
  - Flink CDC 3.4.0 and Flink 1.20.2
  - StarRocks 3.5.5
  - Everything in Podman containers on Debian 13
  - MySQL Connector/J 8.0.33 (the latest 8.0 jar that I could find)
  - I can't see anything in journalctl or dmesg that would hint at a
    root cause
- MySQL DB server:
  - MySQL 8.0.43 in a Podman container on a Debian 13 VM that runs on
    Proxmox
  - I can't see any irregularities in MySQL's error_log, general_log or
    slow_query_log
  - I can't see anything in journalctl or dmesg that would hint at a
    root cause
  - I don't exceed max_connections
- Networking:
  - The problem persists across three completely separate network
    connections with separate NICs; two of the connections are even P2P
    (the servers are physically next to each other)
  - Running tcpdump on the Flink server does not show any dropped
    packets
- Configuration:
  - Please see below my current configuration for Flink, the Flink job
    and MySQL
  - I have tried changing every parameter in Flink, JDBC, Debezium and
    MySQL that I thought could be relevant to networking or
    timeouts, but still no luck
Root Exception:
2025-09-23 15:21:11,099 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
Source: Flink CDC Event Source: mysql -> SchemaOperator ->
PrePartition (1/1)
(c2b1d7b6ad8a31b175c49d96cc5e6b36_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
switched from RUNNING to FAILED on 10.89.2.141:38211-997a8a @
flinktaskmanager1 (dataPort=35119).
java.lang.RuntimeException: One or more fetchers have encountered exception
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
~[flink-dist-1.20.2.jar:1.20.2]
at
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
~[flink-dist-1.20.2.jar:1.20.2]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-dist-1.20.2.jar:1.20.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
~[flink-dist-1.20.2.jar:1.20.2]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
~[flink-dist-1.20.2.jar:1.20.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
~[flink-dist-1.20.2.jar:1.20.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
~[flink-dist-1.20.2.jar:1.20.2]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
~[flink-dist-1.20.2.jar:1.20.2]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist-1.20.2.jar:1.20.2]
at java.base/java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 110
received unexpected exception while polling the records
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
~[flink-connector-files-1.20.2.jar:1.20.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source) ~[?:?]
... 1 more
Caused by: io.debezium.DebeziumException: Error reading MySQL
variables: Communications link failure
The last packet sent successfully to the server was 0 milliseconds
ago. The driver has not received any packets from the server.
at
io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:162)
~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140)
~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
~[?:?]
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
~[flink-connector-files-1.20.2.jar:1.20.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source) ~[?:?]
... 1 more
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException:
Communications link failure
The last packet sent successfully to the server was 0 milliseconds
ago. The driver has not received any packets from the server.
at
com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:175)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:825)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:446)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:239)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:188)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244)
~[?:?]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888)
~[?:?]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883)
~[?:?]
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:548) ~[?:?]
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496) ~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:146)
~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140)
~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
~[?:?]
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
~[flink-connector-files-1.20.2.jar:1.20.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source) ~[?:?]
... 1 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException:
Communications link failure
The last packet sent successfully to the server was 0 milliseconds
ago. The driver has not received any packets from the server.
at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) ~[?:?]
at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
Source) ~[?:?]
at
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
Source) ~[?:?]
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
~[?:?]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:62)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:150)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:166)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:89)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.NativeSession.connect(NativeSession.java:121)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:945)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:815)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:446)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:239)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:188)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244)
~[?:?]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888)
~[?:?]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883)
~[?:?]
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:548) ~[?:?]
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496) ~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:146)
~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140)
~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
~[?:?]
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
~[flink-connector-files-1.20.2.jar:1.20.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source) ~[?:?]
... 1 more
Caused by: java.net.ConnectException: Connection timed out (Connection
timed out)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
~[?:?]
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
~[?:?]
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown
Source) ~[?:?]
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
~[?:?]
at java.base/java.net.SocksSocketImpl.connect(Unknown Source) ~[?:?]
at java.base/java.net.Socket.connect(Unknown Source) ~[?:?]
at
com.mysql.cj.protocol.StandardSocketFactory.connect(StandardSocketFactory.java:153)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:63)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.NativeSession.connect(NativeSession.java:121)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:945)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:815)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:446)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:239)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:188)
~[mysql-connector-j-8.0.33.jar:8.0.33]
at
io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:244)
~[?:?]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888)
~[?:?]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883)
~[?:?]
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:548) ~[?:?]
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496) ~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:146)
~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.readMySqlSystemVariables(MySqlConnection.java:140)
~[?:?]
at
io.debezium.connector.mysql.MySqlConnection.isTableIdCaseSensitive(MySqlConnection.java:502)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:134)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:77)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118)
~[?:?]
at
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
~[?:?]
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
~[flink-connector-files-1.20.2.jar:1.20.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
~[flink-connector-files-1.20.2.jar:1.20.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source) ~[?:?]
... 1 more
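The innermost cause above is a java.net.ConnectException ("Connection timed
out") raised while the driver opens a new connection, not while it reads
from an existing one. One way to check whether plain TCP connects to the
MySQL port also stall, independently of Flink and JDBC, would be a small
connect probe running on the Flink host next to the job. The following is
only a minimal sketch under assumptions: the function names probe_once and
run_probe are made up for illustration, and the 5 s timeout and 1 s interval
are arbitrary choices.

```python
import socket
import time
from datetime import datetime, timezone


def probe_once(host, port, timeout_s=5.0):
    """Attempt one TCP connect; return (ok, elapsed_seconds, error_text)."""
    start = time.monotonic()
    try:
        # create_connection performs the TCP handshake and nothing more,
        # so no MySQL authentication or protocol traffic is involved.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True, time.monotonic() - start, ""
    except OSError as exc:
        # Covers timeouts, refused connections and unreachable hosts.
        return False, time.monotonic() - start, f"{type(exc).__name__}: {exc}"


def run_probe(host, port, interval_s=1.0, timeout_s=5.0, attempts=None):
    """Probe repeatedly and print one timestamped line per failed connect.

    attempts=None loops until interrupted; an integer stops after that
    many tries. Returns the number of failed attempts.
    """
    failures = 0
    tried = 0
    while attempts is None or tried < attempts:
        ok, elapsed, err = probe_once(host, port, timeout_s)
        tried += 1
        if not ok:
            failures += 1
            stamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
            print(f"{stamp} connect to {host}:{port} failed "
                  f"after {elapsed:.2f}s: {err}")
        time.sleep(interval_s)
    return failures
```

Calling run_probe("192.168.10.32", 3306) (the source hostname and port from
the pipeline job below) from inside the Flink TaskManager container would
log only the failed attempts, so the timestamps can be compared against the
job failures and against a tcpdump capture on both hosts.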
Flink config.yaml
# Flink CDC configuration
# /opt/flink/conf/config.yaml
blob.server.port: 6124
taskmanager.bind-host: '0.0.0.0'
taskmanager.memory.process.size: 200g
taskmanager.memory.managed.size: 1g
taskmanager.memory.network.max: 1g
taskmanager.numberOfTaskSlots: 1
taskmanager.network.partition-request-timeout: 3600s
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 1800000
taskmanager.network.retries: 20
jobmanager.bind-host: '0.0.0.0'
jobmanager.execution.failover-strategy: region
jobmanager.memory.process.size: 100g
jobmanager.memory.off-heap.size: 1g
jobmanager.memory.enable-jvm-direct-memory-limit: true
jobmanager.rpc.address: '10.89.0.40'
jobmanager.rpc.port: 6123
rest.address: '10.89.0.40'
rest.bind-address: '0.0.0.0'
rest.port: 8081
parallelism.default: 1
state.backend.type: 'hashmap'
execution.checkpointing.incremental: true
execution.checkpointing.interval: 5m
execution.checkpointing.min-pause: 5m
execution.checkpointing.timeout: 1h
execution.checkpointing.storage: 'filesystem'
execution.checkpointing.dir: 'file:///opt/flink/checkpoints/'
execution.checkpointing.savepoint-dir: 'file:///opt/flink/savepoints/'
env.java.opts.all:
  --add-exports=java.base/sun.net.util=ALL-UNNAMED
  --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
  --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
  --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
  --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
  --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
  --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
  --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED
  --add-opens=java.base/java.lang=ALL-UNNAMED
  --add-opens=java.base/java.net=ALL-UNNAMED
  --add-opens=java.base/java.io=ALL-UNNAMED
  --add-opens=java.base/java.nio=ALL-UNNAMED
  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
  --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
  --add-opens=java.base/java.text=ALL-UNNAMED
  --add-opens=java.base/java.time=ALL-UNNAMED
  --add-opens=java.base/java.util=ALL-UNNAMED
  --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
  --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
  --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
pekko.ask.timeout: 10m
heartbeat.timeout: 10m
Flink CDC pipeline job
################################################################################
# Description: Sync all MySQL tables to StarRocks
################################################################################
pipeline:
  name: Sync all MySQL tables to StarRocks
  parallelism: 1
source:
  type: mysql
  hostname: 192.168.10.32
  port: 3306
  username: flink_mysql
  password: "$FLINK_MYSQL_PASSWORD"
  tables: db.\.*
  tables.exclude: db.(tmptable_[a-zA-Z0-9_]+)
  server-id: '5500-5599'
  scan.incremental.snapshot.chunk.size: 131072 # 8192
  scan.binlog.newly-added-table.enabled: 'true'
  scan.incremental.close-idle-reader.enabled: 'true'
  connect.timeout: 7200s
  connect.max-retries: 10
  server-time-zone: 'UTC'
  jdbc.properties.allowPublicKeyRetrieval: 'true'
  jdbc.properties.connectTimeout: 7200000
  jdbc.properties.socketTimeout: 7200000
  jdbc.properties.max_execution_time: 7200000
  debezium.poll.interval.ms: 10000
  debezium.connect.timeout.ms: 7200000
  debezium.snapshot.lock.timeout.ms: 7200000
  debezium.signal.kafka.poll.timeout.ms: 10000
  debezium.connect.keep.alive: 'true'
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://10.89.0.50:9030
  load-url: http://10.89.0.50:8030
  username: flink
  password: "$FLINK_STARROCKS_PASSWORD"
  table.create.properties.replication_num: 1
  table.create.properties.fast_schema_evolution: 'true'
  sink.properties.timeout: 1200
my.cnf
[mysqld]
log_bin = ON
join_buffer_size=128M
sort_buffer_size=2M
read_rnd_buffer_size=2M
skip-name-resolve
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
secure-file-priv=/var/lib/mysql-files
pid-file=/var/run/mysqld/mysqld.pid
tmpdir=/tmp
bind-address = 0.0.0.0
# Set up logging
general_log=ON
general_log_file=/var/lib/mysql/log_general
slow_query_log=ON
slow_query_log_file=/var/lib/mysql/log_slow_query
default_storage_engine=InnoDB
character_set_server=utf8mb4
collation_server=utf8mb4_bin
lower_case_table_names=1
innodb_file_per_table=ON
max_connections=100
innodb_buffer_pool_size=64g
innodb_redo_log_capacity=24G
innodb_flush_log_at_trx_commit=1
innodb_flush_method=O_DIRECT
innodb_log_buffer_size=1G
innodb_io_capacity=10000
innodb_io_capacity_max=20000
innodb_ddl_buffer_size=4G
innodb_ddl_threads=10
innodb_parallel_read_threads=10
temptable_max_ram=4G
# Server System Variables
connect_timeout=3600
interactive_timeout=28800
max_execution_time=7200000
net_read_timeout=3600
wait_timeout=28800
# InnoDB Startup Options
innodb_lock_wait_timeout=3600
mysqlx=OFF
Thank you for your help!
Peter