[
https://issues.apache.org/jira/browse/FLINK-39143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Thorne updated FLINK-39143:
---------------------------
Summary: It is better to use a connection pool for fluss's connections
(was: fluss‘s connection use connection pool be better)
> It is better to use a connection pool for fluss's connections
> -------------------------------------------------------------
>
> Key: FLINK-39143
> URL: https://issues.apache.org/jira/browse/FLINK-39143
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: Thorne
> Priority: Major
>
> As shown in the code, this part will create a new connection during the
> process of handling the ddl of the relevant flux connector. If there are a
> large number of tables that need to be initialized, this is not a friendly
> way, but to use connection pooling for replacement.
> {code:java}
> //代码占位符
> private void applyCreateTable(CreateTableEvent event) {
> try (Connection connection =
> ConnectionFactory.createConnection(flussClientConfig);
> Admin admin = connection.getAdmin()) {
> TableId tableId = event.tableId();
> TablePath tablePath = new TablePath(tableId.getSchemaName(),
> tableId.getTableName());
> String tableIdentifier = tablePath.getDatabaseName() + "." +
> tablePath.getTableName();
> List<String> bucketKeys = bucketKeysMap.get(tableIdentifier);
> Integer bucketNum = bucketNumMap.get(tableIdentifier);
> TableDescriptor inferredFlussTable =
> toFlussTable(event.getSchema(), bucketKeys, bucketNum,
> tableProperties);
> admin.createDatabase(tablePath.getDatabaseName(),
> DatabaseDescriptor.EMPTY, true);
> if (!admin.tableExists(tablePath).get()) {
> admin.createTable(tablePath, inferredFlussTable, false).get();
> } else {
> TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
> // sanity check to prevent unexpected table schema evolution.
> sanityCheck(inferredFlussTable, currentTableInfo);
> }
> } catch (Exception e) {
> LOG.error("Failed to apply schema change {}", event, e);
> throw new RuntimeException(e);
> }
> }
> private void applyDropTable(DropTableEvent event) {
> try (Connection connection =
> ConnectionFactory.createConnection(flussClientConfig);
> Admin admin = connection.getAdmin()) {
> TableId tableId = event.tableId();
> TablePath tablePath = new TablePath(tableId.getSchemaName(),
> tableId.getTableName());
> admin.dropTable(tablePath, true).get();
> } catch (Exception e) {
> LOG.error("Failed to apply schema change {}", event, e);
> throw new RuntimeException(e);
> }
> } {code}
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)