lihaosky commented on code in PR #28085:
URL: https://github.com/apache/flink/pull/28085#discussion_r3244073166


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1791,6 +1836,328 @@ public ResolvedCatalogModel 
resolveCatalogModel(CatalogModel model) {
         return ResolvedCatalogModel.of(model, resolvedInputSchema, 
resolvedOutputSchema);
     }
 
+    // ------ connections ------
+
+    /**
+     * Get a connection from the catalog with the given object identifier.
+     *
+     * @param objectIdentifier The fully qualified path of the connection.
+     * @return The requested connection wrapped in Optional.
+     */
+    public Optional<CatalogConnection> getConnection(ObjectIdentifier 
objectIdentifier) {
+        CatalogConnection temporaryConnection = 
temporaryConnections.get(objectIdentifier);
+        if (temporaryConnection != null) {
+            return Optional.of(temporaryConnection);
+        }
+
+        Optional<Catalog> catalog = 
getCatalog(objectIdentifier.getCatalogName());
+        if (catalog.isPresent()) {
+            try {
+                return 
Optional.of(catalog.get().getConnection(objectIdentifier.toObjectPath()));
+            } catch (ConnectionNotExistException | 
UnsupportedOperationException e) {
+                // ConnectionNotExistException: connection does not exist in 
this catalog.
+                // UnsupportedOperationException: catalog does not support 
connections.
+                return Optional.empty();
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * List all connections in the given catalog and database.
+     *
+     * @param catalogName The name of the catalog.
+     * @param databaseName The name of the database.
+     * @return A set of connection names.
+     */
+    public Set<String> listConnections(String catalogName, String 
databaseName) {
+        Catalog catalog = getCatalogOrError(catalogName);
+        try {
+            Set<String> connections = new 
HashSet<>(catalog.listConnections(databaseName));
+
+            // Add temporary connections for this catalog and database
+            temporaryConnections.keySet().stream()
+                    .filter(
+                            identifier ->
+                                    
identifier.getCatalogName().equals(catalogName)
+                                            && 
identifier.getDatabaseName().equals(databaseName))
+                    .map(ObjectIdentifier::getObjectName)
+                    .forEach(connections::add);
+
+            return connections;
+        } catch (DatabaseNotExistException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Database %s does not exist in catalog %s.", 
databaseName, catalogName),
+                    e);
+        } catch (CatalogException e) {
+            throw new TableException(
+                    String.format(
+                            "Failed to list connections in catalog %s and 
database %s.",
+                            catalogName, databaseName),
+                    e);
+        }
+    }
+
+    /**
+     * Create a permanent connection in the given fully qualified path.
+     *
+     * <p>If a {@link ConnectionFactory} and {@link WritableSecretStore} are 
configured, sensitive
+     * fields are extracted from the connection and stored in the secret store 
before persisting the
+     * non-sensitive {@link CatalogConnection} to the catalog.
+     *
+     * @param connection The connection with all options including sensitive 
fields.
+     * @param objectIdentifier The fully qualified path where to create the 
connection.
+     * @param ignoreIfExists If false exception will be thrown if the 
connection already exists.
+     */
+    public void createConnection(
+            SensitiveConnection connection,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfExists) {
+        if (connectionFactory == null || writableSecretStore == null) {
+            throw new ValidationException(
+                    "ConnectionFactory and WritableSecretStore must be 
configured to create connections.");
+        }
+        if (getConnection(objectIdentifier).isPresent()) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new ValidationException(
+                    String.format(
+                            "Connection with identifier '%s' already exists.",
+                            objectIdentifier.asSummaryString()));
+        }
+        final CatalogConnection catalogConnection =
+                connectionFactory.createConnection(connection, 
writableSecretStore);
+        boolean persisted = false;
+        try {
+            execute(
+                    (catalog, path) -> {
+                        catalog.createConnection(path, catalogConnection, 
ignoreIfExists);
+                        catalogModificationListeners.forEach(
+                                listener ->
+                                        listener.onEvent(
+                                                
CreateConnectionEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        catalogConnection,
+                                                        ignoreIfExists,
+                                                        false)));
+                    },
+                    objectIdentifier,
+                    ignoreIfExists,
+                    "CreateConnection");
+            persisted = true;
+        } finally {
+            if (!persisted) {
+                tryDeleteSecrets(
+                        catalogConnection,
+                        writableSecretStore,
+                        "rollback createConnection " + objectIdentifier);
+            }
+        }
+    }
+
+    /**
+     * Create a temporary connection in the given fully qualified path.
+     *
+     * @param connection The connection with all options including sensitive 
fields.
+     * @param objectIdentifier The fully qualified path where to create the 
connection.
+     * @param ignoreIfExists If false exception will be thrown if the 
connection already exists.
+     */
+    public void createTemporaryConnection(
+            SensitiveConnection connection,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfExists) {
+        if (connectionFactory == null) {

Review Comment:
   I actually fixed it in this PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to