This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f008d0ee0ab [FLINK-39685][table] Redact sensitive options in SHOW CREATE and DESCRIBE CATALOG statements
f008d0ee0ab is described below

commit f008d0ee0ab9c066d40115879ae601523a1c2615
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon May 18 17:19:52 2026 +0200

    [FLINK-39685][table] Redact sensitive options in SHOW CREATE and DESCRIBE CATALOG statements
---
 .../flink/table/api/internal/ShowCreateUtil.java   |  67 ++++++-------
 .../table/operations/DescribeCatalogOperation.java |  19 +++-
 .../operations/ShowCreateCatalogOperation.java     |   6 +-
 .../ShowCreateMaterializedTableOperation.java      |   4 +-
 .../table/operations/ShowCreateModelOperation.java |   7 +-
 .../table/operations/ShowCreateTableOperation.java |   4 +-
 .../table/api/internal/ShowCreateUtilTest.java     | 107 ++++++++++++++++++++-
 .../flink/table/catalog/DefaultCatalogModel.java   |   4 +-
 .../flink/table/catalog/DefaultCatalogTable.java   |   3 +-
 .../table/catalog/DefaultCatalogTableTest.java     |  84 ++++++++++++++++
 .../table/planner/catalog/DescribeCatalogTest.java |  81 ++++++++++++++++
 11 files changed, 334 insertions(+), 52 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index dfc9fef79c5..9ea15b1eca8 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
@@ -67,7 +68,10 @@ public class ShowCreateUtil {
     private ShowCreateUtil() {}
 
     public static String buildShowCreateModelRow(
-            ResolvedCatalogModel model, ObjectIdentifier modelIdentifier, 
boolean isTemporary) {
+            ResolvedCatalogModel model,
+            ObjectIdentifier modelIdentifier,
+            boolean isTemporary,
+            List<String> additionalSensitiveKeys) {
         StringBuilder sb =
                 new StringBuilder()
                         .append(
@@ -81,7 +85,7 @@ public class ShowCreateUtil {
                         c -> sb.append(String.format("OUTPUT (%s)%s", c, 
System.lineSeparator())));
         extractComment(model)
                 .ifPresent(c -> 
sb.append(formatComment(c)).append(System.lineSeparator()));
-        extractFormattedOptions(model.getOptions(), PRINT_INDENT)
+        extractFormattedOptions(model.getOptions(), PRINT_INDENT, 
additionalSensitiveKeys)
                 .ifPresent(
                         v ->
                                 sb.append(String.format("WITH (%s", 
System.lineSeparator()))
@@ -98,7 +102,8 @@ public class ShowCreateUtil {
             ResolvedCatalogBaseTable<?> table,
             ObjectIdentifier tableIdentifier,
             boolean isTemporary,
-            SqlFactory sqlFactory) {
+            SqlFactory sqlFactory,
+            List<String> additionalSensitiveKeys) {
         validateTableKind(table, tableIdentifier, TableKind.TABLE);
         StringBuilder sb =
                 new StringBuilder()
@@ -118,7 +123,7 @@ public class ShowCreateUtil {
                 .ifPresent(
                         partitionedInfoFormatted ->
                                 
sb.append(formatPartitionedBy(partitionedInfoFormatted)));
-        extractFormattedOptions(table.getOptions(), PRINT_INDENT)
+        extractFormattedOptions(table.getOptions(), PRINT_INDENT, 
additionalSensitiveKeys)
                 .ifPresent(v -> sb.append("WITH 
(\n").append(v).append("\n)\n"));
         return sb.toString();
     }
@@ -130,7 +135,8 @@ public class ShowCreateUtil {
             boolean isTemporary,
             boolean createOrAlter,
             ZoneId timeZoneId,
-            SqlFactory sqlFactory) {
+            SqlFactory sqlFactory,
+            List<String> additionalSensitiveKeys) {
         return buildShowCreateMaterializedTableRow(
                 table,
                 tableIdentifier,
@@ -139,7 +145,8 @@ public class ShowCreateUtil {
                 timeZoneId,
                 sqlFactory,
                 true,
-                true);
+                true,
+                additionalSensitiveKeys);
     }
 
     /** Show create materialized table statement only for materialized tables. 
*/
@@ -151,7 +158,8 @@ public class ShowCreateUtil {
             ZoneId timeZoneId,
             SqlFactory sqlFactory,
             boolean includeFreshness,
-            boolean includeRefreshMode) {
+            boolean includeRefreshMode,
+            List<String> additionalSensitiveKeys) {
         validateTableKind(table, tableIdentifier, 
TableKind.MATERIALIZED_TABLE);
         StringBuilder sb =
                 new StringBuilder()
@@ -174,7 +182,7 @@ public class ShowCreateUtil {
                 .ifPresent(d -> sb.append(d).append("\n"));
         extractFormattedPartitionedInfo(table)
                 .ifPresent(partitionedBy -> 
sb.append(formatPartitionedBy(partitionedBy)));
-        extractFormattedOptions(table.getOptions(), PRINT_INDENT)
+        extractFormattedOptions(table.getOptions(), PRINT_INDENT, 
additionalSensitiveKeys)
                 .ifPresent(v -> sb.append("WITH 
(\n").append(v).append("\n)\n"));
         sb.append(extractStartMode(table, timeZoneId)).append("\n");
         if (includeFreshness) {
@@ -211,14 +219,18 @@ public class ShowCreateUtil {
         return sb.toString();
     }
 
-    public static String buildShowCreateCatalogRow(CatalogDescriptor 
catalogDescriptor) {
+    public static String buildShowCreateCatalogRow(
+            CatalogDescriptor catalogDescriptor, List<String> 
additionalSensitiveKeys) {
         final Optional<String> comment = catalogDescriptor.getComment();
         StringBuilder sb = new StringBuilder();
         sb.append("CREATE CATALOG ")
                 
.append(EncodingUtils.escapeIdentifier(catalogDescriptor.getCatalogName()))
                 .append("\n");
         comment.ifPresent(c -> sb.append(formatComment(c)).append("\n"));
-        extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(), 
PRINT_INDENT)
+        extractFormattedOptions(
+                        catalogDescriptor.getConfiguration().toMap(),
+                        PRINT_INDENT,
+                        additionalSensitiveKeys)
                 .ifPresent(o -> sb.append("WITH 
(\n").append(o).append("\n)\n"));
         return sb.toString();
     }
@@ -432,7 +444,8 @@ public class ShowCreateUtil {
         return TIMESTAMP_FORMATTER.format(LocalDateTime.ofInstant(instant, 
timeZone));
     }
 
-    static Optional<String> extractFormattedOptions(Map<String, String> conf, 
String printIndent) {
+    static Optional<String> extractFormattedOptions(
+            Map<String, String> conf, String printIndent, List<String> 
additionalSensitiveKeys) {
         if (Objects.isNull(conf) || conf.isEmpty()) {
             return Optional.empty();
         }
@@ -446,7 +459,12 @@ public class ShowCreateUtil {
                                                 "%s'%s' = '%s'",
                                                 printIndent,
                                                 
EncodingUtils.escapeSingleQuotes(entry),
-                                                
EncodingUtils.escapeSingleQuotes(conf.get(entry))))
+                                                
EncodingUtils.escapeSingleQuotes(
+                                                        
GlobalConfiguration.isSensitive(
+                                                                        entry,
+                                                                        
additionalSensitiveKeys)
+                                                                ? 
GlobalConfiguration.HIDDEN_CONTENT
+                                                                : 
conf.get(entry))))
                         .collect(Collectors.joining(",\n")));
     }
 
@@ -462,31 +480,6 @@ public class ShowCreateUtil {
                 .collect(Collectors.joining(",\n"));
     }
 
-    private static String maybeLowerCaseKey(String key, boolean lowerCaseKey) {
-        return lowerCaseKey ? key.toLowerCase() : key;
-    }
-
-    static Optional<String> extractFormattedOptions(
-            Map<String, String> conf, String printIndent, boolean 
lowerCaseKeys) {
-        if (Objects.isNull(conf) || conf.isEmpty()) {
-            return Optional.empty();
-        }
-        return Optional.of(
-                conf.entrySet().stream()
-                        .map(
-                                entry ->
-                                        String.format(
-                                                "%s'%s' = '%s'",
-                                                printIndent,
-                                                maybeLowerCaseKey(
-                                                        
EncodingUtils.escapeSingleQuotes(
-                                                                
entry.getKey()),
-                                                        lowerCaseKeys),
-                                                
EncodingUtils.escapeSingleQuotes(entry.getValue())))
-                        .sorted()
-                        .collect(Collectors.joining("," + 
System.lineSeparator())));
-    }
-
     private static void validateTableKind(
             ResolvedCatalogBaseTable<?> table,
             ObjectIdentifier tableIdentifier,
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
index 08a97d10bec..9d1990fe67d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultInternal;
@@ -87,17 +89,24 @@ public class DescribeCatalogOperation implements Operation, 
ExecutableOperation
                                 Arrays.asList(
                                         "comment", 
catalogDescriptor.getComment().orElse(null))));
         if (isExtended) {
+            List<String> additionalSensitiveKeys =
+                    
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS);
             properties.entrySet().stream()
                     .filter(
                             entry ->
                                     
!CommonCatalogOptions.CATALOG_TYPE.key().equals(entry.getKey()))
                     .sorted(Map.Entry.comparingByKey())
                     .forEach(
-                            entry ->
-                                    rows.add(
-                                            Arrays.asList(
-                                                    String.format("option:%s", 
entry.getKey()),
-                                                    entry.getValue())));
+                            entry -> {
+                                String value =
+                                        GlobalConfiguration.isSensitive(
+                                                        entry.getKey(), 
additionalSensitiveKeys)
+                                                ? 
GlobalConfiguration.HIDDEN_CONTENT
+                                                : entry.getValue();
+                                rows.add(
+                                        Arrays.asList(
+                                                String.format("option:%s", 
entry.getKey()), value));
+                            });
         }
 
         return buildTableResult(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
index 60baecb40a8..fca7b39a5d9 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.ShowCreateUtil;
 import org.apache.flink.table.api.internal.TableResultInternal;
@@ -57,7 +58,10 @@ public class ShowCreateCatalogOperation implements 
ShowOperation {
                                                 String.format(
                                                         "Cannot obtain 
metadata information from Catalog %s.",
                                                         catalogName)));
-        String resultRow = 
ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);
+        String resultRow =
+                ShowCreateUtil.buildShowCreateCatalogRow(
+                        catalogDescriptor,
+                        
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
 
         return buildStringArrayResult("result", new String[] {resultRow});
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
index 0ca8a1475ce..1165c44e4bb 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.ShowCreateUtil;
 import org.apache.flink.table.api.internal.TableResultInternal;
@@ -65,7 +66,8 @@ public class ShowCreateMaterializedTableOperation implements 
ShowOperation {
                         table.isTemporary(),
                         createOrAlter,
                         ctx.getTableConfig().getLocalTimeZone(),
-                        ctx.getCatalogManager().getSqlFactory());
+                        ctx.getCatalogManager().getSqlFactory(),
+                        
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
 
         return buildStringArrayResult("result", new String[] {resultRow});
     }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
index d94471ba0c3..ea0bd14174a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.table.api.internal.ShowCreateUtil;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -53,7 +54,11 @@ public class ShowCreateModelOperation implements 
ShowOperation {
     @Override
     public TableResultInternal execute(Context ctx) {
         String resultRow =
-                ShowCreateUtil.buildShowCreateModelRow(model, modelIdentifier, 
isTemporary);
+                ShowCreateUtil.buildShowCreateModelRow(
+                        model,
+                        modelIdentifier,
+                        isTemporary,
+                        
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
         return buildStringArrayResult("result", new String[] {resultRow});
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
index 6608ff263b9..d0144f7abb5 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.ShowCreateUtil;
 import org.apache.flink.table.api.internal.TableResultInternal;
@@ -62,7 +63,8 @@ public class ShowCreateTableOperation implements 
ShowOperation {
                         table.getResolvedTable(),
                         tableIdentifier,
                         table.isTemporary(),
-                        ctx.getCatalogManager().getSqlFactory());
+                        ctx.getCatalogManager().getSqlFactory(),
+                        
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
 
         return buildStringArrayResult("result", new String[] {resultRow});
     }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 69df0555728..750818181d4 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.expressions.DefaultSqlFactory;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -112,7 +113,8 @@ class ShowCreateUtilTest {
                         resolvedCatalogTable,
                         TABLE_IDENTIFIER,
                         isTemporary,
-                        DefaultSqlFactory.INSTANCE);
+                        DefaultSqlFactory.INSTANCE,
+                        List.of());
         assertThat(createTableString).isEqualTo(expected);
     }
 
@@ -139,7 +141,8 @@ class ShowCreateUtilTest {
                         false,
                         createOrAlter,
                         ZoneOffset.UTC,
-                        DefaultSqlFactory.INSTANCE);
+                        DefaultSqlFactory.INSTANCE,
+                        List.of());
         final String fixedTimestamp = "1970-01-02 12:34:56";
         final String normalizedMTString =
                 setFixedTimestamp(createMaterializedTableString, 
fixedTimestamp);
@@ -171,7 +174,8 @@ class ShowCreateUtilTest {
                         ZoneOffset.UTC,
                         DefaultSqlFactory.INSTANCE,
                         includeFreshness,
-                        includeRefreshMode);
+                        includeRefreshMode,
+                        List.of());
 
         final StringBuilder expected =
                 new StringBuilder()
@@ -195,7 +199,7 @@ class ShowCreateUtilTest {
     @MethodSource("argsForShowCreateCatalog")
     void showCreateCatalog(CatalogDescriptor catalogDescriptor, String 
expected) {
         final String createCatalogString =
-                ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);
+                ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor, 
List.of());
         assertThat(createCatalogString).isEqualTo(expected);
     }
 
@@ -228,6 +232,19 @@ class ShowCreateUtilTest {
                                 + "  'k_c' = 'v_c'\n"
                                 + ")\n"));
 
+        final Map<String, String> sensitiveCatalogOptions = new HashMap<>();
+        sensitiveCatalogOptions.put("type", "hive");
+        sensitiveCatalogOptions.put("hive.metastore.token", "tok123");
+        argList.add(
+                Arguments.of(
+                        CatalogDescriptor.of(
+                                "catalogName", 
Configuration.fromMap(sensitiveCatalogOptions)),
+                        "CREATE CATALOG `catalogName`\n"
+                                + "WITH (\n"
+                                + "  'hive.metastore.token' = '******',\n"
+                                + "  'type' = 'hive'\n"
+                                + ")\n"));
+
         return argList;
     }
 
@@ -354,6 +371,22 @@ class ShowCreateUtilTest {
                         + "  'option_key_b' = 'option_value_b',\n"
                         + "  'option_key_c' = 'option_value_c'\n"
                         + ")\n");
+
+        final Map<String, String> sensitiveOptions = new HashMap<>();
+        sensitiveOptions.put("connector", "kafka");
+        sensitiveOptions.put("my.api-key", "abc123");
+        sensitiveOptions.put("password", "topsecret");
+        addTemporaryAndPermanent(
+                argList,
+                createResolvedTable(ONE_COLUMN_SCHEMA, sensitiveOptions, 
List.of(), null, null),
+                "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n"
+                        + "  `id` INT\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'kafka',\n"
+                        + "  'my.api-key' = '******',\n"
+                        + "  'password' = '******'\n"
+                        + ")\n");
         return argList;
     }
 
@@ -477,9 +510,51 @@ class ShowCreateUtilTest {
                         + "REFRESH_MODE = FULL\n"
                         + "AS SELECT id, name FROM 
`catalogName`.`dbName`.`tbl_a`\n");
 
+        addCreateAndCreateOrAlter(
+                argList,
+                createResolvedMaterializedWithOptions(
+                        ONE_COLUMN_SCHEMA,
+                        Map.of("connector", "kafka", "secret.key", "mysecret"),
+                        StartMode.of(StartModeKind.FROM_BEGINNING),
+                        IntervalFreshness.ofMinute(2),
+                        RefreshMode.CONTINUOUS,
+                        "SELECT 1"),
+                "%sMATERIALIZED TABLE 
`catalogName`.`dbName`.`materializedTableName` (\n"
+                        + "  `id` INT\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'kafka',\n"
+                        + "  'secret.key' = '******'\n"
+                        + ")\n"
+                        + "START_MODE = FROM_BEGINNING\n"
+                        + "FRESHNESS = INTERVAL '2' MINUTE\n"
+                        + "REFRESH_MODE = CONTINUOUS\n"
+                        + "AS SELECT 1\n");
+
         return argList;
     }
 
+    @Test
+    void showCreateTableRedactsAdditionalSensitiveKeys() {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "kafka");
+        options.put("my.custom.cred", "supersecret");
+        ResolvedCatalogTable table =
+                createResolvedTable(ONE_COLUMN_SCHEMA, options, List.of(), 
null, null);
+
+        String result =
+                ShowCreateUtil.buildShowCreateTableRow(
+                        table,
+                        TABLE_IDENTIFIER,
+                        false,
+                        DefaultSqlFactory.INSTANCE,
+                        List.of("custom.cred"));
+
+        assertThat(result).contains("'connector' = 'kafka'");
+        assertThat(result).contains("'my.custom.cred' = '******'");
+        assertThat(result).doesNotContain("supersecret");
+    }
+
     private static void addTemporaryAndPermanent(
             Collection<Arguments> argList, CatalogBaseTable catalogBaseTable, 
String sql) {
         argList.add(Arguments.of(catalogBaseTable, false, String.format(sql, 
"")));
@@ -554,6 +629,30 @@ class ShowCreateUtilTest {
                 startMode);
     }
 
+    private static ResolvedCatalogMaterializedTable 
createResolvedMaterializedWithOptions(
+            ResolvedSchema resolvedSchema,
+            Map<String, String> options,
+            StartMode startMode,
+            IntervalFreshness freshness,
+            RefreshMode refreshMode,
+            String query) {
+        return new ResolvedCatalogMaterializedTable(
+                CatalogMaterializedTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .options(options)
+                        .freshness(freshness)
+                        .refreshMode(refreshMode)
+                        .originalQuery(query)
+                        .expandedQuery(query)
+                        .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
+                        .refreshStatus(RefreshStatus.ACTIVATED)
+                        .build(),
+                resolvedSchema,
+                refreshMode,
+                freshness,
+                startMode);
+    }
+
     private static String setFixedTimestamp(String sql, String fixedTimestamp) 
{
         return START_MODE_EVALUATED_TIMESTAMP
                 .matcher(sql)
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java
index cc39813ec13..e6b38f8ff94 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java
@@ -19,10 +19,12 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.table.api.Schema;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -107,7 +109,7 @@ public class DefaultCatalogModel implements CatalogModel {
                 + ", outputSchema="
                 + outputSchema
                 + ", modelOptions="
-                + modelOptions
+                + ConfigurationUtils.hideSensitiveValues(modelOptions, 
List.of())
                 + ", comment="
                 + comment
                 + "}";
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
index b06e6acee26..f94c2399acc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.table.api.Schema;
 
 import javax.annotation.Nullable;
@@ -162,7 +163,7 @@ public class DefaultCatalogTable implements CatalogTable {
                 + ", partitionKeys="
                 + partitionKeys
                 + ", options="
-                + options
+                + ConfigurationUtils.hideSensitiveValues(options, List.of())
                 + ", snapshot="
                 + snapshot
                 + '}';
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java
new file mode 100644
index 00000000000..03301ba5cd6
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultCatalogTable}. */
+class DefaultCatalogTableTest {
+
+    @Test
+    void toStringRedactsSensitiveOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "kafka");
+        options.put("password", "topsecret");
+        options.put("my.token", "tok123");
+
+        CatalogTable table =
+                CatalogTable.newBuilder()
+                        .schema(Schema.newBuilder().column("id", 
DataTypes.INT()).build())
+                        .options(options)
+                        .build();
+
+        String result = table.toString();
+
+        assertThat(result).contains("connector=kafka");
+        assertThat(result).doesNotContain("topsecret");
+        assertThat(result).doesNotContain("tok123");
+        assertThat(result).contains("password=******");
+        assertThat(result).contains("my.token=******");
+    }
+
+    @Test
+    void toStringKeepsNonSensitiveOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "kafka");
+        options.put("topic", "my-topic");
+
+        CatalogTable table =
+                CatalogTable.newBuilder()
+                        .schema(Schema.newBuilder().column("id", 
DataTypes.INT()).build())
+                        .options(options)
+                        .build();
+
+        String result = table.toString();
+
+        assertThat(result).contains("connector=kafka");
+        assertThat(result).contains("topic=my-topic");
+    }
+
+    @Test
+    void toStringWithEmptyOptionsDoesNotFail() {
+        CatalogTable table =
+                CatalogTable.newBuilder()
+                        .schema(Schema.newBuilder().column("id", 
DataTypes.INT()).build())
+                        .options(Map.of())
+                        .build();
+
+        assertThat(table.toString()).contains("options={}");
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java
new file mode 100644
index 00000000000..e27e18813f4
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@code DESCRIBE CATALOG}, focusing on redaction of sensitive option values. */
+class DescribeCatalogTest {
+
+    private static final String CATALOG_NAME = "test_cat";
+
+    /** Expects secret values to be shown as hidden content while the safe option is kept. */
+    @Test
+    void describeCatalogExtendedRedactsSensitiveOptions() throws Exception {
+        List<Row> described = describe("DESCRIBE CATALOG EXTENDED " + CATALOG_NAME);
+
+        assertThat(described)
+                .contains(
+                        Row.of("option:password", GlobalConfiguration.HIDDEN_CONTENT),
+                        Row.of("option:my.token", GlobalConfiguration.HIDDEN_CONTENT),
+                        Row.of("option:safe-option", "safe-value"));
+        assertThat(described).noneMatch(row -> "topsecret".equals(row.getField(1)));
+        assertThat(described).noneMatch(row -> "tok123".equals(row.getField(1)));
+    }
+
+    /** Without {@code EXTENDED}, no {@code option:} rows may be present in the result. */
+    @Test
+    void describeCatalogNonExtendedDoesNotExposeOptions() throws Exception {
+        List<Row> described = describe("DESCRIBE CATALOG " + CATALOG_NAME);
+        assertThat(described).noneMatch(r -> String.valueOf(r.getField(0)).startsWith("option:"));
+    }
+
+    /** Executes {@code statement} in a freshly built environment and collects every row. */
+    private static List<Row> describe(String statement) throws Exception {
+        TableEnvironment tEnv = buildEnvWithSensitiveOptions();
+        return CollectionUtil.iteratorToList(tEnv.executeSql(statement).collect());
+    }
+
+    /** Builds an environment whose catalog store holds a catalog with sensitive options. */
+    private static TableEnvironment buildEnvWithSensitiveOptions() throws Exception {
+        Configuration catalogOptions = new Configuration();
+        catalogOptions.setString("type", "generic_in_memory");
+        catalogOptions.setString("safe-option", "safe-value");
+        catalogOptions.setString("password", "topsecret");
+        catalogOptions.setString("my.token", "tok123");
+        GenericInMemoryCatalogStore store = new GenericInMemoryCatalogStore();
+        store.open();
+        store.storeCatalog(CATALOG_NAME, CatalogDescriptor.of(CATALOG_NAME, catalogOptions));
+        return TableEnvironment.create(
+                EnvironmentSettings.newInstance().withCatalogStore(store).build());
+    }
+}


Reply via email to