lihaosky commented on code in PR #28085:
URL: https://github.com/apache/flink/pull/28085#discussion_r3236339101
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1791,6 +1836,328 @@ public ResolvedCatalogModel
resolveCatalogModel(CatalogModel model) {
return ResolvedCatalogModel.of(model, resolvedInputSchema,
resolvedOutputSchema);
}
+ // ------ connections ------
+
+ /**
+ * Get a connection from the catalog with the given object identifier.
+ *
+ * @param objectIdentifier The fully qualified path of the connection.
+ * @return The requested connection wrapped in Optional.
+ */
+ public Optional<CatalogConnection> getConnection(ObjectIdentifier
objectIdentifier) {
+ CatalogConnection temporaryConnection =
temporaryConnections.get(objectIdentifier);
+ if (temporaryConnection != null) {
+ return Optional.of(temporaryConnection);
+ }
+
+ Optional<Catalog> catalog =
getCatalog(objectIdentifier.getCatalogName());
+ if (catalog.isPresent()) {
+ try {
+ return
Optional.of(catalog.get().getConnection(objectIdentifier.toObjectPath()));
+ } catch (ConnectionNotExistException |
UnsupportedOperationException e) {
+ // ConnectionNotExistException: connection does not exist in
this catalog.
+ // UnsupportedOperationException: catalog does not support
connections.
+ return Optional.empty();
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * List all connections in the given catalog and database.
+ *
+ * @param catalogName The name of the catalog.
+ * @param databaseName The name of the database.
+ * @return A set of connection names.
+ */
+ public Set<String> listConnections(String catalogName, String
databaseName) {
+ Catalog catalog = getCatalogOrError(catalogName);
+ try {
+ Set<String> connections = new
HashSet<>(catalog.listConnections(databaseName));
+
+ // Add temporary connections for this catalog and database
+ temporaryConnections.keySet().stream()
+ .filter(
+ identifier ->
+
identifier.getCatalogName().equals(catalogName)
+ &&
identifier.getDatabaseName().equals(databaseName))
+ .map(ObjectIdentifier::getObjectName)
+ .forEach(connections::add);
+
+ return connections;
+ } catch (DatabaseNotExistException e) {
+ throw new ValidationException(
+ String.format(
+ "Database %s does not exist in catalog %s.",
databaseName, catalogName),
+ e);
+ } catch (CatalogException e) {
+ throw new TableException(
+ String.format(
+ "Failed to list connections in catalog %s and
database %s.",
+ catalogName, databaseName),
+ e);
+ }
+ }
+
+ /**
+ * Create a permanent connection in the given fully qualified path.
+ *
+ * <p>If a {@link ConnectionFactory} and {@link WritableSecretStore} are
configured, sensitive
+ * fields are extracted from the connection and stored in the secret store
before persisting the
+ * non-sensitive {@link CatalogConnection} to the catalog.
+ *
+ * @param connection The connection with all options including sensitive
fields.
+ * @param objectIdentifier The fully qualified path where to create the
connection.
+ * @param ignoreIfExists If false exception will be thrown if the
connection already exists.
+ */
+ public void createConnection(
+ SensitiveConnection connection,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfExists) {
+ if (connectionFactory == null || writableSecretStore == null) {
+ throw new ValidationException(
+ "ConnectionFactory and WritableSecretStore must be
configured to create connections.");
+ }
+ if (getConnection(objectIdentifier).isPresent()) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new ValidationException(
+ String.format(
+ "Connection with identifier '%s' already exists.",
+ objectIdentifier.asSummaryString()));
+ }
+ final CatalogConnection catalogConnection =
+ connectionFactory.createConnection(connection,
writableSecretStore);
+ boolean persisted = false;
+ try {
+ execute(
+ (catalog, path) -> {
+ catalog.createConnection(path, catalogConnection,
ignoreIfExists);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+
CreateConnectionEvent.createEvent(
+
CatalogContext.createContext(
+
objectIdentifier.getCatalogName(),
+ catalog),
+ objectIdentifier,
+ catalogConnection,
+ ignoreIfExists,
+ false)));
+ },
+ objectIdentifier,
+ ignoreIfExists,
+ "CreateConnection");
+ persisted = true;
+ } finally {
+ if (!persisted) {
+ tryDeleteSecrets(
+ catalogConnection,
+ writableSecretStore,
+ "rollback createConnection " + objectIdentifier);
+ }
+ }
+ }
+
+ /**
+ * Create a temporary connection in the given fully qualified path.
+ *
+ * @param connection The connection with all options including sensitive
fields.
+ * @param objectIdentifier The fully qualified path where to create the
connection.
+ * @param ignoreIfExists If false exception will be thrown if the
connection already exists.
+ */
+ public void createTemporaryConnection(
+ SensitiveConnection connection,
+ ObjectIdentifier objectIdentifier,
+ boolean ignoreIfExists) {
+ if (connectionFactory == null) {
Review Comment:
You are right. `ConnectionFactory` should be discovered via SPI. Can I fix it
in a separate PR? This PR is already large. I created
https://issues.apache.org/jira/browse/FLINK-39678 to track it.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link ConnectionFactory} that identifies
sensitive fields by a
+ * predefined whitelist of field names.
+ *
+ * <p>During {@link #createConnection}, sensitive fields are extracted from
the connection options
+ * and stored as a single secret in the {@link WritableSecretStore}. A
reference key ({@value
+ * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the
returned {@link
+ * CatalogConnection}.
+ *
+ * <p>During {@link #resolveConnection}, the secret reference is used to
retrieve the sensitive
+ * fields from the {@link ReadableSecretStore} and merge them back into the
options.
+ *
+ * <p>The following field names are treated as sensitive by default: {@code
password}, {@code
+ * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key},
{@code auth-params},
+ * {@code service-key}, {@code token}, {@code basic-auth}, {@code
jaas.config}, {@code
+ * http-headers}.
+ */
+@Internal
+public class DefaultConnectionFactory implements ConnectionFactory {
+
+ /**
+ * Reserved option key used to store the reference to secrets in the
secret store. The
+ * surrounding double underscores make collision with user-supplied option
names unlikely; user
+ * options containing this key will be rejected at create-time.
+ */
+ public static final String SECRET_REFERENCE_KEY =
"__flink.encrypted_secret_key__";
+
+ private static final Set<String> SENSITIVE_FIELD_NAMES =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ "password",
+ "secret",
+ "fs.azure.account.key",
+ "apikey",
+ "api-key",
+ "auth-params",
+ "service-key",
+ "token",
+ "basic-auth",
+ "jaas.config",
+ "http-headers")));
+
+ @Override
+ public String factoryIdentifier() {
+ return "default";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public CatalogConnection createConnection(
+ SensitiveConnection connection, WritableSecretStore secretStore) {
+ Map<String, String> allOptions = connection.getOptions();
+
+ if (allOptions.containsKey(SECRET_REFERENCE_KEY)) {
+ throw new ValidationException(
+ String.format(
+ "Connection option '%s' is reserved and cannot be
set by users.",
+ SECRET_REFERENCE_KEY));
+ }
+
+ Map<String, String> sensitiveOptions =
+ allOptions.entrySet().stream()
+ .filter(e ->
SENSITIVE_FIELD_NAMES.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+
+ Map<String, String> nonSensitiveOptions =
+ allOptions.entrySet().stream()
+ .filter(e ->
!SENSITIVE_FIELD_NAMES.contains(e.getKey()))
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue,
+ (a, b) -> a,
+ HashMap::new));
+
+ if (!sensitiveOptions.isEmpty()) {
+ final String secretId;
+ try {
+ secretId = secretStore.storeSecret(sensitiveOptions);
+ } catch (SecretException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new SecretException("Failed to store connection
secret.", e);
+ }
+ nonSensitiveOptions.put(SECRET_REFERENCE_KEY, secretId);
Review Comment:
Let me add tests. `SECRET_REFERENCE_KEY` is not used outside of this class,
so I can make it private.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link ConnectionFactory} that identifies
sensitive fields by a
+ * predefined whitelist of field names.
+ *
+ * <p>During {@link #createConnection}, sensitive fields are extracted from
the connection options
+ * and stored as a single secret in the {@link WritableSecretStore}. A
reference key ({@value
+ * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the
returned {@link
+ * CatalogConnection}.
+ *
+ * <p>During {@link #resolveConnection}, the secret reference is used to
retrieve the sensitive
+ * fields from the {@link ReadableSecretStore} and merge them back into the
options.
+ *
+ * <p>The following field names are treated as sensitive by default: {@code
password}, {@code
+ * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key},
{@code auth-params},
+ * {@code service-key}, {@code token}, {@code basic-auth}, {@code
jaas.config}, {@code
+ * http-headers}.
+ */
+@Internal
+public class DefaultConnectionFactory implements ConnectionFactory {
+
+ /**
+ * Reserved option key used to store the reference to secrets in the
secret store. The
+ * surrounding double underscores make collision with user-supplied option
names unlikely; user
+ * options containing this key will be rejected at create-time.
+ */
+ public static final String SECRET_REFERENCE_KEY =
"__flink.encrypted_secret_key__";
+
+ private static final Set<String> SENSITIVE_FIELD_NAMES =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ "password",
Review Comment:
It comes from the list here:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L49
I can add a comment pointing to it. What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]