lihaosky commented on code in PR #28085:
URL: https://github.com/apache/flink/pull/28085#discussion_r3244073166
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1791,6 +1836,328 @@ public ResolvedCatalogModel
resolveCatalogModel(CatalogModel model) {
return ResolvedCatalogModel.of(model, resolvedInputSchema,
resolvedOutputSchema);
}
+ // ------ connections ------
+
+ /**
+ * Get a connection from the catalog with the given object identifier.
+ *
+ * @param objectIdentifier The fully qualified path of the connection.
+ * @return The requested connection wrapped in Optional.
+ */
+ public Optional<CatalogConnection> getConnection(ObjectIdentifier
objectIdentifier) {
+ CatalogConnection temporaryConnection =
temporaryConnections.get(objectIdentifier);
+ if (temporaryConnection != null) {
+ return Optional.of(temporaryConnection);
+ }
+
+ Optional<Catalog> catalog =
getCatalog(objectIdentifier.getCatalogName());
+ if (catalog.isPresent()) {
+ try {
+ return
Optional.of(catalog.get().getConnection(objectIdentifier.toObjectPath()));
+ } catch (ConnectionNotExistException |
UnsupportedOperationException e) {
+ // ConnectionNotExistException: connection does not exist in
this catalog.
+ // UnsupportedOperationException: catalog does not support
connections.
+ return Optional.empty();
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * List all connections in the given catalog and database.
+ *
+ * @param catalogName The name of the catalog.
+ * @param databaseName The name of the database.
+ * @return A set of connection names.
+ */
+ public Set<String> listConnections(String catalogName, String
databaseName) {
+ Catalog catalog = getCatalogOrError(catalogName);
+ try {
+ Set<String> connections = new
HashSet<>(catalog.listConnections(databaseName));
+
+ // Add temporary connections for this catalog and database
+ temporaryConnections.keySet().stream()
+ .filter(
+ identifier ->
+
identifier.getCatalogName().equals(catalogName)
+ &&
identifier.getDatabaseName().equals(databaseName))
+ .map(ObjectIdentifier::getObjectName)
+ .forEach(connections::add);
+
+ return connections;
+ } catch (DatabaseNotExistException e) {
+ throw new ValidationException(
+ String.format(
+ "Database %s does not exist in catalog %s.",
databaseName, catalogName),
+ e);
+ } catch (CatalogException e) {
+ throw new TableException(
+ String.format(
+ "Failed to list connections in catalog %s and
database %s.",
+ catalogName, databaseName),
+ e);
+ }
+ }
+
+ /**
+ * Create a permanent connection in the given fully qualified path.
+ *
+ * <p>If a {@link ConnectionFactory} and {@link WritableSecretStore} are
configured, sensitive
+ * fields are extracted from the connection and stored in the secret store
before persisting the
+ * non-sensitive {@link CatalogConnection} to the catalog.
+ *
+ * @param connection The connection with all options including sensitive
fields.
+ * @param objectIdentifier The fully qualified path where to create the
connection.
+ * @param ignoreIfExists If false exception will be thrown if the
connection already exists.
+ */
+ public void createConnection(
+ SensitiveConnection connection,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfExists) {
+ if (connectionFactory == null || writableSecretStore == null) {
+ throw new ValidationException(
+ "ConnectionFactory and WritableSecretStore must be
configured to create connections.");
+ }
+ if (getConnection(objectIdentifier).isPresent()) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new ValidationException(
+ String.format(
+ "Connection with identifier '%s' already exists.",
+ objectIdentifier.asSummaryString()));
+ }
+ final CatalogConnection catalogConnection =
+ connectionFactory.createConnection(connection,
writableSecretStore);
+ boolean persisted = false;
+ try {
+ execute(
+ (catalog, path) -> {
+ catalog.createConnection(path, catalogConnection,
ignoreIfExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+
CreateConnectionEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ catalogConnection,
+ ignoreIfExists,
+ false)));
+ },
+ objectIdentifier,
+ ignoreIfExists,
+ "CreateConnection");
+ persisted = true;
+ } finally {
+ if (!persisted) {
+ tryDeleteSecrets(
+ catalogConnection,
+ writableSecretStore,
+ "rollback createConnection " + objectIdentifier);
+ }
+ }
+ }
+
+ /**
+ * Create a temporary connection in the given fully qualified path.
+ *
+ * @param connection The connection with all options including sensitive
fields.
+ * @param objectIdentifier The fully qualified path where to create the
connection.
+ * @param ignoreIfExists If false exception will be thrown if the
connection already exists.
+ */
+ public void createTemporaryConnection(
+ SensitiveConnection connection,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfExists) {
+ if (connectionFactory == null) {
Review Comment:
I actually fixed it in this PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]