twalthr commented on code in PR #28085: URL: https://github.com/apache/flink/pull/28085#discussion_r3227874405
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.SensitiveConnection; +import org.apache.flink.table.secret.ReadableSecretStore; +import org.apache.flink.table.secret.WritableSecretStore; +import org.apache.flink.table.secret.exceptions.SecretException; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Default implementation of {@link ConnectionFactory} that identifies sensitive fields by a + * predefined whitelist of field names. 
+ * + * <p>During {@link #createConnection}, sensitive fields are extracted from the connection options + * and stored as a single secret in the {@link WritableSecretStore}. A reference key ({@value + * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the returned {@link + * CatalogConnection}. + * + * <p>During {@link #resolveConnection}, the secret reference is used to retrieve the sensitive + * fields from the {@link ReadableSecretStore} and merge them back into the options. + * + * <p>The following field names are treated as sensitive by default: {@code password}, {@code + * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key}, {@code auth-params}, + * {@code service-key}, {@code token}, {@code basic-auth}, {@code jaas.config}, {@code + * http-headers}. + */ +@Internal +public class DefaultConnectionFactory implements ConnectionFactory { + + /** + * Reserved option key used to store the reference to secrets in the secret store. The + * surrounding double underscores make collision with user-supplied option names unlikely; user + * options containing this key will be rejected at create-time. + */ + public static final String SECRET_REFERENCE_KEY = "__flink.encrypted_secret_key__"; + + private static final Set<String> SENSITIVE_FIELD_NAMES = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + "password", Review Comment: Could you explain how this list was created? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.exceptions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.ObjectPath; + +/** Exception for trying to create a connection that already exists. */ +@PublicEvolving +public class ConnectionAlreadyExistException extends Exception { + + private static final String MSG = "Connection %s already exists in Catalog %s."; Review Comment: can we wrap all template parameters into single quotes, this helps downstream projects to clean sensitive content ```suggestion private static final String MSG = "Connection '%s' already exists in catalog '%s'."; ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.SensitiveConnection; +import org.apache.flink.table.secret.ReadableSecretStore; +import org.apache.flink.table.secret.WritableSecretStore; +import org.apache.flink.table.secret.exceptions.SecretException; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Default implementation of {@link ConnectionFactory} that identifies sensitive fields by a + * predefined whitelist of field names. + * + * <p>During {@link #createConnection}, sensitive fields are extracted from the connection options + * and stored as a single secret in the {@link WritableSecretStore}. A reference key ({@value + * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the returned {@link + * CatalogConnection}. + * + * <p>During {@link #resolveConnection}, the secret reference is used to retrieve the sensitive + * fields from the {@link ReadableSecretStore} and merge them back into the options. + * + * <p>The following field names are treated as sensitive by default: {@code password}, {@code + * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key}, {@code auth-params}, + * {@code service-key}, {@code token}, {@code basic-auth}, {@code jaas.config}, {@code + * http-headers}. 
+ */ +@Internal +public class DefaultConnectionFactory implements ConnectionFactory { + + /** + * Reserved option key used to store the reference to secrets in the secret store. The + * surrounding double underscores make collision with user-supplied option names unlikely; user + * options containing this key will be rejected at create-time. + */ + public static final String SECRET_REFERENCE_KEY = "__flink.encrypted_secret_key__"; + + private static final Set<String> SENSITIVE_FIELD_NAMES = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + "password", + "secret", + "fs.azure.account.key", + "apikey", + "api-key", + "auth-params", + "service-key", + "token", + "basic-auth", + "jaas.config", + "http-headers"))); + + @Override + public String factoryIdentifier() { + return "default"; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public CatalogConnection createConnection( + SensitiveConnection connection, WritableSecretStore secretStore) { + Map<String, String> allOptions = connection.getOptions(); + + if (allOptions.containsKey(SECRET_REFERENCE_KEY)) { + throw new ValidationException( + String.format( + "Connection option '%s' is reserved and cannot be set by users.", + SECRET_REFERENCE_KEY)); + } + + Map<String, String> sensitiveOptions = + allOptions.entrySet().stream() + .filter(e -> SENSITIVE_FIELD_NAMES.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Map<String, String> nonSensitiveOptions = + allOptions.entrySet().stream() + .filter(e -> !SENSITIVE_FIELD_NAMES.contains(e.getKey())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (a, b) -> a, + HashMap::new)); + + if (!sensitiveOptions.isEmpty()) { + final String secretId; + try { + secretId = secretStore.storeSecret(sensitiveOptions); + } catch 
(SecretException e) { + throw e; + } catch (RuntimeException e) { + throw new SecretException("Failed to store connection secret.", e); + } + nonSensitiveOptions.put(SECRET_REFERENCE_KEY, secretId); Review Comment: we should add a test that also verifies how secretStore is called. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.SensitiveConnection; +import org.apache.flink.table.secret.ReadableSecretStore; +import org.apache.flink.table.secret.WritableSecretStore; +import org.apache.flink.table.secret.exceptions.SecretException; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Default implementation of {@link ConnectionFactory} that identifies sensitive fields by a + * predefined whitelist of field names. + * + * <p>During {@link #createConnection}, sensitive fields are extracted from the connection options + * and stored as a single secret in the {@link WritableSecretStore}. A reference key ({@value + * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the returned {@link + * CatalogConnection}. + * + * <p>During {@link #resolveConnection}, the secret reference is used to retrieve the sensitive + * fields from the {@link ReadableSecretStore} and merge them back into the options. + * + * <p>The following field names are treated as sensitive by default: {@code password}, {@code + * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key}, {@code auth-params}, + * {@code service-key}, {@code token}, {@code basic-auth}, {@code jaas.config}, {@code + * http-headers}. + */ +@Internal +public class DefaultConnectionFactory implements ConnectionFactory { + + /** + * Reserved option key used to store the reference to secrets in the secret store. 
The + * surrounding double underscores make collision with user-supplied option names unlikely; user + * options containing this key will be rejected at create-time. + */ + public static final String SECRET_REFERENCE_KEY = "__flink.encrypted_secret_key__"; Review Comment: nit: use `-` instead of `_` here, because in Flink options are usually separated by `-`. But I'm fine with the surrounding `__`. ```suggestion public static final String SECRET_REFERENCE_KEY = "__flink.encrypted-secret-key__"; ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.SensitiveConnection; +import org.apache.flink.table.secret.ReadableSecretStore; +import org.apache.flink.table.secret.WritableSecretStore; +import org.apache.flink.table.secret.exceptions.SecretException; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Default implementation of {@link ConnectionFactory} that identifies sensitive fields by a + * predefined whitelist of field names. + * + * <p>During {@link #createConnection}, sensitive fields are extracted from the connection options + * and stored as a single secret in the {@link WritableSecretStore}. A reference key ({@value + * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the returned {@link + * CatalogConnection}. + * + * <p>During {@link #resolveConnection}, the secret reference is used to retrieve the sensitive + * fields from the {@link ReadableSecretStore} and merge them back into the options. + * + * <p>The following field names are treated as sensitive by default: {@code password}, {@code + * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key}, {@code auth-params}, + * {@code service-key}, {@code token}, {@code basic-auth}, {@code jaas.config}, {@code + * http-headers}. + */ +@Internal +public class DefaultConnectionFactory implements ConnectionFactory { + + /** + * Reserved option key used to store the reference to secrets in the secret store. 
The + * surrounding double underscores make collision with user-supplied option names unlikely; user + * options containing this key will be rejected at create-time. + */ + public static final String SECRET_REFERENCE_KEY = "__flink.encrypted_secret_key__"; + + private static final Set<String> SENSITIVE_FIELD_NAMES = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + "password", + "secret", + "fs.azure.account.key", + "apikey", + "api-key", + "auth-params", + "service-key", + "token", + "basic-auth", + "jaas.config", + "http-headers"))); + + @Override + public String factoryIdentifier() { + return "default"; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public CatalogConnection createConnection( + SensitiveConnection connection, WritableSecretStore secretStore) { + Map<String, String> allOptions = connection.getOptions(); + + if (allOptions.containsKey(SECRET_REFERENCE_KEY)) { + throw new ValidationException( + String.format( + "Connection option '%s' is reserved and cannot be set by users.", + SECRET_REFERENCE_KEY)); + } + + Map<String, String> sensitiveOptions = + allOptions.entrySet().stream() + .filter(e -> SENSITIVE_FIELD_NAMES.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Map<String, String> nonSensitiveOptions = + allOptions.entrySet().stream() + .filter(e -> !SENSITIVE_FIELD_NAMES.contains(e.getKey())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (a, b) -> a, + HashMap::new)); + + if (!sensitiveOptions.isEmpty()) { + final String secretId; + try { + secretId = secretStore.storeSecret(sensitiveOptions); + } catch (SecretException e) { + throw e; + } catch (RuntimeException e) { + throw new SecretException("Failed to store connection secret.", e); + } + 
nonSensitiveOptions.put(SECRET_REFERENCE_KEY, secretId); Review Comment: Is there any test that verifies the behavior of this class? I don't see any references to SECRET_REFERENCE_KEY outside of this class. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ########## @@ -1791,6 +1836,328 @@ public ResolvedCatalogModel resolveCatalogModel(CatalogModel model) { return ResolvedCatalogModel.of(model, resolvedInputSchema, resolvedOutputSchema); } + // ------ connections ------ + + /** + * Get a connection from the catalog with the given object identifier. + * + * @param objectIdentifier The fully qualified path of the connection. + * @return The requested connection wrapped in Optional. + */ + public Optional<CatalogConnection> getConnection(ObjectIdentifier objectIdentifier) { + CatalogConnection temporaryConnection = temporaryConnections.get(objectIdentifier); + if (temporaryConnection != null) { + return Optional.of(temporaryConnection); + } + + Optional<Catalog> catalog = getCatalog(objectIdentifier.getCatalogName()); + if (catalog.isPresent()) { + try { + return Optional.of(catalog.get().getConnection(objectIdentifier.toObjectPath())); + } catch (ConnectionNotExistException | UnsupportedOperationException e) { + // ConnectionNotExistException: connection does not exist in this catalog. + // UnsupportedOperationException: catalog does not support connections. + return Optional.empty(); + } + } else { + return Optional.empty(); + } + } + + /** + * List all connections in the given catalog and database. + * + * @param catalogName The name of the catalog. + * @param databaseName The name of the database. + * @return A set of connection names. 
+ */ + public Set<String> listConnections(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrError(catalogName); + try { + Set<String> connections = new HashSet<>(catalog.listConnections(databaseName)); + + // Add temporary connections for this catalog and database + temporaryConnections.keySet().stream() + .filter( + identifier -> + identifier.getCatalogName().equals(catalogName) + && identifier.getDatabaseName().equals(databaseName)) + .map(ObjectIdentifier::getObjectName) + .forEach(connections::add); + + return connections; + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format( + "Database %s does not exist in catalog %s.", databaseName, catalogName), + e); + } catch (CatalogException e) { + throw new TableException( + String.format( + "Failed to list connections in catalog %s and database %s.", + catalogName, databaseName), + e); + } + } + + /** + * Create a permanent connection in the given fully qualified path. + * + * <p>If a {@link ConnectionFactory} and {@link WritableSecretStore} are configured, sensitive + * fields are extracted from the connection and stored in the secret store before persisting the + * non-sensitive {@link CatalogConnection} to the catalog. + * + * @param connection The connection with all options including sensitive fields. + * @param objectIdentifier The fully qualified path where to create the connection. + * @param ignoreIfExists If false exception will be thrown if the connection already exists. 
+ */ + public void createConnection( + SensitiveConnection connection, + ObjectIdentifier objectIdentifier, + boolean ignoreIfExists) { + if (connectionFactory == null || writableSecretStore == null) { + throw new ValidationException( + "ConnectionFactory and WritableSecretStore must be configured to create connections."); + } + if (getConnection(objectIdentifier).isPresent()) { + if (ignoreIfExists) { + return; + } + throw new ValidationException( + String.format( + "Connection with identifier '%s' already exists.", + objectIdentifier.asSummaryString())); + } + final CatalogConnection catalogConnection = + connectionFactory.createConnection(connection, writableSecretStore); + boolean persisted = false; + try { + execute( + (catalog, path) -> { + catalog.createConnection(path, catalogConnection, ignoreIfExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateConnectionEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + catalogConnection, + ignoreIfExists, + false))); + }, + objectIdentifier, + ignoreIfExists, + "CreateConnection"); + persisted = true; + } finally { + if (!persisted) { + tryDeleteSecrets( + catalogConnection, + writableSecretStore, + "rollback createConnection " + objectIdentifier); + } + } + } + + /** + * Create a temporary connection in the given fully qualified path. + * + * @param connection The connection with all options including sensitive fields. + * @param objectIdentifier The fully qualified path where to create the connection. + * @param ignoreIfExists If false exception will be thrown if the connection already exists. + */ + public void createTemporaryConnection( + SensitiveConnection connection, + ObjectIdentifier objectIdentifier, + boolean ignoreIfExists) { + if (connectionFactory == null) { Review Comment: I thought we use Java Service providers and discover a connection factory. 
If no factory can be found, we can fall back to the default one configured in EnvironmentSettings and from there passed to CatalogManager. Wdyt? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.exceptions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.ObjectPath; + +/** Exception for trying to create a connection that already exists. */ +@PublicEvolving +public class ConnectionAlreadyExistException extends Exception { + + private static final String MSG = "Connection %s already exists in Catalog %s."; Review Comment: please revisit other messages in this PR ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.SensitiveConnection; +import org.apache.flink.table.secret.ReadableSecretStore; +import org.apache.flink.table.secret.WritableSecretStore; +import org.apache.flink.table.secret.exceptions.SecretException; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Default implementation of {@link ConnectionFactory} that identifies sensitive fields by a + * predefined whitelist of field names. + * + * <p>During {@link #createConnection}, sensitive fields are extracted from the connection options + * and stored as a single secret in the {@link WritableSecretStore}. A reference key ({@value + * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the returned {@link + * CatalogConnection}. 
+ * + * <p>During {@link #resolveConnection}, the secret reference is used to retrieve the sensitive + * fields from the {@link ReadableSecretStore} and merge them back into the options. + * + * <p>The following field names are treated as sensitive by default: {@code password}, {@code + * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key}, {@code auth-params}, + * {@code service-key}, {@code token}, {@code basic-auth}, {@code jaas.config}, {@code + * http-headers}. + */ +@Internal +public class DefaultConnectionFactory implements ConnectionFactory { + + /** + * Reserved option key used to store the reference to secrets in the secret store. The + * surrounding double underscores make collision with user-supplied option names unlikely; user + * options containing this key will be rejected at create-time. + */ + public static final String SECRET_REFERENCE_KEY = "__flink.encrypted_secret_key__"; + + private static final Set<String> SENSITIVE_FIELD_NAMES = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + "password", Review Comment: Shouldn't we start smaller and let this list evolve? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
