[
https://issues.apache.org/jira/browse/FLINK-39143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Thorne updated FLINK-39143:
---------------------------
Description:
As shown in the code below, the Fluss connector creates a new connection every
time it handles a DDL event (for example, a create-table or drop-table event).
When a large number of tables need to be initialized, this is inefficient; a
connection pool should be used instead.
{code:java}
private void applyCreateTable(CreateTableEvent event) {
try (Connection connection =
ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(),
tableId.getTableName());
String tableIdentifier = tablePath.getDatabaseName() + "." +
tablePath.getTableName();
List<String> bucketKeys = bucketKeysMap.get(tableIdentifier);
Integer bucketNum = bucketNumMap.get(tableIdentifier);
TableDescriptor inferredFlussTable =
toFlussTable(event.getSchema(), bucketKeys, bucketNum,
tableProperties);
admin.createDatabase(tablePath.getDatabaseName(),
DatabaseDescriptor.EMPTY, true);
if (!admin.tableExists(tablePath).get()) {
admin.createTable(tablePath, inferredFlussTable, false).get();
} else {
TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
// sanity check to prevent unexpected table schema evolution.
sanityCheck(inferredFlussTable, currentTableInfo);
}
} catch (Exception e) {
LOG.error("Failed to apply schema change {}", event, e);
throw new RuntimeException(e);
}
}
private void applyDropTable(DropTableEvent event) {
try (Connection connection =
ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(),
tableId.getTableName());
admin.dropTable(tablePath, true).get();
} catch (Exception e) {
LOG.error("Failed to apply schema change {}", event, e);
throw new RuntimeException(e);
}
} {code}
was:
As shown in the code below, the Fluss connector creates a new connection every
time it handles a DDL event (for example, a create-table or drop-table event).
When a large number of tables need to be initialized, this is inefficient; a
connection pool should be used instead.
{code:java}
// code placeholder
private void applyCreateTable(CreateTableEvent event) {
try (Connection connection =
ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(),
tableId.getTableName());
String tableIdentifier = tablePath.getDatabaseName() + "." +
tablePath.getTableName();
List<String> bucketKeys = bucketKeysMap.get(tableIdentifier);
Integer bucketNum = bucketNumMap.get(tableIdentifier);
TableDescriptor inferredFlussTable =
toFlussTable(event.getSchema(), bucketKeys, bucketNum,
tableProperties);
admin.createDatabase(tablePath.getDatabaseName(),
DatabaseDescriptor.EMPTY, true);
if (!admin.tableExists(tablePath).get()) {
admin.createTable(tablePath, inferredFlussTable, false).get();
} else {
TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
// sanity check to prevent unexpected table schema evolution.
sanityCheck(inferredFlussTable, currentTableInfo);
}
} catch (Exception e) {
LOG.error("Failed to apply schema change {}", event, e);
throw new RuntimeException(e);
}
}
private void applyDropTable(DropTableEvent event) {
try (Connection connection =
ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(),
tableId.getTableName());
admin.dropTable(tablePath, true).get();
} catch (Exception e) {
LOG.error("Failed to apply schema change {}", event, e);
throw new RuntimeException(e);
}
} {code}
> It is better to use a connection pool for fluss's connections
> -------------------------------------------------------------
>
> Key: FLINK-39143
> URL: https://issues.apache.org/jira/browse/FLINK-39143
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: Thorne
> Priority: Major
>
> As shown in the code below, the Fluss connector creates a new connection
> every time it handles a DDL event (for example, a create-table or drop-table
> event). When a large number of tables need to be initialized, this is
> inefficient; a connection pool should be used instead.
> {code:java}
> private void applyCreateTable(CreateTableEvent event) {
> try (Connection connection =
> ConnectionFactory.createConnection(flussClientConfig);
> Admin admin = connection.getAdmin()) {
> TableId tableId = event.tableId();
> TablePath tablePath = new TablePath(tableId.getSchemaName(),
> tableId.getTableName());
> String tableIdentifier = tablePath.getDatabaseName() + "." +
> tablePath.getTableName();
> List<String> bucketKeys = bucketKeysMap.get(tableIdentifier);
> Integer bucketNum = bucketNumMap.get(tableIdentifier);
> TableDescriptor inferredFlussTable =
> toFlussTable(event.getSchema(), bucketKeys, bucketNum,
> tableProperties);
> admin.createDatabase(tablePath.getDatabaseName(),
> DatabaseDescriptor.EMPTY, true);
> if (!admin.tableExists(tablePath).get()) {
> admin.createTable(tablePath, inferredFlussTable, false).get();
> } else {
> TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
> // sanity check to prevent unexpected table schema evolution.
> sanityCheck(inferredFlussTable, currentTableInfo);
> }
> } catch (Exception e) {
> LOG.error("Failed to apply schema change {}", event, e);
> throw new RuntimeException(e);
> }
> }
> private void applyDropTable(DropTableEvent event) {
> try (Connection connection =
> ConnectionFactory.createConnection(flussClientConfig);
> Admin admin = connection.getAdmin()) {
> TableId tableId = event.tableId();
> TablePath tablePath = new TablePath(tableId.getSchemaName(),
> tableId.getTableName());
> admin.dropTable(tablePath, true).get();
> } catch (Exception e) {
> LOG.error("Failed to apply schema change {}", event, e);
> throw new RuntimeException(e);
> }
> } {code}
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)