This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7c8d3c9210d [improve][broker] PIP-402: Optionally prevent
role/originalPrincipal logging (#23386)
7c8d3c9210d is described below
commit 7c8d3c9210dcaf1d0c3b487af325b7b0a9dc3095
Author: kannar <[email protected]>
AuthorDate: Tue Sep 23 18:07:26 2025 +0200
[improve][broker] PIP-402: Optionally prevent role/originalPrincipal
logging (#23386)
---
conf/broker.conf | 3 +
conf/proxy.conf | 3 +
conf/standalone.conf | 3 +
.../apache/pulsar/broker/ServiceConfiguration.java | 8 +++
...DefaultAuthenticationRoleLoggingAnonymizer.java | 67 ++++++++++++++++++
.../anonymizer/DefaultRoleAnonymizerType.java | 80 ++++++++++++++++++++++
.../configuration/anonymizer/package-info.java | 23 +++++++
.../apache/pulsar/broker/service/ServerCnx.java | 21 ++++--
.../pulsar/proxy/server/ProxyConfiguration.java | 7 ++
.../pulsar/proxy/server/ProxyConnection.java | 18 +++--
10 files changed, 220 insertions(+), 13 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index c5c9c05ca39..a1f59bd3eed 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -909,6 +909,9 @@ metadataStoreBatchingMaxSizeKb=128
# Enable authentication
authenticationEnabled=false
+# Enable authentication role anonymizer, can be REDACTED, hash:SHA256,
hash:MD5, default is NONE
+authenticationRoleLoggingAnonymizer=NONE
+
# Authentication provider name list, which is comma separated list of class
names
authenticationProviders=
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 46d84744e12..dc1fc5f002f 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -184,6 +184,9 @@ forwardAuthorizationCredentials=false
# Whether authentication is enabled for the Pulsar proxy
authenticationEnabled=false
+# Enable authentication role anonymizer, can be REDACTED, hash:SHA256,
hash:MD5, default is NONE
+authenticationRoleLoggingAnonymizer=NONE
+
# Authentication provider name list (a comma-separated list of class names)
authenticationProviders=
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2b205a2b2f6..708d4905b8a 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -572,6 +572,9 @@ authenticateOriginalAuthData=false
# Enable authentication
authenticationEnabled=false
+# Enable authentication role anonymizer, can be REDACTED, hash:SHA256,
hash:MD5, default is NONE
+authenticationRoleLoggingAnonymizer=NONE
+
# Authentication provider name list, which is comma separated list of class
names
authenticationProviders=
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 279ed9ed73e..5d2d94ea477 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -445,6 +445,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private String clusterName;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Defines how the broker will anonymize the role and
originalAuthRole before logging. "
+ + "Possible values are: NONE (no anonymization), REDACTED
(replaces with '[REDACTED]'), "
+ + "hash:SHA256 (hashes using SHA-256), and hash:MD5
(hashes using MD5). Default is NONE."
+ )
+ private String authenticationRoleLoggingAnonymizer = "NONE";
+
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java
new file mode 100644
index 00000000000..151118c6d5e
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.configuration.anonymizer;
+
+import static
org.apache.pulsar.common.configuration.anonymizer.DefaultRoleAnonymizerType.NONE;
+
+/**
+ * This class provides a utility to anonymize authentication roles before
logging,
+ * based on a configured anonymization strategy. The anonymization strategy is
+ * determined by the provided value of the {@link DefaultRoleAnonymizerType}
enum.
+ *
+ * The primary purpose of this class is to enable flexible anonymization of
sensitive
+ * information (such as user roles) in logs, ensuring compliance with security
and
+ * privacy requirements while allowing customization of the anonymization
behavior.
+ *
+ * Usage:
+ * - The class constructor accepts a string that represents the desired
anonymization
+ * strategy (e.g., "NONE", "REDACTED", "SHA256", "MD5"), and it initializes
the
+ * anonymizer type accordingly.
+ * - The {@code anonymize} method applies the selected anonymization strategy
to
+ * the provided role and returns the anonymized value.
+ *
+ * Example:
+ * <pre>
+ * DefaultAuthenticationRoleLoggingAnonymizer roleAnonymizer =
+ * new DefaultAuthenticationRoleLoggingAnonymizer("SHA256");
+ * String anonymizedRole = roleAnonymizer.anonymize("admin");
+ * </pre>
+ *
+ * Anonymization strategies:
+ * - NONE: No anonymization (returns the role as-is).
+ * - REDACTED: Replaces the role with "[REDACTED]".
+ * - hash:SHA256: Hashes the role using the SHA-256 algorithm and prefixes it
with "SHA-256:".
+ * - hash:MD5: Hashes the role using the MD5 algorithm and prefixes it with
"MD5:"
+ */
+public final class DefaultAuthenticationRoleLoggingAnonymizer {
+ private static DefaultRoleAnonymizerType anonymizerType = NONE;
+
+ public DefaultAuthenticationRoleLoggingAnonymizer(String
authenticationRoleLoggingAnonymizer) {
+ if (authenticationRoleLoggingAnonymizer.startsWith("hash:")) {
+ anonymizerType =
DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer
+ .substring("hash:".length()).toUpperCase());
+ } else {
+ anonymizerType =
DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer);
+ }
+ }
+
+ public String anonymize(String role) {
+ return anonymizerType.anonymize(role);
+ }
+}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java
new file mode 100644
index 00000000000..3333b69cf22
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.configuration.anonymizer;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+public enum DefaultRoleAnonymizerType {
+ NONE {
+ @Override
+ public String anonymize(String role) {
+ return role;
+ }
+ },
+ REDACTED {
+ @Override
+ public String anonymize(String role) {
+ return REDACTED_VALUE;
+ }
+ },
+ SHA256 {
+ private static final String PREFIX = "SHA-256:";
+ private final MessageDigest digest;
+
+ {
+ // Initializing the MessageDigest once for SHA-256
+ try {
+ digest = MessageDigest.getInstance("SHA-256");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException("SHA-256 algorithm not found", e);
+ }
+ }
+
+ @Override
+ public String anonymize(String role) {
+ byte[] hash = digest.digest(role.getBytes());
+ return PREFIX + Base64.getEncoder().encodeToString(hash);
+ }
+ },
+ MD5 {
+ private static final String PREFIX = "MD5:";
+ private final MessageDigest digest;
+
+ {
+ // Initializing the MessageDigest once for MD5
+ try {
+ // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient
for this use case&
+ digest = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException("MD5 algorithm not found", e);
+ }
+ }
+
+ @Override
+ public String anonymize(String role) {
+ byte[] hash = digest.digest(role.getBytes());
+ return PREFIX + Base64.getEncoder().encodeToString(hash);
+ }
+ };
+
+ private static final String REDACTED_VALUE = "[REDACTED]";
+ public abstract String anonymize(String role);
+}
\ No newline at end of file
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java
new file mode 100644
index 00000000000..7b1c506bf72
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Pulsar Client API.
+ */
+package org.apache.pulsar.common.configuration.anonymizer;
+
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 44927c375b5..1c61d4c467f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -148,6 +148,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
+import
org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.Metadata;
@@ -215,6 +216,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private AuthData originalAuthDataCopy;
private boolean pendingAuthChallengeResponse = false;
private ScheduledFuture<?> authRefreshTask;
+ private final DefaultAuthenticationRoleLoggingAnonymizer
authenticationRoleLoggingAnonymizer;
// Max number of pending requests per connections. If multiple producers
are sharing the same connection the flow
// control done by a single producer might not be enough to prevent write
spikes on the broker.
@@ -352,6 +354,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
this.brokerInterceptor = this.service != null ?
this.service.getInterceptor() : null;
this.throttleTracker = new ServerCnxThrottleTracker(this);
topicsPatternImplementation =
conf.getTopicsPatternRegexImplementation();
+ this.authenticationRoleLoggingAnonymizer = new
DefaultAuthenticationRoleLoggingAnonymizer(
+ conf.getAuthenticationRoleLoggingAnonymizer());
}
@Override
@@ -821,12 +825,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
clientVersion, clientProtoVersion, proxyVersion);
} else if (originalPrincipal != null) {
log.info("[{}] connected role={} and originalAuthRole={} using
authMethod={}, clientVersion={}, "
- + "clientProtocolVersion={}, proxyVersion={}",
remoteAddress, authRole, originalPrincipal,
- authMethod, clientVersion, clientProtoVersion,
proxyVersion);
+ + "clientProtocolVersion={}, proxyVersion={}",
remoteAddress,
+ authenticationRoleLoggingAnonymizer.anonymize(authRole),
+
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), authMethod,
clientVersion,
+ clientProtoVersion, proxyVersion);
} else {
log.info("[{}] connected with role={} using authMethod={},
clientVersion={}, clientProtocolVersion={}, "
- + "proxyVersion={}", remoteAddress, authRole,
authMethod, clientVersion, clientProtoVersion,
- proxyVersion);
+ + "proxyVersion={}", remoteAddress,
authenticationRoleLoggingAnonymizer.anonymize(authRole),
+ authMethod, clientVersion, clientProtoVersion,
proxyVersion);
}
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionCreated(this);
@@ -1214,7 +1220,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (log.isDebugEnabled()) {
log.debug("[{}] Handle subscribe command: auth role = {}, original
auth role = {}",
- remoteAddress, authRole, originalPrincipal);
+ remoteAddress,
authenticationRoleLoggingAnonymizer.anonymize(authRole),
+
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal));
}
final String subscriptionName = subscribe.getSubscription();
@@ -2433,11 +2440,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture,
(isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform
operation {} on namespace {}",
- originalPrincipal, operation, namespaceName);
+
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), operation,
namespaceName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on
namespace {}",
- authRole, operation, namespaceName);
+
authenticationRoleLoggingAnonymizer.anonymize(authRole), operation,
namespaceName);
}
return isProxyAuthorized && isAuthorized;
});
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index d89801d360b..6db1b302c66 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -295,6 +295,13 @@ public class ProxyConfiguration implements
PulsarConfiguration {
+ "is enabled.")
private Boolean webServiceLogDetailedAddresses;
+ @FieldContext(category = CATEGORY_SERVER, doc =
+ "Defines how the broker will anonymize the role and
originalAuthRole before logging. "
+ + "Possible values are: NONE (no anonymization), REDACTED
(replaces with '[REDACTED]'), "
+ + "hash:SHA256 (hashes using SHA-256), and hash:MD5
(hashes using MD5). Default is NONE."
+ )
+ private String authenticationRoleLoggingAnonymizer = "NONE";
+
@FieldContext(category = CATEGORY_SERVER,
doc = "Enables zero-copy transport of data across network
interfaces using the spice. "
+ "Zero copy mode cannot be used when TLS is enabled or
when proxyLogLevel is > 0.")
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 4f89f4bc17e..372d45ffe60 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -72,6 +72,7 @@ import
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
+import
org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.Runnables;
@@ -120,6 +121,7 @@ public class ProxyConnection extends PulsarHandler {
private int protocolVersionToAdvertise;
private String proxyToBrokerUrl;
private HAProxyMessage haProxyMessage;
+ private final DefaultAuthenticationRoleLoggingAnonymizer
authenticationRoleLoggingAnonymizer;
protected static final Integer SPLICE_BYTES = 1024 * 1024 * 1024;
private static final byte[] EMPTY_CREDENTIALS = new byte[0];
@@ -161,6 +163,8 @@ public class ProxyConnection extends PulsarHandler {
this.state = State.Init;
this.brokerProxyValidator = service.getBrokerProxyValidator();
this.connectionController = proxyService.getConnectionController();
+ this.authenticationRoleLoggingAnonymizer = new
DefaultAuthenticationRoleLoggingAnonymizer(
+
proxyService.getConfiguration().getAuthenticationRoleLoggingAnonymizer());
}
@Override
@@ -343,8 +347,9 @@ public class ProxyConnection extends PulsarHandler {
private synchronized void completeConnect() throws PulsarClientException {
checkArgument(state == State.Connecting);
+ String maybeAnonymizedClientAuthRole =
authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole);
LOG.info("[{}] complete connection, init proxy handler. authenticated
with {} role {}, hasProxyToBrokerUrl: {}",
- remoteAddress, authMethod, clientAuthRole,
hasProxyToBrokerUrl);
+ remoteAddress, authMethod, maybeAnonymizedClientAuthRole,
hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
// Optimize proxy connection to fail-fast if the target broker
isn't active
// Pulsar client will retry connecting after a back off timeout
@@ -352,7 +357,7 @@ public class ProxyConnection extends PulsarHandler {
&& !isBrokerActive(proxyToBrokerUrl)) {
state = State.Closing;
LOG.warn("[{}] Target broker '{}' isn't available.
authenticated with {} role {}.",
- remoteAddress, proxyToBrokerUrl, authMethod,
clientAuthRole);
+ remoteAddress, proxyToBrokerUrl, authMethod,
maybeAnonymizedClientAuthRole);
final ByteBuf msg = Commands.newError(-1,
ServerError.ServiceNotReady, "Target broker isn't
available.");
writeAndFlushAndClose(msg);
@@ -371,10 +376,11 @@ public class ProxyConnection extends PulsarHandler {
LOG.warn("[{}] Target broker '{}' cannot be
validated. {}. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl,
targetAddressDeniedException.getMessage(),
- authMethod, clientAuthRole);
+ authMethod, maybeAnonymizedClientAuthRole);
} else {
LOG.error("[{}] Error validating target broker
'{}'. authenticated with {} role {}.",
- remoteAddress, proxyToBrokerUrl,
authMethod, clientAuthRole, throwable);
+ remoteAddress, proxyToBrokerUrl,
authMethod, maybeAnonymizedClientAuthRole,
+ throwable);
}
final ByteBuf msg = Commands.newError(-1,
ServerError.ServiceNotReady,
"Target broker cannot be validated.");
@@ -401,7 +407,7 @@ public class ProxyConnection extends PulsarHandler {
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())),
null);
} else {
LOG.error("BUG! Connection Pool has already been created for
proxy connection to {} state {} role {}",
- remoteAddress, state, clientAuthRole);
+ remoteAddress, state, maybeAnonymizedClientAuthRole);
}
state = State.ProxyLookupRequests;
@@ -488,7 +494,7 @@ public class ProxyConnection extends PulsarHandler {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {}
role {}",
- remoteAddress, authMethod, clientAuthRole);
+ remoteAddress, authMethod,
authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole));
}
// First connection