Lv Luo Gang created FLINK-35668:
-----------------------------------

             Summary: Exception "java.lang.OutOfMemoryError" thrown when 
importing data from a MySQL table to StarRocks
                 Key: FLINK-35668
                 URL: https://issues.apache.org/jira/browse/FLINK-35668
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.1.0
         Environment: flink-1.18.1

flink-cdc-3.1.0

MySQL 8.0.33

StarRocks-3.2.7
            Reporter: Lv Luo Gang


I have 40 MySQL insert SQL files for a big table with about 100 million 
records in total; each file is 100MB in size. I restored these files into a 
MySQL table named "standby_atomic_action" with the mysql CLI program in a 
loop. At the same time, I started a Flink CDC pipeline with scan.startup.mode 
"initial" to copy the MySQL table data to a StarRocks table. When the Flink 
task executed the last SQL split to fetch snapshot data, it returned more than 
1 million records; the Flink TaskExecutor then threw 
"java.lang.OutOfMemoryError" and was terminated.


I checked the Flink CDC source code: in module flink-connector-mysql-cdc, the 
method 
org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils#buildSplitScanQuery
 calls buildSplitQuery with limitSize = -1, so when isLastSplit is true the 
split SQL is "SELECT * FROM standby_atomic_action WHERE id >= ?" with no LIMIT 
clause. Meanwhile, the mysql CLI had just committed more than 1 million new 
records, far more than the scan.incremental.snapshot.chunk.size default value 
of 8096.
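To illustrate, here is a simplified sketch of the query construction as I 
understand it. This is not the exact Flink CDC source, only the shape of 
StatementUtils#buildSplitScanQuery / buildSplitQuery as described above:

{code:java}
/** Simplified sketch (not the exact Flink CDC source) of the split-query logic. */
public class SplitQuerySketch {

    static String buildSplitScanQuery(
            String table, String pk, boolean isFirstSplit, boolean isLastSplit) {
        // buildSplitScanQuery delegates with limitSize = -1, so the LIMIT
        // branch below can never fire for snapshot split scan queries.
        return buildSplitQuery(table, pk, isFirstSplit, isLastSplit, -1);
    }

    static String buildSplitQuery(
            String table, String pk,
            boolean isFirstSplit, boolean isLastSplit, int limitSize) {
        StringBuilder sql = new StringBuilder("SELECT * FROM " + table);
        if (isFirstSplit) {
            sql.append(" WHERE ").append(pk).append(" < ?");   // (-inf, splitEnd)
        } else if (isLastSplit) {
            sql.append(" WHERE ").append(pk).append(" >= ?");  // [splitStart, +inf): unbounded
        } else {
            sql.append(" WHERE ").append(pk).append(" >= ? AND ")
               .append(pk).append(" < ?");                     // [splitStart, splitEnd)
        }
        if (limitSize > 0) {
            sql.append(" LIMIT ").append(limitSize);           // unreachable with limitSize = -1
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // Prints: SELECT * FROM `qwgas`.`standby_atomic_action` WHERE `id` >= ?
        // which matches the statement highlighted in the log below.
        System.out.println(buildSplitScanQuery(
                "`qwgas`.`standby_atomic_action`", "`id`", false, true));
    }
}
{code}

Because the last split's range is open-ended, every row the mysql CLI 
committed after the split boundaries were computed falls into it, so the task 
ends up reading far more than one chunk's worth of rows in a single split.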

 
*Exception:*
2024-06-21 05:52:35,440 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Snapshot step 1 - Determining low watermark {ts_sec=0, file=binlog.000045, 
pos=488153283, kind=SPECIFIC, 
gtids=334bec6e-849e-11eb-9157-b8599f49e7f4:1-936486941,
778ec805-a69b-11eb-9250-506b4b02d56e:1-900365321,
d6a416c1-a2de-11e9-adcf-506b4b233658:1-166810501,
d6bb9ebd-a2de-11e9-936f-506b4bfd5c94:1-80755324, row=0, event=0} for split 
MySqlSnapshotSplit{tableId=qwgas.standby_atomic_action, 
splitId='qwgas.standby_atomic_action:10569', splitKeyType=[`id` BIGINT NOT 
NULL], splitStart=[92277940], splitEnd=null, highWatermark=null}
2024-06-21 05:52:35,440 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Snapshot step 2 - Snapshotting data
2024-06-21 05:52:35,440 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exporting data from split 'qwgas.standby_atomic_action:10569' of table 
qwgas.standby_atomic_action
2024-06-21 05:52:35,440 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - For split 'qwgas.standby_atomic_action:10569' of table 
qwgas.standby_atomic_action using select statement: '{color:#FF0000}SELECT * 
FROM `qwgas`.`standby_atomic_action` WHERE `id` >= ?{color}'
...
...
...
2024-06-21 05:54:59,970 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - {color:#FF0000}Exported 1877835 records for split 
'qwgas.standby_atomic_action:10569'{color} after 00:02:24.53
2024-06-21 05:55:06,410 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exported 5149 records for split 'qwgas.standby_atomic_action_counter:148' 
after 00:00:33.897
2024-06-21 05:55:07,484 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exported 6646 records for split 'qwgas.standby_atomic_action_counter:147' 
after 00:00:34.971
2024-06-21 05:55:10,714 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exported 4214 records for split 'qwgas.standby_atomic_action_counter:149' 
after 00:00:37.106
2024-06-21 05:55:11,784 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - {color:#FF0000}Exported 1878137 records for split 
'qwgas.standby_atomic_action:10569'{color} after 00:02:36.344
2024-06-21 05:55:17,187 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exported 5826 records for split 'qwgas.standby_atomic_action_counter:148' 
after 00:00:44.674
2024-06-21 05:55:18,274 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exported 7150 records for split 'qwgas.standby_atomic_action_counter:147' 
after 00:00:45.761
2024-06-21 05:55:21,522 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exported 4312 records for split 'qwgas.standby_atomic_action_counter:149' 
after 00:00:47.914
2024-06-21 05:56:15,500 ERROR 
com.starrocks.data.load.stream.v2.StreamLoadManagerV2        [] - 
StarRocks-Sink-Manager error
java.lang.OutOfMemoryError: Java heap space
2024-06-21 05:56:15,500 ERROR 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - 
Received uncaught exception.
java.lang.OutOfMemoryError: Java heap space
2024-06-21 05:56:15,500 WARN  
org.jboss.netty.channel.socket.nio.AbstractNioSelector       [] - Unexpected 
exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
2024-06-21 05:56:15,500 WARN  
org.apache.flink.runtime.accumulators.AccumulatorRegistry    [] - Failed to 
serialize accumulators for task.
java.lang.OutOfMemoryError: Java heap space
2024-06-21 05:56:15,501 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exported 6112 records for split 'qwgas.standby_atomic_action_counter:148' 
after 00:01:42.988
2024-06-21 05:56:15,502 ERROR org.apache.flink.util.FatalExitExceptionHandler   
           [] - FATAL: Thread 'flink-pekko.remote.default-remote-dispatcher-16' 
produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
2024-06-21 05:56:15,502 INFO  
org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask 
[] - Exported 7459 records for split 'qwgas.standby_atomic_action_counter:147' 
after 00:01:42.989
2024-06-21 05:56:15,503 ERROR org.apache.flink.util.FatalExitExceptionHandler   
           [] - FATAL: Thread 'flink-pekko.remote.default-remote-dispatcher-14' 
produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
2024-06-21 05:56:15,776 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - The heartbeat 
of JobManager with id f357f3610bf2f36204f11d9e81d50b09 timed out.
2024-06-21 05:56:15,777 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
JobManager connection for job f7d731948175e4efbc9ee57824148e6b.
2024-06-21 05:56:15,777 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Attempting to fail task externally Source: Flink CDC Event 
Source: mysql -> SchemaOperator -> PrePartition (3/4)#4 
(77a0a5b99d79f9f19621258a8015daee_cbc357ccb763df2852fee8c4fc7d55f2_2_4).
2024-06-21 05:56:15,777 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: Flink CDC Event Source: mysql -> SchemaOperator -> 
PrePartition (3/4)#4 
(77a0a5b99d79f9f19621258a8015daee_cbc357ccb763df2852fee8c4fc7d55f2_2_4) 
switched from RUNNING to FAILED with failure cause:
org.apache.flink.util.FlinkException: Disconnect from JobManager responsible 
for f7d731948175e4efbc9ee57824148e6b.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1764)
 ~[flink-dist-1.18.1.jar:1.18.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectAndTryReconnectToJobManager(TaskExecutor.java:1305)
 ~[flink-dist-1.18.1.jar:1.18.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$4300(TaskExecutor.java:188)
 ~[flink-dist-1.18.1.jar:1.18.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$handleJobManagerConnectionLoss$0(TaskExecutor.java:2549)
 ~[flink-dist-1.18.1.jar:1.18.1]
at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.handleJobManagerConnectionLoss(TaskExecutor.java:2547)
 ~[flink-dist-1.18.1.jar:1.18.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.notifyHeartbeatTimeout(TaskExecutor.java:2530)
 ~[flink-dist-1.18.1.jar:1.18.1]
at 
org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java:158)
 ~[flink-dist-1.18.1.jar:1.18.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
 ~[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-dist-1.18.1.jar:1.18.1]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
 ~[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
 ~[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
 ~[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) 
[flink-rpc-akkaea8c4733-f06e-4574-b875-9fc2cf5fb378.jar:1.18.1]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
at 
java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
 [?:?]
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) 
[?:?]
Caused by: java.util.concurrent.TimeoutException: The heartbeat of JobManager 
with id f357f3610bf2f36204f11d9e81d50b09 timed out.
... 30 more


