Flink CDC Issue Import created FLINK-34808:
----------------------------------------------

             Summary: [Bug]  Mysql cdc connector  connection pool
                 Key: FLINK-34808
                 URL: https://issues.apache.org/jira/browse/FLINK-34808
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
nothing similar.


### Flink version

1.16.0

### Flink CDC version

2.3.0

### Database and its version

mysql 8.0.32

### Minimal reproduce step

Use flink-sql-connector-mysql-cdc:2.3.0 to synchronize a large number of 
MySQL tables (around 100) within a single job.

### What did you expect to see?

The job starts and runs without errors.






### What did you see instead?


org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.util.FlinkRuntimeException: 
java.sql.SQLTransientConnectionException: 
connection-pool-mysqltest-primary.mysql-bitnami:3306 - Connection is not 
available, request timed out after 30002ms.
at 
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:64)
 ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at 
com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:72)
 ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at 
com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170)
 ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:222)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:198)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:605)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1046)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:963)
 ~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:422) 
~[flink-runtime-1.16.0.jar:1.16-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:198)
 ~[flink-dist-1.16.0.jar:1.16.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:622)
 ~[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:621)
 ~[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:190)
 ~[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.actor.Actor.aroundReceive(Actor.scala:537) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[flink-rpc-akka_7e7f5d74-19da-43b1-93a0-b22723578747.jar:1.16.0]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) 
[?:?]
at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: 
java.sql.SQLTransientConnectionException: 
connection-pool-mysqltest-primary.mysql-bitnami:3306 - Connection is not 
available, request timed out after 30002ms.
at 
com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:72)
 ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890) 
~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885) 
~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:418) 
~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at 
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:61)
 ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
... 37 more

### Anything else?

With flink-sql-connector-mysql-cdc:2.3.0, the job fails at startup with the 
error above. The cause appears to be that no connection is available in the 
MySQL connection pool.
Setting 'connection.pool.size' = '200' resolves the issue, and the job then 
runs normally.

With version 2.2.0 the error does not occur: the same job runs normally 
without that parameter.
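
For reference, a minimal sketch of where the workaround goes in a Flink SQL 
table definition. The table name, column, and the bracketed credentials are 
hypothetical placeholders, and the hostname is only inferred from the pool 
name in the stack trace; the one setting taken from this report is 
'connection.pool.size'.

```sql
-- Sketch only: orders_source and its single column are hypothetical.
CREATE TABLE orders_source (
  id BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  -- Hostname inferred from the pool name in the log (assumption).
  'hostname' = 'mysqltest-primary.mysql-bitnami',
  'port' = '3306',
  'username' = '<user>',
  'password' = '<password>',
  'database-name' = '<database>',
  'table-name' = '<table>',
  -- Workaround from this report: raise the JDBC connection pool size.
  'connection.pool.size' = '200'
);
```

If each of the roughly 100 tables is declared as its own source table, the 
option would presumably need to be set on every definition; this report did 
not verify whether a smaller value than 200 is sufficient.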

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2441
Created by: [nathan-szz|https://github.com/nathan-szz]
Labels: bug
Created at: Tue Aug 29 18:26:27 CST 2023
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
