[ 
https://issues.apache.org/jira/browse/FLINK-39143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thorne updated FLINK-39143:
---------------------------
    Description: 
As shown in the code below, a new connection is created every time the Fluss 
connector handles a DDL event. If a large number of tables need to be 
initialized, this is inefficient; a connection pool should be used instead 
of creating a new connection per event.
{code:java}
// code placeholder
private void applyCreateTable(CreateTableEvent event) {
    try (Connection connection = 
ConnectionFactory.createConnection(flussClientConfig);
            Admin admin = connection.getAdmin()) {
        TableId tableId = event.tableId();
        TablePath tablePath = new TablePath(tableId.getSchemaName(), 
tableId.getTableName());
        String tableIdentifier = tablePath.getDatabaseName() + "." + 
tablePath.getTableName();
        List<String> bucketKeys = bucketKeysMap.get(tableIdentifier);
        Integer bucketNum = bucketNumMap.get(tableIdentifier);
        TableDescriptor inferredFlussTable =
                toFlussTable(event.getSchema(), bucketKeys, bucketNum, 
tableProperties);
        admin.createDatabase(tablePath.getDatabaseName(), 
DatabaseDescriptor.EMPTY, true);
        if (!admin.tableExists(tablePath).get()) {
            admin.createTable(tablePath, inferredFlussTable, false).get();
        } else {
            TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
            // sanity check to prevent unexpected table schema evolution.
            sanityCheck(inferredFlussTable, currentTableInfo);
        }
    } catch (Exception e) {
        LOG.error("Failed to apply schema change {}", event, e);
        throw new RuntimeException(e);
    }
}

private void applyDropTable(DropTableEvent event) {
    try (Connection connection = 
ConnectionFactory.createConnection(flussClientConfig);
            Admin admin = connection.getAdmin()) {
        TableId tableId = event.tableId();
        TablePath tablePath = new TablePath(tableId.getSchemaName(), 
tableId.getTableName());
        admin.dropTable(tablePath, true).get();
    } catch (Exception e) {
        LOG.error("Failed to apply schema change {}", event, e);
        throw new RuntimeException(e);
    }
}  {code}
 

 

 

> Fluss's connection should use a connection pool
> -----------------------------------------------
>
>                 Key: FLINK-39143
>                 URL: https://issues.apache.org/jira/browse/FLINK-39143
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: Thorne
>            Priority: Major
>
> As shown in the code below, a new connection is created every time the 
> Fluss connector handles a DDL event. If a large number of tables need to be 
> initialized, this is inefficient; a connection pool should be used instead 
> of creating a new connection per event.
> {code:java}
> // code placeholder
> private void applyCreateTable(CreateTableEvent event) {
>     try (Connection connection = 
> ConnectionFactory.createConnection(flussClientConfig);
>             Admin admin = connection.getAdmin()) {
>         TableId tableId = event.tableId();
>         TablePath tablePath = new TablePath(tableId.getSchemaName(), 
> tableId.getTableName());
>         String tableIdentifier = tablePath.getDatabaseName() + "." + 
> tablePath.getTableName();
>         List<String> bucketKeys = bucketKeysMap.get(tableIdentifier);
>         Integer bucketNum = bucketNumMap.get(tableIdentifier);
>         TableDescriptor inferredFlussTable =
>                 toFlussTable(event.getSchema(), bucketKeys, bucketNum, 
> tableProperties);
>         admin.createDatabase(tablePath.getDatabaseName(), 
> DatabaseDescriptor.EMPTY, true);
>         if (!admin.tableExists(tablePath).get()) {
>             admin.createTable(tablePath, inferredFlussTable, false).get();
>         } else {
>             TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
>             // sanity check to prevent unexpected table schema evolution.
>             sanityCheck(inferredFlussTable, currentTableInfo);
>         }
>     } catch (Exception e) {
>         LOG.error("Failed to apply schema change {}", event, e);
>         throw new RuntimeException(e);
>     }
> }
> private void applyDropTable(DropTableEvent event) {
>     try (Connection connection = 
> ConnectionFactory.createConnection(flussClientConfig);
>             Admin admin = connection.getAdmin()) {
>         TableId tableId = event.tableId();
>         TablePath tablePath = new TablePath(tableId.getSchemaName(), 
> tableId.getTableName());
>         admin.dropTable(tablePath, true).get();
>     } catch (Exception e) {
>         LOG.error("Failed to apply schema change {}", event, e);
>         throw new RuntimeException(e);
>     }
> }  {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to