This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push:
new b6aeb26c7e2 [FLINK-38815] Mask sensitive values in Pekko debug
configuration logs
b6aeb26c7e2 is described below
commit b6aeb26c7e245f9d2f7e9e58e9571ca278e71462
Author: Ferenc Csaky <[email protected]>
AuthorDate: Wed Apr 1 14:35:03 2026 +0200
[FLINK-38815] Mask sensitive values in Pekko debug configuration logs
Co-authored-by: Vishal
<[email protected]>
---
.../rpc/pekko/ActorSystemBootstrapTools.java | 23 +++++++++++++-
.../rpc/pekko/ActorSystemBootstrapToolsTest.java | 36 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
index 517a48669e6..a206e4ef08b 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.rpc.pekko;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.NetUtils;
@@ -32,7 +33,9 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.net.BindException;
import java.util.Iterator;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
/** Tools for starting the Actor Systems used to run the JobManager and
TaskManager actors. */
public class ActorSystemBootstrapTools {
@@ -241,6 +244,22 @@ public class ActorSystemBootstrapTools {
}
}
+ /**
+ * Flattens the given Pekko {@link Config} into a {@link Map} and masks sensitive values.
+ *
+ * @param config The Pekko configuration
+ * @return A map of configuration keys to string values with sensitive entries hidden
+ */
+ @VisibleForTesting
+ static Map<String, String> toMaskedMap(Config config) {
+ return ConfigurationUtils.hideSensitiveValues(
+ config.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
String.valueOf(entry.getValue().unwrapped()))));
+ }
+
/**
* Starts an Actor System with given Pekko config.
*
@@ -251,7 +270,9 @@ public class ActorSystemBootstrapTools {
*/
private static ActorSystem startActorSystem(
Config config, String actorSystemName, Logger logger) {
- logger.debug("Using pekko configuration\n {}", config);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Using pekko configuration\n {}",
toMaskedMap(config));
+ }
ActorSystem actorSystem =
PekkoUtils.createActorSystem(actorSystemName, config);
logger.info("Actor system started at {}",
PekkoUtils.getAddress(actorSystem));
diff --git
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java
index 5343c4011cf..1ebecb27fb9 100644
---
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java
+++
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java
@@ -18,11 +18,14 @@
package org.apache.flink.runtime.rpc.pekko;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.pekko.actor.ActorSystem;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -32,6 +35,7 @@ import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
@@ -40,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link ActorSystemBootstrapTools}. */
@@ -111,4 +116,35 @@ class ActorSystemBootstrapToolsTest {
portOccupier.close();
}
}
+
+ @Test
+ void testToMaskedMapMasksOnlySensitiveKeys() {
+ Config config =
+ ConfigFactory.parseMap(
+ Map.of(
+ "pekko.loglevel", "OFF",
+ "pekko.remote.artery.enabled", "false",
+
"pekko.remote.classic.netty.ssl.security.key-password", "secret",
+
"pekko.remote.classic.netty.ssl.security.key-store-password",
+ "secret2",
+
"pekko.remote.classic.netty.ssl.security.trust-store-password",
+ "secret3"));
+
+ Map<String, String> result =
ActorSystemBootstrapTools.toMaskedMap(config);
+
+ // Non-sensitive values must pass through unchanged
+ assertThat(result.get("pekko.loglevel")).isEqualTo("OFF");
+
assertThat(result.get("pekko.remote.artery.enabled")).isEqualTo("false");
+
+ // Password values must equal GlobalConfiguration.HIDDEN_CONTENT instead of the secret
+
assertThat(result.get("pekko.remote.classic.netty.ssl.security.key-password"))
+ .isNotEqualTo("secret")
+ .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT);
+
assertThat(result.get("pekko.remote.classic.netty.ssl.security.key-store-password"))
+ .isNotEqualTo("secret2")
+ .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT);
+
assertThat(result.get("pekko.remote.classic.netty.ssl.security.trust-store-password"))
+ .isNotEqualTo("secret3")
+ .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT);
+ }
}