This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 587a9d8b93e [FLINK-39590][table] Add support for CONNECTION in catalog
APIs
587a9d8b93e is described below
commit 587a9d8b93e9b0089d645bb7c4da54dba2619a81
Author: Hao Li <[email protected]>
AuthorDate: Tue May 26 08:08:11 2026 -0700
[FLINK-39590][table] Add support for CONNECTION in catalog APIs
This closes #28085.
---
.../table/tests/test_catalog_completeness.py | 8 +-
.../table/api/internal/TableEnvironmentImpl.java | 7 +
.../apache/flink/table/catalog/CatalogManager.java | 383 +++++++++++++++++++-
.../table/catalog/GenericInMemoryCatalog.java | 76 ++++
.../catalog/listener/AlterConnectionEvent.java | 72 ++++
.../listener/ConnectionModificationEvent.java | 34 ++
.../catalog/listener/CreateConnectionEvent.java | 71 ++++
.../catalog/listener/DropConnectionEvent.java | 72 ++++
.../flink/table/catalog/CatalogManagerTest.java | 222 ++++++++++++
.../table/catalog/GenericInMemoryCatalogTest.java | 5 +
.../org/apache/flink/table/catalog/Catalog.java | 100 ++++++
.../ConnectionAlreadyExistException.java | 38 ++
.../exceptions/ConnectionNotExistException.java | 46 +++
.../flink/table/factories/ConnectionFactory.java | 93 +++++
.../table/factories/DefaultConnectionFactory.java | 191 ++++++++++
.../apache/flink/table/factories/FactoryUtil.java | 8 +
.../flink/table/secret/ReadableSecretStore.java | 5 +-
.../flink/table/secret/WritableSecretStore.java | 13 +-
.../org.apache.flink.table.factories.Factory | 1 +
.../apache/flink/table/catalog/CatalogTest.java | 222 ++++++++++++
.../factories/DefaultConnectionFactoryTest.java | 392 +++++++++++++++++++++
21 files changed, 2052 insertions(+), 7 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py
b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index c1ab64c96aa..3e20aab37ca 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -45,7 +45,13 @@ class
CatalogAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase
'getFactory',
'getTableFactory',
'getFunctionDefinitionFactory',
- 'listPartitionsByFilter'}
+ 'listPartitionsByFilter',
+ 'getConnection',
+ 'dropConnection',
+ 'connectionExists',
+ 'listConnections',
+ 'createConnection',
+ 'alterConnection'}
class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 4743663e8a4..496369e78f2 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -122,6 +122,7 @@ import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.secret.SecretStore;
import org.apache.flink.table.secret.SecretStoreFactory;
+import org.apache.flink.table.secret.WritableSecretStore;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -275,6 +276,11 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
final ResourceManager resourceManager =
new ResourceManager(settings.getConfiguration(),
userClassLoader);
final ModuleManager moduleManager = new ModuleManager();
+ final WritableSecretStore writableSecretStore =
+ secretStore instanceof WritableSecretStore
+ ? (WritableSecretStore) secretStore
+ : null;
+
final CatalogManager catalogManager =
CatalogManager.newBuilder()
.classLoader(userClassLoader)
@@ -297,6 +303,7 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
.sqlFactory(
settings.getSqlFactory()
.orElseGet(() ->
DefaultSqlFactory.INSTANCE))
+ .writableSecretStore(writableSecretStore)
.build();
final FunctionCatalog functionCatalog =
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 047d2c89d5b..cd375bc9554 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.EnvironmentSettings;
@@ -33,6 +34,8 @@ import
org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.StartMode.StartModeKind;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import
org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -41,14 +44,17 @@ import
org.apache.flink.table.catalog.exceptions.ModelNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.listener.AlterConnectionEvent;
import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
import org.apache.flink.table.catalog.listener.AlterModelEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogContext;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
+import org.apache.flink.table.catalog.listener.CreateConnectionEvent;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
import org.apache.flink.table.catalog.listener.CreateModelEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
+import org.apache.flink.table.catalog.listener.DropConnectionEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropModelEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;
@@ -57,9 +63,14 @@ import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.DefaultSqlFactory;
import org.apache.flink.table.expressions.SqlFactory;
import
org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
+import org.apache.flink.table.factories.ConnectionFactory;
+import org.apache.flink.table.factories.DefaultConnectionFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.secret.GenericInMemorySecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -98,6 +109,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
*/
@Internal
public final class CatalogManager implements CatalogRegistry, AutoCloseable {
+
private static final Logger LOG =
LoggerFactory.getLogger(CatalogManager.class);
// A map between names and catalogs.
@@ -111,6 +123,15 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
// models coming from catalogs.
private final Map<ObjectIdentifier, CatalogModel> temporaryModels;
+ // Those connections take precedence over corresponding permanent
connections, thus they shadow
+ // connections coming from catalogs.
+ private final Map<ObjectIdentifier, CatalogConnection>
temporaryConnections;
+
+ // Backing store for secrets of temporary connections. Lifetime is tied to
this
+ // CatalogManager — temporary connections are session-scoped, so their
secrets should not
+ // be persisted in the configured (potentially persistent)
writableSecretStore.
+ private final WritableSecretStore temporarySecretStore;
+
// The name of the current catalog and database
private @Nullable String currentCatalogName;
@@ -132,6 +153,10 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
private final MaterializedTableEnricher materializedTableEnricher;
+ private final ClassLoader userClassLoader;
+
+ @Nullable private final WritableSecretStore writableSecretStore;
+
private CatalogManager(
String defaultCatalogName,
Catalog defaultCatalog,
@@ -139,7 +164,9 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
List<CatalogModificationListener> catalogModificationListeners,
CatalogStoreHolder catalogStoreHolder,
SqlFactory sqlFactory,
- MaterializedTableEnricher materializedTableEnricher) {
+ MaterializedTableEnricher materializedTableEnricher,
+ ClassLoader userClassLoader,
+ @Nullable WritableSecretStore writableSecretStore) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
"Default catalog name cannot be null or empty");
@@ -152,6 +179,8 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
temporaryTables = new HashMap<>();
temporaryModels = new HashMap<>();
+ temporaryConnections = new HashMap<>();
+ temporarySecretStore = new GenericInMemorySecretStore();
// right now the default catalog is always the built-in one
builtInCatalogName = defaultCatalogName;
@@ -164,6 +193,21 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
this.sqlFactory = sqlFactory;
this.materializedTableEnricher =
checkNotNull(materializedTableEnricher,
"MaterializedTableEnricher cannot be null");
+ this.userClassLoader = checkNotNull(userClassLoader, "User class
loader cannot be null");
+ this.writableSecretStore = writableSecretStore;
+ }
+
+ /**
+ * Discovers the {@link ConnectionFactory} for the given connection
options via SPI, using the
+ * {@link FactoryUtil#CONNECTION_TYPE} option as the factory identifier.
Falls back to {@link
+ * DefaultConnectionFactory} via the option's default value when {@code
type} is absent.
+ *
+ * @param options the options of the connection being created / altered /
dropped
+ * @return the discovered factory
+ */
+ private ConnectionFactory discoverConnectionFactory(Map<String, String>
options) {
+ final String identifier =
Configuration.fromMap(options).get(FactoryUtil.CONNECTION_TYPE);
+ return FactoryUtil.discoverFactory(userClassLoader,
ConnectionFactory.class, identifier);
}
@VisibleForTesting
@@ -203,6 +247,8 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
private MaterializedTableEnricher materializedTableEnricher;
+ private @Nullable WritableSecretStore writableSecretStore;
+
public Builder classLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
@@ -251,6 +297,11 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
return this;
}
+ public Builder writableSecretStore(@Nullable WritableSecretStore
writableSecretStore) {
+ this.writableSecretStore = writableSecretStore;
+ return this;
+ }
+
public CatalogManager build() {
checkNotNull(classLoader, "Class loader cannot be null");
checkNotNull(config, "Config cannot be null");
@@ -271,7 +322,9 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
sqlFactory,
materializedTableEnricher != null
? materializedTableEnricher
- : createDefaultMaterializedTableEnricher());
+ : createDefaultMaterializedTableEnricher(),
+ classLoader,
+ writableSecretStore);
}
private MaterializedTableEnricher
createDefaultMaterializedTableEnricher() {
@@ -1791,6 +1844,330 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
return ResolvedCatalogModel.of(model, resolvedInputSchema,
resolvedOutputSchema);
}
+ // ------ connections ------
+
+ /**
+ * Get a connection from the catalog with the given object identifier.
+ *
+ * @param objectIdentifier The fully qualified path of the connection.
+ * @return The requested connection wrapped in Optional.
+ */
+ public Optional<CatalogConnection> getConnection(ObjectIdentifier
objectIdentifier) {
+ CatalogConnection temporaryConnection =
temporaryConnections.get(objectIdentifier);
+ if (temporaryConnection != null) {
+ return Optional.of(temporaryConnection);
+ }
+
+ Optional<Catalog> catalog =
getCatalog(objectIdentifier.getCatalogName());
+ if (catalog.isPresent()) {
+ try {
+ return
Optional.of(catalog.get().getConnection(objectIdentifier.toObjectPath()));
+ } catch (ConnectionNotExistException |
UnsupportedOperationException e) {
+ // ConnectionNotExistException: connection does not exist in
this catalog.
+ // UnsupportedOperationException: catalog does not support
connections.
+ return Optional.empty();
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * List all connections in the given catalog and database.
+ *
+ * @param catalogName The name of the catalog.
+ * @param databaseName The name of the database.
+ * @return A set of connection names.
+ */
+ public Set<String> listConnections(String catalogName, String
databaseName) {
+ Catalog catalog = getCatalogOrError(catalogName);
+ try {
+ Set<String> connections = new
HashSet<>(catalog.listConnections(databaseName));
+
+ // Add temporary connections for this catalog and database
+ temporaryConnections.keySet().stream()
+ .filter(
+ identifier ->
+
identifier.getCatalogName().equals(catalogName)
+ &&
identifier.getDatabaseName().equals(databaseName))
+ .map(ObjectIdentifier::getObjectName)
+ .forEach(connections::add);
+
+ return connections;
+ } catch (DatabaseNotExistException e) {
+ throw new ValidationException(
+ String.format(
+ "Database '%s' does not exist in catalog '%s'.",
+ databaseName, catalogName),
+ e);
+ } catch (CatalogException e) {
+ throw new TableException(
+ String.format(
+ "Failed to list connections in catalog '%s' and
database '%s'.",
+ catalogName, databaseName),
+ e);
+ }
+ }
+
+ /**
+ * Create a permanent connection in the given fully qualified path.
+ *
+ * <p>The {@link ConnectionFactory} is discovered from the connection's
{@code type} option (see
+ * FLIP-529). If a {@link WritableSecretStore} is configured, sensitive
fields are extracted
+ * from the connection and stored in the secret store before persisting
the non-sensitive {@link
+ * CatalogConnection} to the catalog.
+ *
+ * @param connection The connection with all options including sensitive
fields.
+ * @param objectIdentifier The fully qualified path where to create the
connection.
+ * @param ignoreIfExists If false exception will be thrown if the
connection already exists.
+ */
+ public void createConnection(
+ SensitiveConnection connection,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfExists) {
+ if (writableSecretStore == null) {
+ throw new ValidationException(
+ "WritableSecretStore must be configured to create
connections.");
+ }
+ if (getConnection(objectIdentifier).isPresent()) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new ValidationException(
+ String.format(
+ "Connection with identifier '%s' already exists.",
+ objectIdentifier.asSummaryString()));
+ }
+ final ConnectionFactory connectionFactory =
+ discoverConnectionFactory(connection.getOptions());
+ final CatalogConnection catalogConnection =
+ connectionFactory.createConnection(connection,
writableSecretStore);
+ boolean persisted = false;
+ try {
+ execute(
+ (catalog, path) -> {
+ catalog.createConnection(path, catalogConnection,
ignoreIfExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+
CreateConnectionEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ catalogConnection,
+ ignoreIfExists,
+ false)));
+ },
+ objectIdentifier,
+ ignoreIfExists,
+ "CreateConnection");
+ persisted = true;
+ } finally {
+ if (!persisted) {
+ tryDeleteSecrets(
+ catalogConnection,
+ writableSecretStore,
+ "rollback createConnection " + objectIdentifier);
+ }
+ }
+ }
+
+ /**
+ * Create a temporary connection in the given fully qualified path.
+ *
+ * @param connection The connection with all options including sensitive
fields.
+ * @param objectIdentifier The fully qualified path where to create the
connection.
+ * @param ignoreIfExists If false exception will be thrown if the
connection already exists.
+ */
+ public void createTemporaryConnection(
+ SensitiveConnection connection,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfExists) {
+ if (temporaryConnections.containsKey(objectIdentifier)) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new ValidationException(
+ String.format("Temporary connection '%s' already exists.",
objectIdentifier));
+ }
+ // Temporary connections are session-scoped; store secrets in an
in-memory store rather
+ // than the configured (potentially persistent) writableSecretStore.
+ final ConnectionFactory connectionFactory =
+ discoverConnectionFactory(connection.getOptions());
+ final CatalogConnection catalogConnection =
+ connectionFactory.createConnection(connection,
temporarySecretStore);
+ temporaryConnections.put(objectIdentifier, catalogConnection);
+ Catalog catalog =
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ CreateConnectionEvent.createEvent(
+ CatalogContext.createContext(
+
objectIdentifier.getCatalogName(), catalog),
+ objectIdentifier,
+ catalogConnection,
+ ignoreIfExists,
+ true)));
+ }
+
+ /**
+ * Alter a connection in the given fully qualified path.
+ *
+ * @param newConnection The new connection containing changes.
+ * @param objectIdentifier The fully qualified path where to alter the
connection.
+ * @param ignoreIfNotExists If false exception will be thrown if the
connection to be altered
+ * does not exist.
+ */
+ public void alterConnection(
+ SensitiveConnection newConnection,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfNotExists) {
+ if (writableSecretStore == null) {
+ throw new ValidationException(
+ "WritableSecretStore must be configured to alter
connections.");
+ }
+ Optional<CatalogConnection> existingOpt =
getConnection(objectIdentifier);
+ if (!existingOpt.isPresent()) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new ValidationException(
+ String.format(
+ "Connection with identifier '%s' does not exist.",
+ objectIdentifier.asSummaryString()));
+ }
+ final CatalogConnection existing = existingOpt.get();
+ final ConnectionFactory connectionFactory =
+ discoverConnectionFactory(newConnection.getOptions());
+ final CatalogConnection newCatalogConnection =
+ connectionFactory.createConnection(newConnection,
writableSecretStore);
+ boolean persisted = false;
+ try {
+ execute(
+ (catalog, path) -> {
+ catalog.alterConnection(path, newCatalogConnection,
ignoreIfNotExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+
AlterConnectionEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ newCatalogConnection,
+ ignoreIfNotExists)));
+ },
+ objectIdentifier,
+ ignoreIfNotExists,
+ "AlterConnection");
+ persisted = true;
+ } finally {
+ // On success: drop the OLD secret. On failure: drop the
freshly-stored NEW secret.
+ tryDeleteSecrets(
+ persisted ? existing : newCatalogConnection,
+ writableSecretStore,
+ persisted
+ ? "post-alter cleanup of old secret for " +
objectIdentifier
+ : "rollback alterConnection " + objectIdentifier);
+ }
+ }
+
+ /**
+ * Drop a permanent connection from the given fully qualified path.
+ *
+ * @param objectIdentifier The fully qualified path of the connection to
be dropped.
+ * @param ignoreIfNotExists If false exception will be thrown if the
connection to be dropped
+ * does not exist.
+ */
+ public void dropConnection(ObjectIdentifier objectIdentifier, boolean
ignoreIfNotExists) {
+ Optional<CatalogConnection> existingOpt =
getConnection(objectIdentifier);
+ if (!existingOpt.isPresent()) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new ValidationException(
+ String.format(
+ "Connection with identifier '%s' does not exist.",
+ objectIdentifier.asSummaryString()));
+ }
+ final CatalogConnection existing = existingOpt.get();
+ execute(
+ (catalog, path) -> {
+ catalog.dropConnection(path, ignoreIfNotExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ DropConnectionEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ existing,
+ ignoreIfNotExists,
+ false)));
+ },
+ objectIdentifier,
+ ignoreIfNotExists,
+ "DropConnection");
+ if (writableSecretStore != null) {
+ tryDeleteSecrets(
+ existing, writableSecretStore, "post-drop cleanup for " +
objectIdentifier);
+ }
+ }
+
+ /**
+ * Drop a temporary connection from the given fully qualified path.
+ *
+ * @param objectIdentifier The fully qualified path of the connection to
be dropped.
+ * @param ignoreIfNotExists If false exception will be thrown if the
connection to be dropped
+ * does not exist.
+ */
+ public void dropTemporaryConnection(
+ ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
+ CatalogConnection connection =
temporaryConnections.get(objectIdentifier);
+ if (connection != null) {
+ temporaryConnections.remove(objectIdentifier);
+ Catalog catalog =
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ DropConnectionEvent.createEvent(
+ CatalogContext.createContext(
+
objectIdentifier.getCatalogName(), catalog),
+ objectIdentifier,
+ connection,
+ ignoreIfNotExists,
+ true)));
+ tryDeleteSecrets(
+ connection,
+ temporarySecretStore,
+ "post-drop cleanup of temporary " + objectIdentifier);
+ } else if (!ignoreIfNotExists) {
+ throw new ValidationException(
+ String.format(
+ "Temporary connection with identifier '%s' does
not exist.",
+ objectIdentifier.asSummaryString()));
+ }
+ }
+
+ /**
+ * Best-effort cleanup of a connection's secrets. The catalog state has
already been mutated (or
+ * failed); a cleanup failure should not mask the user-visible result.
Logs the failure (which
+ * may indicate an orphaned secret in the underlying store) and swallows
the exception.
+ */
+ private void tryDeleteSecrets(
+ CatalogConnection connection, WritableSecretStore store, String
context) {
+ try {
+
discoverConnectionFactory(connection.getOptions()).deleteSecrets(connection,
store);
+ } catch (SecretException | ValidationException e) {
+ LOG.warn(
+ "Failed to delete connection secrets during {}; the
catalog state is correct, but the secret may be orphaned in the secret store.",
+ context,
+ e);
+ }
+ }
+
/**
* A command that modifies given {@link Catalog} in an {@link ObjectPath}.
This unifies error
* handling across different commands.
@@ -1813,6 +2190,8 @@ public final class CatalogManager implements
CatalogRegistry, AutoCloseable {
| TableNotExistException
| ModelNotExistException
| ModelAlreadyExistException
+ | ConnectionNotExistException
+ | ConnectionAlreadyExistException
| DatabaseNotExistException e) {
throw new
ValidationException(getErrorMessage(objectIdentifier, commandName), e);
} catch (Exception e) {
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 15c8f00dc91..bc678672d91 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import
org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -59,6 +61,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog {
private final Map<String, CatalogDatabase> databases;
private final Map<ObjectPath, CatalogBaseTable> tables;
private final Map<ObjectPath, CatalogModel> models;
+ private final Map<ObjectPath, CatalogConnection> connections;
private final Map<ObjectPath, CatalogFunction> functions;
private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>>
partitions;
@@ -79,6 +82,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog {
this.databases.put(defaultDatabase, new CatalogDatabaseImpl(new
HashMap<>(), null));
this.tables = new LinkedHashMap<>();
this.models = new LinkedHashMap<>();
+ this.connections = new LinkedHashMap<>();
this.functions = new LinkedHashMap<>();
this.partitions = new LinkedHashMap<>();
this.tableStats = new LinkedHashMap<>();
@@ -453,6 +457,78 @@ public class GenericInMemoryCatalog extends
AbstractCatalog {
return databaseExists(modelPath.getDatabaseName()) &&
models.containsKey(modelPath);
}
+ // ------ connections ------
+
+ @Override
+ public void createConnection(
+ ObjectPath connectionPath, CatalogConnection connection, boolean
ignoreIfExists)
+ throws ConnectionAlreadyExistException, DatabaseNotExistException {
+ checkNotNull(connectionPath);
+ checkNotNull(connection);
+ if (!databaseExists(connectionPath.getDatabaseName())) {
+ throw new DatabaseNotExistException(getName(),
connectionPath.getDatabaseName());
+ }
+ if (connectionExists(connectionPath)) {
+ if (!ignoreIfExists) {
+ throw new ConnectionAlreadyExistException(getName(),
connectionPath);
+ }
+ } else {
+ connections.put(connectionPath, connection.copy());
+ }
+ }
+
+ @Override
+ public void alterConnection(
+ ObjectPath connectionPath, CatalogConnection newConnection,
boolean ignoreIfNotExists)
+ throws ConnectionNotExistException {
+ checkNotNull(connectionPath);
+ checkNotNull(newConnection);
+
+ if (!connectionExists(connectionPath)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new ConnectionNotExistException(getName(), connectionPath);
+ }
+
+ connections.put(connectionPath, newConnection.copy());
+ }
+
+ @Override
+ public void dropConnection(ObjectPath connectionPath, boolean
ignoreIfNotExists)
+ throws ConnectionNotExistException {
+ checkNotNull(connectionPath);
+ if (connectionExists(connectionPath)) {
+ connections.remove(connectionPath);
+ } else if (!ignoreIfNotExists) {
+ throw new ConnectionNotExistException(getName(), connectionPath);
+ }
+ }
+
+ @Override
+ public List<String> listConnections(String databaseName) throws
DatabaseNotExistException {
+ return listObjectsUnderDatabase(connections, databaseName, k -> true);
+ }
+
+ @Override
+ public CatalogConnection getConnection(ObjectPath connectionPath)
+ throws ConnectionNotExistException {
+ checkNotNull(connectionPath);
+
+ if (!connectionExists(connectionPath)) {
+ throw new ConnectionNotExistException(getName(), connectionPath);
+ } else {
+ return connections.get(connectionPath).copy();
+ }
+ }
+
+ @Override
+ public boolean connectionExists(ObjectPath connectionPath) {
+ checkNotNull(connectionPath);
+ return databaseExists(connectionPath.getDatabaseName())
+ && connections.containsKey(connectionPath);
+ }
+
// ------ functions ------
@Override
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java
new file mode 100644
index 00000000000..475a877386f
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** When a connection is altered, a {@link AlterConnectionEvent} event will be
created and fired. */
+@PublicEvolving
+public interface AlterConnectionEvent extends ConnectionModificationEvent {
+ ObjectIdentifier identifier();
+
+ CatalogConnection newConnection();
+
+ boolean ignoreIfNotExists();
+
+ static AlterConnectionEvent createEvent(
+ final CatalogContext context,
+ final ObjectIdentifier identifier,
+ final CatalogConnection newConnection,
+ final boolean ignoreIfNotExists) {
+ return new AlterConnectionEvent() {
+ @Override
+ public CatalogConnection newConnection() {
+ return newConnection;
+ }
+
+ @Override
+ public boolean ignoreIfNotExists() {
+ return ignoreIfNotExists;
+ }
+
+ @Override
+ public ObjectIdentifier identifier() {
+ return identifier;
+ }
+
+ @Override
+ public CatalogConnection connection() {
+ throw new IllegalStateException(
+ "There is no connection in AlterConnectionEvent, use
identifier() instead.");
+ }
+
+ @Override
+ public boolean isTemporary() {
+ return false;
+ }
+
+ @Override
+ public CatalogContext context() {
+ return context;
+ }
+ };
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java
new file mode 100644
index 00000000000..98609622f88
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** Basic event for connection modification such as create, alter and drop. */
+@PublicEvolving
+public interface ConnectionModificationEvent extends CatalogModificationEvent {
+
+ ObjectIdentifier identifier();
+
+ CatalogConnection connection();
+
+ boolean isTemporary();
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java
new file mode 100644
index 00000000000..21a5297132b
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/**
+ * When a connection is created, a {@link CreateConnectionEvent} event will be
created and fired.
+ */
+@PublicEvolving
+public interface CreateConnectionEvent extends ConnectionModificationEvent {
+ ObjectIdentifier identifier();
+
+ CatalogConnection connection();
+
+ boolean ignoreIfExists();
+
+ boolean isTemporary();
+
+ static CreateConnectionEvent createEvent(
+ final CatalogContext context,
+ final ObjectIdentifier identifier,
+ final CatalogConnection connection,
+ final boolean ignoreIfExists,
+ final boolean isTemporary) {
+ return new CreateConnectionEvent() {
+ @Override
+ public boolean ignoreIfExists() {
+ return ignoreIfExists;
+ }
+
+ @Override
+ public ObjectIdentifier identifier() {
+ return identifier;
+ }
+
+ @Override
+ public CatalogConnection connection() {
+ return connection;
+ }
+
+ @Override
+ public CatalogContext context() {
+ return context;
+ }
+
+ @Override
+ public boolean isTemporary() {
+ return isTemporary;
+ }
+ };
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java
new file mode 100644
index 00000000000..3affd30326e
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import javax.annotation.Nullable;
+
+/** When a connection is dropped, a {@link DropConnectionEvent} event will be
created and fired. */
+@PublicEvolving
+public interface DropConnectionEvent extends ConnectionModificationEvent {
+ ObjectIdentifier identifier();
+
+ boolean ignoreIfNotExists();
+
+ boolean isTemporary();
+
+ CatalogConnection connection();
+
+ static DropConnectionEvent createEvent(
+ final CatalogContext context,
+ final ObjectIdentifier identifier,
+ @Nullable final CatalogConnection connection,
+ final boolean ignoreIfNotExists,
+ final boolean isTemporary) {
+ return new DropConnectionEvent() {
+ @Override
+ public boolean ignoreIfNotExists() {
+ return ignoreIfNotExists;
+ }
+
+ @Override
+ public ObjectIdentifier identifier() {
+ return identifier;
+ }
+
+ @Override
+ @Nullable
+ public CatalogConnection connection() {
+ return connection;
+ }
+
+ @Override
+ public CatalogContext context() {
+ return context;
+ }
+
+ @Override
+ public boolean isTemporary() {
+ return isTemporary;
+ }
+ };
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index 4b79b18c9d1..907d03dfcf8 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -22,17 +22,22 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.listener.AlterConnectionEvent;
import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
import org.apache.flink.table.catalog.listener.AlterModelEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
+import org.apache.flink.table.catalog.listener.CreateConnectionEvent;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
import org.apache.flink.table.catalog.listener.CreateModelEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
+import org.apache.flink.table.catalog.listener.DropConnectionEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropModelEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;
+import org.apache.flink.table.secret.GenericInMemorySecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.table.utils.ExpressionResolverMocks;
import org.apache.flink.table.utils.ParserMock;
@@ -365,6 +370,180 @@ class CatalogManagerTest {
assertThat(dropTemporaryEvent.identifier().getObjectName()).isEqualTo("model2");
}
+ @Test
+ public void testConnectionModificationListener() throws Exception {
+ CompletableFuture<CreateConnectionEvent> createFuture = new
CompletableFuture<>();
+ CompletableFuture<CreateConnectionEvent> createTemporaryFuture = new
CompletableFuture<>();
+ CompletableFuture<AlterConnectionEvent> alterFuture = new
CompletableFuture<>();
+ CompletableFuture<DropConnectionEvent> dropFuture = new
CompletableFuture<>();
+ CompletableFuture<DropConnectionEvent> dropTemporaryFuture = new
CompletableFuture<>();
+ WritableSecretStore secretStore = new GenericInMemorySecretStore();
+ CatalogManager catalogManager =
+ CatalogManagerMocks.preparedCatalogManager()
+ .defaultCatalog("default", new
GenericInMemoryCatalog("default"))
+ .classLoader(CatalogManagerTest.class.getClassLoader())
+ .config(new Configuration())
+ .catalogModificationListeners(
+ Collections.singletonList(
+ new
TestingConnectionModificationListener(
+ createFuture,
+ createTemporaryFuture,
+ alterFuture,
+ dropFuture,
+ dropTemporaryFuture)))
+ .catalogStoreHolder(
+ CatalogStoreHolder.newBuilder()
+
.classloader(CatalogManagerTest.class.getClassLoader())
+ .catalogStore(new
GenericInMemoryCatalogStore())
+ .config(new Configuration())
+ .build())
+ .writableSecretStore(secretStore)
+ .build();
+
+ catalogManager.initSchemaResolver(
+ true, ExpressionResolverMocks.dummyResolver(), new
ParserMock());
+
+ HashMap<String, String> options =
+ new HashMap<String, String>() {
+ {
+ put("type", "default");
+ put("bootstrap.servers", "localhost:9092");
+ put("password", "secret-pw");
+ }
+ };
+
+ // Create a connection
+ catalogManager.createConnection(
+ SensitiveConnection.of(options, null),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "conn1"),
+ true);
+ CreateConnectionEvent createConnectionEvent = createFuture.get(10,
TimeUnit.SECONDS);
+
assertThat(createConnectionEvent.identifier().getObjectName()).isEqualTo("conn1");
+ assertThat(createConnectionEvent.ignoreIfExists()).isTrue();
+ assertThat(createConnectionEvent.isTemporary()).isFalse();
+ // Sensitive field should be stripped from the persisted
CatalogConnection
+
assertThat(createConnectionEvent.connection().getOptions()).doesNotContainKey("password");
+
+ // Create a temporary connection
+ catalogManager.createTemporaryConnection(
+ SensitiveConnection.of(options, null),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "conn2"),
+ false);
+ CreateConnectionEvent createTemporaryEvent =
+ createTemporaryFuture.get(10, TimeUnit.SECONDS);
+ assertThat(createTemporaryEvent.isTemporary()).isTrue();
+
assertThat(createTemporaryEvent.identifier().getObjectName()).isEqualTo("conn2");
+ assertThat(createTemporaryEvent.ignoreIfExists()).isFalse();
+
+ // Alter a connection
+ HashMap<String, String> alteredOptions =
+ new HashMap<String, String>() {
+ {
+ put("type", "default");
+ put("bootstrap.servers", "remote:9092");
+ put("password", "rotated-pw");
+ }
+ };
+ catalogManager.alterConnection(
+ SensitiveConnection.of(alteredOptions, "conn1 comment"),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "conn1"),
+ false);
+ AlterConnectionEvent alterEvent = alterFuture.get(10,
TimeUnit.SECONDS);
+ assertThat(alterEvent.identifier().getObjectName()).isEqualTo("conn1");
+ assertThat(alterEvent.newConnection().getComment()).isEqualTo("conn1
comment");
+
assertThat(alterEvent.newConnection().getOptions().get("bootstrap.servers"))
+ .isEqualTo("remote:9092");
+
assertThat(alterEvent.newConnection().getOptions()).doesNotContainKey("password");
+ assertThat(alterEvent.ignoreIfNotExists()).isFalse();
+
+ // Drop a connection
+ ObjectIdentifier oi =
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "conn1");
+ catalogManager.dropConnection(oi, true);
+ DropConnectionEvent dropEvent = dropFuture.get(10, TimeUnit.SECONDS);
+ assertThat(dropEvent.ignoreIfNotExists()).isTrue();
+ assertThat(dropEvent.identifier().getObjectName()).isEqualTo("conn1");
+ assertThat(dropEvent.isTemporary()).isFalse();
+
+ // Drop a temporary connection
+ catalogManager.dropTemporaryConnection(
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "conn2"),
+ false);
+ DropConnectionEvent dropTemporaryEvent = dropTemporaryFuture.get(10,
TimeUnit.SECONDS);
+ assertThat(dropTemporaryEvent.isTemporary()).isTrue();
+ assertThat(dropTemporaryEvent.ignoreIfNotExists()).isFalse();
+
assertThat(dropTemporaryEvent.identifier().getObjectName()).isEqualTo("conn2");
+ }
+
+ @Test
+ public void testCreateConnectionWithoutTypeFallsBackToDefaultFactory()
throws Exception {
+ CompletableFuture<CreateConnectionEvent> createFuture = new
CompletableFuture<>();
+ WritableSecretStore secretStore = new GenericInMemorySecretStore();
+ CatalogManager catalogManager =
+ CatalogManagerMocks.preparedCatalogManager()
+ .defaultCatalog("default", new
GenericInMemoryCatalog("default"))
+ .classLoader(CatalogManagerTest.class.getClassLoader())
+ .config(new Configuration())
+ .catalogModificationListeners(
+ Collections.singletonList(
+ new
TestingConnectionModificationListener(
+ createFuture,
+ new CompletableFuture<>(),
+ new CompletableFuture<>(),
+ new CompletableFuture<>(),
+ new CompletableFuture<>())))
+ .catalogStoreHolder(
+ CatalogStoreHolder.newBuilder()
+
.classloader(CatalogManagerTest.class.getClassLoader())
+ .catalogStore(new
GenericInMemoryCatalogStore())
+ .config(new Configuration())
+ .build())
+ .writableSecretStore(secretStore)
+ .build();
+
+ catalogManager.initSchemaResolver(
+ true, ExpressionResolverMocks.dummyResolver(), new
ParserMock());
+
+ // Omit the 'type' option entirely; discovery should fall back to
DefaultConnectionFactory.
+ HashMap<String, String> options =
+ new HashMap<String, String>() {
+ {
+ put("bootstrap.servers", "localhost:9092");
+ put("password", "secret-pw");
+ }
+ };
+
+ catalogManager.createConnection(
+ SensitiveConnection.of(options, null),
+ ObjectIdentifier.of(
+ catalogManager.getCurrentCatalog(),
+ catalogManager.getCurrentDatabase(),
+ "conn-no-type"),
+ false);
+
+ CreateConnectionEvent event = createFuture.get(10, TimeUnit.SECONDS);
+
assertThat(event.identifier().getObjectName()).isEqualTo("conn-no-type");
+ // Sensitive field stripped — proves DefaultConnectionFactory ran via
the fallback path.
+
assertThat(event.connection().getOptions()).doesNotContainKey("password");
+ assertThat(event.connection().getOptions())
+ .containsEntry("bootstrap.servers", "localhost:9092");
+ }
+
private CatalogManager createCatalogManager(@Nullable
CatalogModificationListener listener) {
CatalogManager.Builder builder =
CatalogManager.newBuilder()
@@ -457,6 +636,49 @@ class CatalogManagerTest {
}
}
+ /** Testing connection modification listener. */
+ static class TestingConnectionModificationListener implements
CatalogModificationListener {
+ private final CompletableFuture<CreateConnectionEvent> createFuture;
+ private final CompletableFuture<CreateConnectionEvent>
createTemporaryFuture;
+ private final CompletableFuture<AlterConnectionEvent> alterFuture;
+ private final CompletableFuture<DropConnectionEvent> dropFuture;
+ private final CompletableFuture<DropConnectionEvent>
dropTemporaryFuture;
+
+ TestingConnectionModificationListener(
+ CompletableFuture<CreateConnectionEvent> createFuture,
+ CompletableFuture<CreateConnectionEvent> createTemporaryFuture,
+ CompletableFuture<AlterConnectionEvent> alterFuture,
+ CompletableFuture<DropConnectionEvent> dropFuture,
+ CompletableFuture<DropConnectionEvent> dropTemporaryFuture) {
+ this.createFuture = createFuture;
+ this.createTemporaryFuture = createTemporaryFuture;
+ this.alterFuture = alterFuture;
+ this.dropFuture = dropFuture;
+ this.dropTemporaryFuture = dropTemporaryFuture;
+ }
+
+ @Override
+ public void onEvent(CatalogModificationEvent event) {
+ if (event instanceof CreateConnectionEvent) {
+ if (((CreateConnectionEvent) event).isTemporary()) {
+ createTemporaryFuture.complete((CreateConnectionEvent)
event);
+ } else {
+ createFuture.complete((CreateConnectionEvent) event);
+ }
+ } else if (event instanceof AlterConnectionEvent) {
+ alterFuture.complete((AlterConnectionEvent) event);
+ } else if (event instanceof DropConnectionEvent) {
+ if (((DropConnectionEvent) event).isTemporary()) {
+ dropTemporaryFuture.complete((DropConnectionEvent) event);
+ } else {
+ dropFuture.complete((DropConnectionEvent) event);
+ }
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+
/** Testing model modification listener. */
static class TestingModelModificationListener implements
CatalogModificationListener {
private final CompletableFuture<CreateModelEvent> createFuture;
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 52a7cd7ccb5..403d94459fb 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -255,6 +255,11 @@ class GenericInMemoryCatalogTest extends CatalogTestBase {
return true;
}
+ @Override
+ protected boolean supportsConnections() {
+ return true;
+ }
+
@Override
protected CatalogFunction createPythonFunction() {
return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON);
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index e230b616cdb..3db7d7cf2ee 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import
org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -932,4 +934,102 @@ public interface Catalog {
throws ModelNotExistException, CatalogException {
alterModel(modelPath, newModel, ignoreIfNotExists);
}
+
+ // ------ connections ------
+
+ /**
+ * Get names of all connections under this database. An empty list is
returned if none exists.
+ *
+ * @return a list of the names of all connections in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default List<String> listConnections(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Returns a {@link CatalogConnection} identified by the given {@link
ObjectPath}.
+ *
+ * @param connectionPath Path of the connection
+ * @return The requested connection
+ * @throws ConnectionNotExistException if the target does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default CatalogConnection getConnection(ObjectPath connectionPath)
+ throws ConnectionNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "getConnection(ObjectPath) is not implemented for
%s.", this.getClass()));
+ }
+
+ /**
+ * Check if a connection exists in this catalog.
+ *
+ * @param connectionPath Path of the connection
+ * @return true if the given connection exists in the catalog false
otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ default boolean connectionExists(ObjectPath connectionPath) throws
CatalogException {
+ return false;
+ }
+
+ /**
+ * Creates a new connection.
+ *
+ * @param connectionPath path of the connection to be created
+ * @param connection the CatalogConnection definition
+ * @param ignoreIfExists flag to specify behavior when a connection
already exists at the given
+ * path: if set to false, it throws a ConnectionAlreadyExistException,
if set to true, do
+ * nothing.
+ * @throws ConnectionAlreadyExistException if connection already exists
and ignoreIfExists is
+ * false
+ * @throws DatabaseNotExistException if the database in connectionPath
doesn't exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void createConnection(
+ ObjectPath connectionPath, CatalogConnection connection, boolean
ignoreIfExists)
+ throws ConnectionAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "createConnection(ObjectPath, CatalogConnection,
boolean) is not implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Modifies an existing connection.
+ *
+ * @param connectionPath path of the connection to be modified
+ * @param newConnection the new connection definition
+ * @param ignoreIfNotExists flag to specify behavior when the connection
does not exist: if set
+ * to false, throw an exception, if set to true, do nothing.
+ * @throws ConnectionNotExistException if the connection does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void alterConnection(
+ ObjectPath connectionPath, CatalogConnection newConnection,
boolean ignoreIfNotExists)
+ throws ConnectionNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "alterConnection(ObjectPath, CatalogConnection,
boolean) is not implemented for %s.",
+ this.getClass()));
+ }
+
+ /**
+ * Drop a connection.
+ *
+ * @param connectionPath Path of the connection to be dropped
+ * @param ignoreIfNotExists Flag to specify behavior when the connection
does not exist: if set
+ * to false, throw an exception, if set to true, do nothing.
+ * @throws ConnectionNotExistException if the connection does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void dropConnection(ObjectPath connectionPath, boolean
ignoreIfNotExists)
+ throws ConnectionNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "dropConnection(ObjectPath, boolean) is not
implemented for %s.",
+ this.getClass()));
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java
new file mode 100644
index 00000000000..5732f470f80
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/** Exception for trying to create a connection that already exists. */
+@PublicEvolving
+public class ConnectionAlreadyExistException extends Exception {
+
+ private static final String MSG = "Connection '%s' already exists in
catalog '%s'.";
+
+ public ConnectionAlreadyExistException(String catalogName, ObjectPath
connectionPath) {
+ this(catalogName, connectionPath, null);
+ }
+
+ public ConnectionAlreadyExistException(
+ String catalogName, ObjectPath connectionPath, Throwable cause) {
+ super(String.format(MSG, connectionPath.getFullName(), catalogName),
cause);
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java
new file mode 100644
index 00000000000..72544f55173
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/** Exception for trying to operate on a connection that doesn't exist. */
+@PublicEvolving
+public class ConnectionNotExistException extends Exception {
+
+ private static final String MSG = "Connection '%s' does not exist in
catalog '%s'.";
+ private static final String MSG_WITHOUT_CATALOG = "Connection '%s' does
not exist.";
+
+ public ConnectionNotExistException(String catalogName, ObjectPath
connectionPath) {
+ this(catalogName, connectionPath, null);
+ }
+
+ public ConnectionNotExistException(
+ String catalogName, ObjectPath connectionPath, Throwable cause) {
+ super(formatMsg(catalogName, connectionPath), cause);
+ }
+
+ private static String formatMsg(String catalogName, ObjectPath
connectionPath) {
+ if (catalogName != null) {
+ return String.format(MSG, connectionPath.getFullName(),
catalogName);
+ }
+ return String.format(MSG_WITHOUT_CATALOG,
connectionPath.getFullName());
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java
new file mode 100644
index 00000000000..25d0bf3c2b8
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+
+/**
+ * Factory for creating and resolving connections, handling the encryption and
decryption of
+ * sensitive connection fields.
+ *
+ * <p>A {@code ConnectionFactory} is responsible for:
+ *
+ * <ul>
+ * <li>Extracting sensitive fields from a {@link SensitiveConnection} and
storing them in a {@link
+ * WritableSecretStore}, returning a {@link CatalogConnection} that is
safe to persist in a
+ * catalog.
+ * <li>Resolving a {@link CatalogConnection} from a catalog by retrieving
its secrets from a
+ * {@link ReadableSecretStore} and returning a complete {@link
SensitiveConnection}.
+ * </ul>
+ *
+ * @see DefaultConnectionFactory
+ */
+@PublicEvolving
+public interface ConnectionFactory extends Factory {
+
+ /**
+ * Creates a {@link CatalogConnection} from a {@link SensitiveConnection}
by extracting
+ * sensitive fields and storing them in the provided {@link
WritableSecretStore}.
+ *
+ * <p>The returned {@link CatalogConnection} contains only non-sensitive
options plus a secret
+ * reference that can be used to retrieve the sensitive fields later via
{@link
+ * #resolveConnection(CatalogConnection, ReadableSecretStore)}.
+ *
+ * @param connection the connection with all options including sensitive
fields
+ * @param secretStore the secret store where sensitive fields will be
stored
+ * @return a catalog-safe connection with sensitive fields replaced by a
secret reference
+ * @throws SecretException if storing the secret fails (e.g.
underlying-store error)
+ */
+ CatalogConnection createConnection(
+ SensitiveConnection connection, WritableSecretStore secretStore)
throws SecretException;
+
+ /**
+ * Resolves a {@link CatalogConnection} into a {@link SensitiveConnection}
by retrieving secrets
+ * from the provided {@link ReadableSecretStore}.
+ *
+ * @param connection the catalog connection containing non-sensitive
options and a secret
+ * reference
+ * @param secretStore the secret store from which sensitive fields are
retrieved
+ * @return the complete connection with all options including sensitive
fields
+ * @throws SecretException if retrieving the secret fails (e.g.
underlying-store error)
+ */
+ SensitiveConnection resolveConnection(
+ CatalogConnection connection, ReadableSecretStore secretStore)
throws SecretException;
+
+ /**
+ * Deletes any secrets associated with the given {@link CatalogConnection}
from the provided
+ * {@link WritableSecretStore}.
+ *
+ * <p>Implementations should locate the secret reference embedded in the
connection (created by
+ * {@link #createConnection(SensitiveConnection, WritableSecretStore)})
and remove the
+ * corresponding entry from the secret store. This is intended to be
called when a connection is
+ * dropped or replaced (e.g. on alter), to avoid orphaned secrets.
+ *
+ * <p>The default implementation is a no-op for factories that do not
externalize secrets.
+ *
+ * @param connection the catalog connection whose backing secrets should
be removed
+ * @param secretStore the secret store from which secrets should be deleted
+ * @throws SecretException if removing the secret fails (e.g.
underlying-store error)
+ */
+ default void deleteSecrets(CatalogConnection connection,
WritableSecretStore secretStore)
+ throws SecretException {}
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java
new file mode 100644
index 00000000000..236e8c6a5f4
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link ConnectionFactory} that identifies
sensitive fields by a
+ * predefined whitelist of field names.
+ *
+ * <p>During {@link #createConnection}, sensitive fields are extracted from
the connection options
+ * and stored as a single secret in the {@link WritableSecretStore}. A
reference key ({@value
+ * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the
returned {@link
+ * CatalogConnection}.
+ *
+ * <p>During {@link #resolveConnection}, the secret reference is used to
retrieve the sensitive
+ * fields from the {@link ReadableSecretStore} and merge them back into the
options.
+ *
+ * <p>See {@link #SENSITIVE_FIELD_NAMES} for the default whitelist of
sensitive field names.
+ */
+@Internal
+public class DefaultConnectionFactory implements ConnectionFactory {
+
+ /**
+ * Factory identifier used to discover this factory via SPI. Also used as
the fallback when a
+ * connection does not specify a {@link FactoryUtil#CONNECTION_TYPE}
option.
+ */
+ public static final String IDENTIFIER = "default";
+
+ /**
+ * Reserved option key used to store the reference to secrets in the
secret store. The
+ * surrounding double underscores make collision with user-supplied option
names unlikely; user
+ * options containing this key will be rejected at create-time.
+ */
+ static final String SECRET_REFERENCE_KEY =
"__flink.encrypted-secret-key__";
+
+ /**
+ * Default whitelist of option keys treated as sensitive. Seeded from
{@link
+ * org.apache.flink.configuration.GlobalConfiguration#SENSITIVE_KEYS}; the
list is intentionally
+ * small to start and can be expanded over time as new sensitive options
are introduced.
+ */
+ private static final Set<String> SENSITIVE_FIELD_NAMES =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ "password",
+ "secret",
+ "fs.azure.account.key",
+ "apikey",
+ "api-key",
+ "auth-params",
+ "service-key",
+ "token",
+ "basic-auth",
+ "jaas.config",
+ "http-headers")));
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public CatalogConnection createConnection(
+ SensitiveConnection connection, WritableSecretStore secretStore) {
+ Map<String, String> allOptions = connection.getOptions();
+
+ if (allOptions.containsKey(SECRET_REFERENCE_KEY)) {
+ throw new ValidationException(
+ String.format(
+ "Connection option '%s' is reserved and cannot be
set by users.",
+ SECRET_REFERENCE_KEY));
+ }
+
+ Map<String, String> sensitiveOptions =
+ allOptions.entrySet().stream()
+ .filter(e ->
SENSITIVE_FIELD_NAMES.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+
+ Map<String, String> nonSensitiveOptions =
+ allOptions.entrySet().stream()
+ .filter(e ->
!SENSITIVE_FIELD_NAMES.contains(e.getKey()))
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue,
+ (a, b) -> a,
+ HashMap::new));
+
+ if (!sensitiveOptions.isEmpty()) {
+ final String secretId;
+ try {
+ secretId = secretStore.storeSecret(sensitiveOptions);
+ } catch (SecretException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new SecretException("Failed to store connection
secret.", e);
+ }
+ nonSensitiveOptions.put(SECRET_REFERENCE_KEY, secretId);
+ }
+
+ return CatalogConnection.of(nonSensitiveOptions,
connection.getComment());
+ }
+
+ @Override
+ public SensitiveConnection resolveConnection(
+ CatalogConnection connection, ReadableSecretStore secretStore) {
+ Map<String, String> options = new HashMap<>(connection.getOptions());
+
+ String secretId = options.remove(SECRET_REFERENCE_KEY);
+ if (secretId != null) {
+ try {
+ Map<String, String> secrets = secretStore.getSecret(secretId);
+ options.putAll(secrets);
+ } catch (SecretNotFoundException e) {
+ throw new ValidationException(
+ String.format(
+ "Failed to resolve connection secrets. Secret
with ID '%s' not found.",
+ secretId),
+ e);
+ } catch (SecretException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new SecretException(
+ String.format(
+ "Failed to retrieve connection secret with ID
'%s'.", secretId),
+ e);
+ }
+ }
+
+ return SensitiveConnection.of(options, connection.getComment());
+ }
+
+ @Override
+ public void deleteSecrets(CatalogConnection connection,
WritableSecretStore secretStore) {
+ String secretId = connection.getOptions().get(SECRET_REFERENCE_KEY);
+ if (secretId != null) {
+ try {
+ secretStore.removeSecret(secretId);
+ } catch (SecretException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new SecretException(
+ String.format("Failed to remove connection secret with
ID '%s'.", secretId),
+ e);
+ }
+ }
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index a61da0902ae..c4ecf751c6d 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -107,6 +107,14 @@ public final class FactoryUtil {
"Uniquely identifies the provider of a model that
is used for model inference."
+ " Its value is used during model
provider discovery.");
+ public static final ConfigOption<String> CONNECTION_TYPE =
+ ConfigOptions.key("type")
+ .stringType()
+ .defaultValue(DefaultConnectionFactory.IDENTIFIER)
+ .withDescription(
+ "Identifies the type of a connection. Its value is
used during"
+ + " ConnectionFactory discovery.");
+
public static final ConfigOption<String> FORMAT =
ConfigOptions.key("format")
.stringType()
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
index 70dd48e21f9..1cf7120abed 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.secret;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.secret.exceptions.SecretException;
import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
import java.util.Map;
@@ -40,6 +41,8 @@ public interface ReadableSecretStore extends SecretStore {
* @param secretId the unique identifier of the secret to retrieve
* @return a map containing the secret data as key-value pairs
* @throws SecretNotFoundException if the secret with the given identifier
does not exist
+ * @throws SecretException if the operation fails due to underlying-store
errors (network,
+ * permission, etc.)
*/
- Map<String, String> getSecret(String secretId) throws
SecretNotFoundException;
+ Map<String, String> getSecret(String secretId) throws
SecretNotFoundException, SecretException;
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
index db5037b7b2f..11167d861a6 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.secret;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.secret.exceptions.SecretException;
import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
import java.util.Map;
@@ -40,15 +41,19 @@ public interface WritableSecretStore extends SecretStore {
*
* @param secretData a map containing the secret data as key-value pairs
to be stored
* @return a unique identifier for the stored secret
+ * @throws SecretException if the operation fails due to underlying-store
errors (network,
+ * permission, quota, etc.)
*/
- String storeSecret(Map<String, String> secretData);
+ String storeSecret(Map<String, String> secretData) throws SecretException;
/**
* Removes a secret from the secret store.
*
* @param secretId the unique identifier of the secret to remove
+ * @throws SecretException if the operation fails due to underlying-store
errors (network,
+ * permission, etc.)
*/
- void removeSecret(String secretId);
+ void removeSecret(String secretId) throws SecretException;
/**
* Atomically updates an existing secret with new data.
@@ -58,7 +63,9 @@ public interface WritableSecretStore extends SecretStore {
* @param secretId the unique identifier of the secret to update
* @param newSecretData a map containing the new secret data as key-value
pairs
* @throws SecretNotFoundException if the secret with the given identifier
does not exist
+ * @throws SecretException if the operation fails due to underlying-store
errors (network,
+ * permission, etc.)
*/
void updateSecret(String secretId, Map<String, String> newSecretData)
- throws SecretNotFoundException;
+ throws SecretNotFoundException, SecretException;
}
diff --git
a/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 456c161e7dd..ec8fc758b35 100644
---
a/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.table.module.CoreModuleFactory
+org.apache.flink.table.factories.DefaultConnectionFactory
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 228b5485f01..f93e9f0575b 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import
org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -68,12 +70,16 @@ public abstract class CatalogTest {
protected final String t3 = "t3";
protected final String m1 = "m1";
protected final String m2 = "m2";
+ protected final String c1 = "c1";
+ protected final String c2 = "c2";
protected final ObjectPath path1 = new ObjectPath(db1, t1);
protected final ObjectPath path2 = new ObjectPath(db2, t2);
protected final ObjectPath path3 = new ObjectPath(db1, t2);
protected final ObjectPath path4 = new ObjectPath(db1, t3);
protected final ObjectPath modelPath1 = new ObjectPath(db1, m1);
protected final ObjectPath modelPath2 = new ObjectPath(db1, m2);
+ protected final ObjectPath connectionPath1 = new ObjectPath(db1, c1);
+ protected final ObjectPath connectionPath2 = new ObjectPath(db1, c2);
protected final ObjectPath nonExistDbPath =
ObjectPath.fromString("non.exist");
protected final ObjectPath nonExistObjectPath =
ObjectPath.fromString("db1.nonexist");
@@ -108,6 +114,14 @@ public abstract class CatalogTest {
catalog.dropModel(modelPath2, true);
}
}
+ if (supportsConnections()) {
+ if (catalog.connectionExists(connectionPath1)) {
+ catalog.dropConnection(connectionPath1, true);
+ }
+ if (catalog.connectionExists(connectionPath2)) {
+ catalog.dropConnection(connectionPath2, true);
+ }
+ }
// Delete db last so that other resources can be found and dropped
if (catalog.databaseExists(db1)) {
@@ -463,6 +477,201 @@ public abstract class CatalogTest {
catalog.dropModel(modelPath1, true);
}
+ // ------ connections ------
+ @Test
+ public void testCreateConnection() throws Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogConnection connection = createConnection();
+ catalog.createConnection(connectionPath1, connection, false);
+
+ List<String> connections = catalog.listConnections(db1);
+ assertThat(connections).isEqualTo(Collections.singletonList(c1));
+ }
+
+ @Test
+ public void testCreateConnection_DatabaseNotExistException() {
+ if (!supportsConnections()) {
+ return;
+ }
+ assertThat(catalog.databaseExists(db1)).isFalse();
+
+ assertThatThrownBy(
+ () ->
+ catalog.createConnection(
+ nonExistObjectPath,
createConnection(), false))
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessage("Database db1 does not exist in Catalog " +
TEST_CATALOG_NAME + ".");
+ }
+
+ @Test
+ public void testCreateConnection_ConnectionAlreadyExistException() throws
Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createConnection(connectionPath1, createConnection(), false);
+
+ assertThatThrownBy(
+ () -> catalog.createConnection(connectionPath1,
createConnection(), false))
+ .isInstanceOf(ConnectionAlreadyExistException.class)
+ .hasMessage(
+ "Connection 'db1.c1' already exists in catalog '"
+ + TEST_CATALOG_NAME
+ + "'.");
+ }
+
+ @Test
+ public void testCreateConnection_ConnectionAlreadyExist_ignored() throws
Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogConnection connection = createConnection();
+ catalog.createConnection(connectionPath1, connection, false);
+ catalog.createConnection(connectionPath1, connection, true);
+
+ List<String> connections = catalog.listConnections(db1);
+ assertThat(connections).isEqualTo(Collections.singletonList(c1));
+ }
+
+ @Test
+ public void testListConnections() throws Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+
+ catalog.createConnection(connectionPath1, createConnection(), false);
+ catalog.createConnection(connectionPath2, createConnection(), false);
+
+ assertThat(catalog.listConnections(db1)).isEqualTo(Arrays.asList(c1,
c2));
+ }
+
+ @Test
+ public void testGetConnection() throws Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createConnection(connectionPath1, createConnection(), false);
+ assertThat(catalog.getConnection(connectionPath1)).isNotNull();
+ }
+
+ @Test
+ public void testGetConnection_ConnectionNotExistException() throws
Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ assertThatThrownBy(() -> catalog.getConnection(connectionPath1))
+ .isInstanceOf(ConnectionNotExistException.class)
+ .hasMessage("Connection 'db1.c1' does not exist in catalog
'test-catalog'.");
+ }
+
+ @Test
+ public void testDropConnection() throws Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createConnection(connectionPath1, createConnection(), false);
+ assertThat(catalog.getConnection(connectionPath1)).isNotNull();
+ catalog.dropConnection(connectionPath1, false);
+ assertThatThrownBy(() -> catalog.getConnection(connectionPath1))
+ .isInstanceOf(ConnectionNotExistException.class)
+ .hasMessage("Connection 'db1.c1' does not exist in catalog
'test-catalog'.");
+ }
+
+ @Test
+ public void testAlterConnection() throws Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createConnection(connectionPath1, createConnection(), false);
+ assertThat(catalog.getConnection(connectionPath1)).isNotNull();
+ CatalogConnection newConnection =
+ CatalogConnection.of(
+ new HashMap<String, String>() {
+ {
+ put("type", "kafka");
+ put("bootstrap.servers", "remote:9092");
+ put("group.id", "my-group");
+ }
+ },
+ "updated connection");
+ catalog.alterConnection(connectionPath1, newConnection, false);
+ assertThat(catalog.getConnection(connectionPath1).getComment())
+ .isEqualTo("updated connection");
+ Map<String, String> expectedOptions = new HashMap<>();
+ expectedOptions.put("type", "kafka");
+ expectedOptions.put("bootstrap.servers", "remote:9092");
+ expectedOptions.put("group.id", "my-group");
+
assertThat(catalog.getConnection(connectionPath1).getOptions()).isEqualTo(expectedOptions);
+ }
+
+ @Test
+ public void testAlterConnection_ConnectionNotExistException() throws
Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogConnection newConnection =
+ CatalogConnection.of(
+ new HashMap<String, String>() {
+ {
+ put("type", "kafka");
+ put("bootstrap.servers", "remote:9092");
+ }
+ },
+ "new connection");
+ assertThatThrownBy(() -> catalog.alterConnection(connectionPath1,
newConnection, false))
+ .isInstanceOf(ConnectionNotExistException.class)
+ .hasMessage("Connection 'db1.c1' does not exist in catalog
'test-catalog'.");
+ }
+
+ @Test
+ public void testAlterMissingConnectionIgnoreIfNotExist() throws Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogConnection newConnection =
+ CatalogConnection.of(
+ new HashMap<String, String>() {
+ {
+ put("type", "kafka");
+ put("bootstrap.servers", "remote:9092");
+ }
+ },
+ "new connection");
+ catalog.alterConnection(connectionPath1, newConnection, true);
+ }
+
+ @Test
+ public void testDropMissingConnectionNotExistException() throws Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ assertThatThrownBy(() -> catalog.dropConnection(connectionPath1,
false))
+ .isInstanceOf(ConnectionNotExistException.class)
+ .hasMessage("Connection 'db1.c1' does not exist in catalog
'test-catalog'.");
+ }
+
+ @Test
+ public void testDropMissingConnectionIgnoreIfNotExist() throws Exception {
+ if (!supportsConnections()) {
+ return;
+ }
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.dropConnection(connectionPath1, true);
+ }
+
// ------ tables ------
@Test
@@ -1614,6 +1823,19 @@ public abstract class CatalogTest {
protected abstract boolean supportsModels();
+ protected abstract boolean supportsConnections();
+
+ protected CatalogConnection createConnection() {
+ return CatalogConnection.of(
+ new HashMap<String, String>() {
+ {
+ put("type", "kafka");
+ put("bootstrap.servers", "localhost:9092");
+ }
+ },
+ null);
+ }
+
protected ResolvedSchema createSchema() {
return new ResolvedSchema(
Arrays.asList(
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/DefaultConnectionFactoryTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/DefaultConnectionFactoryTest.java
new file mode 100644
index 00000000000..50ebac09c69
--- /dev/null
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/DefaultConnectionFactoryTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DefaultConnectionFactory}. */
+class DefaultConnectionFactoryTest {
+
+ private final DefaultConnectionFactory factory = new
DefaultConnectionFactory();
+
+ //
---------------------------------------------------------------------------------------------
+ // createConnection
+ //
---------------------------------------------------------------------------------------------
+
+ @Test
+ void createConnectionWithoutSensitiveOptionsDoesNotCallSecretStore() {
+ Map<String, String> options = new LinkedHashMap<>();
+ options.put("host", "localhost");
+ options.put("port", "9092");
+
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ CatalogConnection result =
+ factory.createConnection(
+ SensitiveConnection.of(options, "my-comment"),
secretStore);
+
+ assertThat(secretStore.stored).isEmpty();
+ assertThat(result.getOptions())
+ .containsExactlyInAnyOrderEntriesOf(options)
+
.doesNotContainKey(DefaultConnectionFactory.SECRET_REFERENCE_KEY);
+ assertThat(result.getComment()).isEqualTo("my-comment");
+ }
+
+ @Test
+ void createConnectionWithSensitiveOptionsStoresOnlySensitiveOptions() {
+ Map<String, String> options = new LinkedHashMap<>();
+ options.put("host", "localhost");
+ options.put("password", "p@ss");
+ options.put("api-key", "abc");
+ options.put("token", "tok");
+
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ CatalogConnection result =
+ factory.createConnection(SensitiveConnection.of(options,
null), secretStore);
+
+ // secretStore.storeSecret called exactly once with only the sensitive
subset
+ assertThat(secretStore.stored).hasSize(1);
+ Map<String, String> storedSecret =
secretStore.stored.values().iterator().next();
+ assertThat(storedSecret)
+ .containsOnlyKeys("password", "api-key", "token")
+ .containsEntry("password", "p@ss")
+ .containsEntry("api-key", "abc")
+ .containsEntry("token", "tok");
+
+ // Returned connection retains only non-sensitive options plus a
secret reference
+ String secretId = secretStore.stored.keySet().iterator().next();
+ assertThat(result.getOptions())
+ .containsOnlyKeys("host",
DefaultConnectionFactory.SECRET_REFERENCE_KEY)
+ .containsEntry("host", "localhost")
+ .containsEntry(DefaultConnectionFactory.SECRET_REFERENCE_KEY,
secretId);
+ }
+
+ @Test
+ void createConnectionRecognizesAllSensitiveFieldNames() {
+ // Covers the default sensitive whitelist; failure here flags an
accidental change to
+ // the whitelist.
+ List<String> sensitiveKeys =
+ Arrays.asList(
+ "password",
+ "secret",
+ "fs.azure.account.key",
+ "apikey",
+ "api-key",
+ "auth-params",
+ "service-key",
+ "token",
+ "basic-auth",
+ "jaas.config",
+ "http-headers");
+
+ Map<String, String> options = new LinkedHashMap<>();
+ for (String key : sensitiveKeys) {
+ options.put(key, "value-of-" + key);
+ }
+
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ CatalogConnection result =
+ factory.createConnection(SensitiveConnection.of(options,
null), secretStore);
+
+ assertThat(secretStore.stored).hasSize(1);
+ Map<String, String> storedSecret =
secretStore.stored.values().iterator().next();
+ assertThat(storedSecret).containsExactlyInAnyOrderEntriesOf(options);
+ assertThat(result.getOptions())
+
.containsOnlyKeys(DefaultConnectionFactory.SECRET_REFERENCE_KEY);
+ }
+
+ @Test
+ void createConnectionRejectsUserSuppliedReservedKey() {
+ Map<String, String> options =
+ Collections.singletonMap(
+ DefaultConnectionFactory.SECRET_REFERENCE_KEY,
"user-injected");
+
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ assertThatThrownBy(
+ () ->
+ factory.createConnection(
+ SensitiveConnection.of(options, null),
secretStore))
+ .isInstanceOf(ValidationException.class)
+
.hasMessageContaining(DefaultConnectionFactory.SECRET_REFERENCE_KEY)
+ .hasMessageContaining("reserved");
+ assertThat(secretStore.stored).isEmpty();
+ }
+
+ @Test
+ void createConnectionPropagatesSecretException() {
+ Map<String, String> options = Collections.singletonMap("password",
"p@ss");
+ SecretException original = new SecretException("downstream failure");
+
+ RecordingSecretStore secretStore =
+ new RecordingSecretStore() {
+ @Override
+ public String storeSecret(Map<String, String> secretData) {
+ throw original;
+ }
+ };
+
+ assertThatThrownBy(
+ () ->
+ factory.createConnection(
+ SensitiveConnection.of(options, null),
secretStore))
+ .isSameAs(original);
+ }
+
+ @Test
+ void createConnectionWrapsRuntimeExceptionFromStore() {
+ Map<String, String> options = Collections.singletonMap("password",
"p@ss");
+
+ RecordingSecretStore secretStore =
+ new RecordingSecretStore() {
+ @Override
+ public String storeSecret(Map<String, String> secretData) {
+ throw new IllegalStateException("kaboom");
+ }
+ };
+
+ assertThatThrownBy(
+ () ->
+ factory.createConnection(
+ SensitiveConnection.of(options, null),
secretStore))
+ .isInstanceOf(SecretException.class)
+ .hasMessageContaining("Failed to store connection secret")
+ .hasCauseInstanceOf(IllegalStateException.class);
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // resolveConnection
+ //
---------------------------------------------------------------------------------------------
+
+ @Test
+ void resolveConnectionWithoutSecretReferenceReturnsOptionsUnchanged() {
+ Map<String, String> options = new LinkedHashMap<>();
+ options.put("host", "localhost");
+ options.put("port", "9092");
+
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ SensitiveConnection resolved =
+ factory.resolveConnection(
+ CatalogConnection.of(options, "the-comment"),
secretStore);
+
+ assertThat(secretStore.retrieved).isEmpty();
+
assertThat(resolved.getOptions()).containsExactlyInAnyOrderEntriesOf(options);
+ assertThat(resolved.getComment()).isEqualTo("the-comment");
+ }
+
+ @Test
+ void resolveConnectionMergesSecretsBackIntoOptions() {
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ Map<String, String> secrets = new HashMap<>();
+ secrets.put("password", "p@ss");
+ secrets.put("token", "tok");
+ String secretId = secretStore.storeSecret(secrets);
+
+ Map<String, String> catalogOptions = new LinkedHashMap<>();
+ catalogOptions.put("host", "localhost");
+ catalogOptions.put(DefaultConnectionFactory.SECRET_REFERENCE_KEY,
secretId);
+
+ SensitiveConnection resolved =
+ factory.resolveConnection(CatalogConnection.of(catalogOptions,
null), secretStore);
+
+ assertThat(secretStore.retrieved).containsExactly(secretId);
+ assertThat(resolved.getOptions())
+ .containsOnlyKeys("host", "password", "token")
+ .containsEntry("host", "localhost")
+ .containsEntry("password", "p@ss")
+ .containsEntry("token", "tok")
+
.doesNotContainKey(DefaultConnectionFactory.SECRET_REFERENCE_KEY);
+ }
+
+ @Test
+ void resolveConnectionTranslatesSecretNotFoundToValidationException() {
+ // Empty stored map → fake's getSecret will throw
SecretNotFoundException by default.
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+
+ Map<String, String> catalogOptions =
+
Collections.singletonMap(DefaultConnectionFactory.SECRET_REFERENCE_KEY,
"missing");
+
+ assertThatThrownBy(
+ () ->
+ factory.resolveConnection(
+ CatalogConnection.of(catalogOptions,
null), secretStore))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("'missing'")
+ .hasCauseInstanceOf(SecretNotFoundException.class);
+ }
+
+ @Test
+ void resolveConnectionWrapsRuntimeExceptionFromStore() {
+ RecordingSecretStore secretStore =
+ new RecordingSecretStore() {
+ @Override
+ public Map<String, String> getSecret(String secretId) {
+ throw new IllegalStateException("transport down");
+ }
+ };
+
+ Map<String, String> catalogOptions =
+
Collections.singletonMap(DefaultConnectionFactory.SECRET_REFERENCE_KEY, "abc");
+
+ assertThatThrownBy(
+ () ->
+ factory.resolveConnection(
+ CatalogConnection.of(catalogOptions,
null), secretStore))
+ .isInstanceOf(SecretException.class)
+ .hasMessageContaining("'abc'")
+ .hasCauseInstanceOf(IllegalStateException.class);
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // deleteSecrets
+ //
---------------------------------------------------------------------------------------------
+
+ @Test
+ void deleteSecretsWithoutReferenceIsNoOp() {
+ Map<String, String> catalogOptions = Collections.singletonMap("host",
"localhost");
+
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ factory.deleteSecrets(CatalogConnection.of(catalogOptions, null),
secretStore);
+
+ assertThat(secretStore.removed).isEmpty();
+ }
+
+ @Test
+ void deleteSecretsRemovesByReference() {
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ String secretId =
secretStore.storeSecret(Collections.singletonMap("password", "p@ss"));
+
+ Map<String, String> catalogOptions = new LinkedHashMap<>();
+ catalogOptions.put("host", "localhost");
+ catalogOptions.put(DefaultConnectionFactory.SECRET_REFERENCE_KEY,
secretId);
+
+ factory.deleteSecrets(CatalogConnection.of(catalogOptions, null),
secretStore);
+
+ assertThat(secretStore.removed).containsExactly(secretId);
+ assertThat(secretStore.stored).doesNotContainKey(secretId);
+ }
+
+ @Test
+ void deleteSecretsWrapsRuntimeExceptionFromStore() {
+ RecordingSecretStore secretStore =
+ new RecordingSecretStore() {
+ @Override
+ public void removeSecret(String secretId) {
+ throw new IllegalStateException("transport down");
+ }
+ };
+
+ Map<String, String> catalogOptions =
+
Collections.singletonMap(DefaultConnectionFactory.SECRET_REFERENCE_KEY, "abc");
+
+ assertThatThrownBy(
+ () ->
+ factory.deleteSecrets(
+ CatalogConnection.of(catalogOptions,
null), secretStore))
+ .isInstanceOf(SecretException.class)
+ .hasMessageContaining("'abc'")
+ .hasCauseInstanceOf(IllegalStateException.class);
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // round-trip
+ //
---------------------------------------------------------------------------------------------
+
+ @Test
+ void createAndResolveRoundTripPreservesOptionsAndComment() {
+ Map<String, String> options = new LinkedHashMap<>();
+ options.put("host", "localhost");
+ options.put("port", "9092");
+ options.put("password", "p@ss");
+ options.put("token", "tok");
+
+ RecordingSecretStore secretStore = new RecordingSecretStore();
+ CatalogConnection persisted =
+ factory.createConnection(
+ SensitiveConnection.of(options, "round-trip"),
secretStore);
+ SensitiveConnection resolved = factory.resolveConnection(persisted,
secretStore);
+
+
assertThat(resolved.getOptions()).containsExactlyInAnyOrderEntriesOf(options);
+ assertThat(resolved.getComment()).isEqualTo("round-trip");
+ }
+
+ //
---------------------------------------------------------------------------------------------
+ // Helpers
+ //
---------------------------------------------------------------------------------------------
+
+ /**
+ * Fake secret store that records all interactions; tests use the maps
directly to verify how
+ * {@link DefaultConnectionFactory} invokes the store.
+ */
+ private static class RecordingSecretStore implements WritableSecretStore,
ReadableSecretStore {
+
+ final Map<String, Map<String, String>> stored = new LinkedHashMap<>();
+ final List<String> retrieved = new java.util.ArrayList<>();
+ final List<String> removed = new java.util.ArrayList<>();
+
+ @Override
+ public String storeSecret(Map<String, String> secretData) {
+ String id = UUID.randomUUID().toString();
+ stored.put(id, new LinkedHashMap<>(secretData));
+ return id;
+ }
+
+ @Override
+ public void removeSecret(String secretId) {
+ removed.add(secretId);
+ stored.remove(secretId);
+ }
+
+ @Override
+ public void updateSecret(String secretId, Map<String, String>
newSecretData)
+ throws SecretNotFoundException {
+ if (!stored.containsKey(secretId)) {
+ throw new SecretNotFoundException("Secret '" + secretId + "'
not found.");
+ }
+ stored.put(secretId, new LinkedHashMap<>(newSecretData));
+ }
+
+ @Override
+ public Map<String, String> getSecret(String secretId) throws
SecretNotFoundException {
+ retrieved.add(secretId);
+ Map<String, String> result = stored.get(secretId);
+ if (result == null) {
+ throw new SecretNotFoundException("Secret '" + secretId + "'
not found.");
+ }
+ return result;
+ }
+ }
+}