This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 76f2b7f32f8 [audit] Add audit logging configuration support with
cluster config listener (#16649)
76f2b7f32f8 is described below
commit 76f2b7f32f86021244f8c0f33e5c516e0128062a
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Thu Aug 21 14:05:51 2025 -0700
[audit] Add audit logging configuration support with cluster config
listener (#16649)
* [audit] Implement dynamic audit configuration updates
* [audit] Add timestamp to audit event logging
---
.../org/apache/pinot/common/audit/AuditConfig.java | 2 +-
.../pinot/common/audit/AuditConfigManager.java | 42 +++++-
.../org/apache/pinot/common/audit/AuditLogger.java | 2 +-
.../pinot/common/audit/AuditRequestProcessor.java | 4 +-
.../pinot/common/audit/AuditConfigManagerTest.java | 147 +++++++++++++++++++++
.../pinot/controller/BaseControllerStarter.java | 12 +-
6 files changed, 201 insertions(+), 8 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java
index 5b17fc83927..957b2be5195 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java
@@ -38,7 +38,7 @@ public final class AuditConfig {
@JsonProperty("capture.request.headers")
private boolean _captureRequestHeaders = false;
- @JsonProperty("max.payload.size")
+ @JsonProperty("payload.size.max.bytes")
private int _maxPayloadSize = 10_240;
@JsonProperty("excluded.endpoints")
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java
index 0d79cd86637..ea479072182 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java
@@ -18,10 +18,18 @@
*/
package org.apache.pinot.common.audit;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import javax.inject.Singleton;
+import org.apache.commons.configuration2.MapConfiguration;
import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.ClusterConfigChangeListener;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,12 +40,10 @@ import org.slf4j.LoggerFactory;
* Self-registers with the provided cluster config provider.
*/
@Singleton
-public final class AuditConfigManager {
-
+public final class AuditConfigManager implements ClusterConfigChangeListener {
private static final Logger LOG =
LoggerFactory.getLogger(AuditConfigManager.class);
- // TODO spyne Hardcoded stub code. Wire this up with ClusterConfiguration
- private final AuditConfig _currentConfig = new AuditConfig();
+ private AuditConfig _currentConfig = new AuditConfig();
/**
* Checks if the given endpoint should be excluded from audit logging.
@@ -99,6 +105,24 @@ public final class AuditConfigManager {
return false;
}
+ @VisibleForTesting
+ static AuditConfig buildFromClusterConfig(ClusterConfig clusterConfig) {
+ return
mapPrefixedConfigToObject(clusterConfig.getRecord().getSimpleFields(),
+ CommonConstants.AuditLogConstants.PREFIX, AuditConfig.class);
+ }
+
+ /**
+ * Maps cluster configuration properties with a common prefix to a POJO
using Jackson.
+ * Uses PinotConfiguration.subset() to extract properties with the given
prefix and
+ * Jackson's convertValue() for automatic object mapping.
+ */
+ private static <T> T mapPrefixedConfigToObject(Map<String, String>
clusterConfigs, String prefix,
+ Class<T> configClass) {
+ final MapConfiguration mapConfig = new MapConfiguration(clusterConfigs);
+ final PinotConfiguration subsetConfig = new
PinotConfiguration(mapConfig).subset(prefix);
+ return AuditLogger.OBJECT_MAPPER.convertValue(subsetConfig.toMap(),
configClass);
+ }
+
/**
* Gets the current audit configuration.
* This method is thread-safe and lock-free.
@@ -128,4 +152,14 @@ public final class AuditConfigManager {
public boolean isEndpointExcluded(String endpoint) {
return isEndpointExcluded(endpoint, _currentConfig.getExcludedEndpoints());
}
+
+ @Override
+ public void onClusterConfigChange(ClusterConfig clusterConfig,
NotificationContext context) {
+ try {
+ _currentConfig = buildFromClusterConfig(clusterConfig);
+ LOG.info("Successfully updated audit configuration");
+ } catch (Exception e) {
+ LOG.error("Failed to update audit configuration from cluster configs",
e);
+ }
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditLogger.java
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditLogger.java
index b695959590e..211b6435eb8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditLogger.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditLogger.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
public final class AuditLogger {
private static final String PINOT_AUDIT_LOGGER_NAME =
"org.apache.pinot.audit";
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
// Default Pinot logger. For logging failures in audit logging itself
private static final Logger LOG = LoggerFactory.getLogger(AuditLogger.class);
private static final Logger AUDIT_LOGGER =
LoggerFactory.getLogger(PINOT_AUDIT_LOGGER_NAME);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java
index 30b3e4878bd..fea1cd69840 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.audit;
+import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -82,7 +83,8 @@ public class AuditRequestProcessor {
}
// Log the audit event (service ID will be extracted from headers, not
config)
- return new AuditEvent().setEndpoint(endpoint)
+ return new AuditEvent().setTimestamp(Instant.now().toString())
+ .setEndpoint(endpoint)
.setServiceId(extractServiceId(requestContext))
.setMethod(requestContext.getMethod())
.setOriginIpAddress(extractClientIpAddress(requestContext,
remoteAddr))
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/audit/AuditConfigManagerTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/audit/AuditConfigManagerTest.java
new file mode 100644
index 00000000000..7e762f98dbf
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/audit/AuditConfigManagerTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.audit;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.model.ClusterConfig;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+/**
+ * Unit tests for AuditConfigManager to verify correct config injection
+ * when onClusterConfigChange is called.
+ */
+public class AuditConfigManagerTest {
+
+ @Test
+ public void testOnClusterConfigChangeWithAllConfigs() {
+ // Given
+ ClusterConfig clusterConfig = createClusterConfig(
+ Map.of("pinot.audit.enabled", "true",
"pinot.audit.capture.request.payload.enabled", "true",
+ "pinot.audit.capture.request.headers", "true",
"pinot.audit.payload.size.max.bytes", "20480",
+ "pinot.audit.excluded.endpoints", "/health,/metrics"));
+
+ AuditConfigManager manager = new AuditConfigManager();
+
+ // When
+ manager.onClusterConfigChange(clusterConfig, null);
+
+ // Then
+ AuditConfig config = manager.getCurrentConfig();
+ assertThat(config.isEnabled()).isTrue();
+ assertThat(config.isCaptureRequestPayload()).isTrue();
+ assertThat(config.isCaptureRequestHeaders()).isTrue();
+ assertThat(config.getMaxPayloadSize()).isEqualTo(20480);
+ assertThat(config.getExcludedEndpoints()).isEqualTo("/health,/metrics");
+ }
+
+ @Test
+ public void testOnClusterConfigChangeWithPartialConfigs() {
+ // Given
+ ClusterConfig clusterConfig =
+ createClusterConfig(Map.of("pinot.audit.enabled", "true",
"pinot.audit.payload.size.max.bytes", "5000"));
+
+ AuditConfigManager manager = new AuditConfigManager();
+
+ // When
+ manager.onClusterConfigChange(clusterConfig, null);
+
+ // Then
+ AuditConfig config = manager.getCurrentConfig();
+ assertThat(config.isEnabled()).isTrue();
+ assertThat(config.getMaxPayloadSize()).isEqualTo(5000);
+ // Verify defaults for unspecified configs
+ assertThat(config.isCaptureRequestPayload()).isFalse();
+ assertThat(config.isCaptureRequestHeaders()).isFalse();
+ assertThat(config.getExcludedEndpoints()).isEmpty();
+ }
+
+ @Test
+ public void testOnClusterConfigChangeWithNoAuditConfigs() {
+ // Given
+ ClusterConfig clusterConfig = createClusterConfig(Map.of());
+ AuditConfigManager manager = new AuditConfigManager();
+
+ // When
+ manager.onClusterConfigChange(clusterConfig, null);
+
+ // Then
+ AuditConfig config = manager.getCurrentConfig();
+ assertThat(config.isEnabled()).isFalse();
+ assertThat(config.isCaptureRequestPayload()).isFalse();
+ assertThat(config.isCaptureRequestHeaders()).isFalse();
+ assertThat(config.getMaxPayloadSize()).isEqualTo(10240);
+ assertThat(config.getExcludedEndpoints()).isEmpty();
+ }
+
+ @Test
+ public void testConfigUpdateOverwritesPrevious() {
+ // Given
+ AuditConfigManager manager = new AuditConfigManager();
+
+ // Set initial config
+ ClusterConfig initialConfig =
+ createClusterConfig(Map.of("pinot.audit.enabled", "true",
"pinot.audit.payload.size.max.bytes", "15000"));
+ manager.onClusterConfigChange(initialConfig, null);
+ assertThat(manager.getCurrentConfig().isEnabled()).isTrue();
+
assertThat(manager.getCurrentConfig().getMaxPayloadSize()).isEqualTo(15000);
+
+ // When - Update with new config
+ ClusterConfig updatedConfig =
+ createClusterConfig(Map.of("pinot.audit.enabled", "false",
"pinot.audit.payload.size.max.bytes", "25000"));
+ manager.onClusterConfigChange(updatedConfig, null);
+
+ // Then
+ assertThat(manager.getCurrentConfig().isEnabled()).isFalse();
+
assertThat(manager.getCurrentConfig().getMaxPayloadSize()).isEqualTo(25000);
+ }
+
+ @Test
+ public void testBuildFromClusterConfigDirectly() {
+ // Given
+ ClusterConfig clusterConfig = createClusterConfig(
+ Map.of("pinot.audit.enabled", "true",
"pinot.audit.capture.request.payload.enabled", "false",
+ "pinot.audit.capture.request.headers", "true"));
+
+ // When
+ AuditConfig config =
AuditConfigManager.buildFromClusterConfig(clusterConfig);
+
+ // Then
+ assertThat(config.isEnabled()).isTrue();
+ assertThat(config.isCaptureRequestPayload()).isFalse();
+ assertThat(config.isCaptureRequestHeaders()).isTrue();
+ // Verify defaults for unspecified fields
+ assertThat(config.getMaxPayloadSize()).isEqualTo(10240);
+ assertThat(config.getExcludedEndpoints()).isEmpty();
+ }
+
+ // Helper method to create ClusterConfig with given properties
+ private ClusterConfig createClusterConfig(Map<String, String> properties) {
+ ClusterConfig clusterConfig = new ClusterConfig("testCluster");
+ Map<String, String> allProperties = new HashMap<>(properties);
+ // Add some non-audit configs to verify filtering works
+ allProperties.put("some.other.config", "value");
+ allProperties.put("another.config", "123");
+ clusterConfig.getRecord().setSimpleFields(allProperties);
+ return clusterConfig;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 72d8737f93a..d175a57c246 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -220,6 +220,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected ResourceUtilizationManager _resourceUtilizationManager;
protected RebalancePreChecker _rebalancePreChecker;
protected TableRebalanceManager _tableRebalanceManager;
+ protected AuditConfigManager _auditConfigManager;
@Override
public void init(PinotConfiguration pinotConfiguration)
@@ -542,6 +543,15 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
// TODO: Need to put this inside HelixResourceManager when
HelixControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
+ // Initialize audit config manager and register as cluster config listener
+ LOGGER.info("Initializing audit config manager");
+ _auditConfigManager = new AuditConfigManager();
+ try {
+
_helixParticipantManager.addClusterfigChangeListener(_auditConfigManager);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to register to cluster config", e);
+ }
+
SegmentCompletionConfig segmentCompletionConfig = new
SegmentCompletionConfig(_config);
_segmentCompletionManager =
@@ -630,7 +640,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
bind(AuditRequestProcessor.class).to(AuditRequestProcessor.class);
- bind(AuditConfigManager.class).to(AuditConfigManager.class);
+ bind(_auditConfigManager).to(AuditConfigManager.class);
String loggerRootDir =
_config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
if (loggerRootDir != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]