This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 76f2b7f32f8 [audit] Add audit logging configuration support with 
cluster config listener (#16649)
76f2b7f32f8 is described below

commit 76f2b7f32f86021244f8c0f33e5c516e0128062a
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Thu Aug 21 14:05:51 2025 -0700

    [audit] Add audit logging configuration support with cluster config 
listener (#16649)
    
    * [audit] Implement dynamic audit configuration updates
    
    * [audit] Add timestamp to audit event logging
---
 .../org/apache/pinot/common/audit/AuditConfig.java |   2 +-
 .../pinot/common/audit/AuditConfigManager.java     |  42 +++++-
 .../org/apache/pinot/common/audit/AuditLogger.java |   2 +-
 .../pinot/common/audit/AuditRequestProcessor.java  |   4 +-
 .../pinot/common/audit/AuditConfigManagerTest.java | 147 +++++++++++++++++++++
 .../pinot/controller/BaseControllerStarter.java    |  12 +-
 6 files changed, 201 insertions(+), 8 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java 
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java
index 5b17fc83927..957b2be5195 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java
@@ -38,7 +38,7 @@ public final class AuditConfig {
   @JsonProperty("capture.request.headers")
   private boolean _captureRequestHeaders = false;
 
-  @JsonProperty("max.payload.size")
+  @JsonProperty("payload.size.max.bytes")
   private int _maxPayloadSize = 10_240;
 
   @JsonProperty("excluded.endpoints")
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java
index 0d79cd86637..ea479072182 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java
@@ -18,10 +18,18 @@
  */
 package org.apache.pinot.common.audit;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import javax.inject.Singleton;
+import org.apache.commons.configuration2.MapConfiguration;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.ClusterConfigChangeListener;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,12 +40,10 @@ import org.slf4j.LoggerFactory;
  * Self-registers with the provided cluster config provider.
  */
 @Singleton
-public final class AuditConfigManager {
-
+public final class AuditConfigManager implements ClusterConfigChangeListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(AuditConfigManager.class);
 
-  // TODO spyne Hardcoded stub code. Wire this up with ClusterConfiguration
-  private final AuditConfig _currentConfig = new AuditConfig();
+  private AuditConfig _currentConfig = new AuditConfig();
 
   /**
    * Checks if the given endpoint should be excluded from audit logging.
@@ -99,6 +105,24 @@ public final class AuditConfigManager {
     return false;
   }
 
+  @VisibleForTesting
+  static AuditConfig buildFromClusterConfig(ClusterConfig clusterConfig) {
+    return 
mapPrefixedConfigToObject(clusterConfig.getRecord().getSimpleFields(),
+        CommonConstants.AuditLogConstants.PREFIX, AuditConfig.class);
+  }
+
+  /**
+   * Maps cluster configuration properties with a common prefix to a POJO 
using Jackson.
+   * Uses PinotConfiguration.subset() to extract properties with the given 
prefix and
+   * Jackson's convertValue() for automatic object mapping.
+   */
+  private static <T> T mapPrefixedConfigToObject(Map<String, String> 
clusterConfigs, String prefix,
+      Class<T> configClass) {
+    final MapConfiguration mapConfig = new MapConfiguration(clusterConfigs);
+    final PinotConfiguration subsetConfig = new 
PinotConfiguration(mapConfig).subset(prefix);
+    return AuditLogger.OBJECT_MAPPER.convertValue(subsetConfig.toMap(), 
configClass);
+  }
+
   /**
    * Gets the current audit configuration.
    * This method is thread-safe and lock-free.
@@ -128,4 +152,14 @@ public final class AuditConfigManager {
   public boolean isEndpointExcluded(String endpoint) {
     return isEndpointExcluded(endpoint, _currentConfig.getExcludedEndpoints());
   }
+
+  @Override
+  public void onClusterConfigChange(ClusterConfig clusterConfig, 
NotificationContext context) {
+    try {
+      _currentConfig = buildFromClusterConfig(clusterConfig);
+      LOG.info("Successfully updated audit configuration");
+    } catch (Exception e) {
+      LOG.error("Failed to update audit configuration from cluster configs", 
e);
+    }
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditLogger.java 
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditLogger.java
index b695959590e..211b6435eb8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditLogger.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditLogger.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 public final class AuditLogger {
 
   private static final String PINOT_AUDIT_LOGGER_NAME = 
"org.apache.pinot.audit";
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   // Default Pinot logger. For logging failures in audit logging itself
   private static final Logger LOG = LoggerFactory.getLogger(AuditLogger.class);
   private static final Logger AUDIT_LOGGER = 
LoggerFactory.getLogger(PINOT_AUDIT_LOGGER_NAME);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java
index 30b3e4878bd..fea1cd69840 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.audit;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -82,7 +83,8 @@ public class AuditRequestProcessor {
       }
 
       // Log the audit event (service ID will be extracted from headers, not 
config)
-      return new AuditEvent().setEndpoint(endpoint)
+      return new AuditEvent().setTimestamp(Instant.now().toString())
+          .setEndpoint(endpoint)
           .setServiceId(extractServiceId(requestContext))
           .setMethod(requestContext.getMethod())
           .setOriginIpAddress(extractClientIpAddress(requestContext, 
remoteAddr))
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/audit/AuditConfigManagerTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/audit/AuditConfigManagerTest.java
new file mode 100644
index 00000000000..7e762f98dbf
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/audit/AuditConfigManagerTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.audit;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.model.ClusterConfig;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+/**
+ * Unit tests for AuditConfigManager to verify correct config injection
+ * when onClusterConfigChange is called.
+ */
+public class AuditConfigManagerTest {
+
+  @Test
+  public void testOnClusterConfigChangeWithAllConfigs() {
+    // Given
+    ClusterConfig clusterConfig = createClusterConfig(
+        Map.of("pinot.audit.enabled", "true", 
"pinot.audit.capture.request.payload.enabled", "true",
+            "pinot.audit.capture.request.headers", "true", 
"pinot.audit.payload.size.max.bytes", "20480",
+            "pinot.audit.excluded.endpoints", "/health,/metrics"));
+
+    AuditConfigManager manager = new AuditConfigManager();
+
+    // When
+    manager.onClusterConfigChange(clusterConfig, null);
+
+    // Then
+    AuditConfig config = manager.getCurrentConfig();
+    assertThat(config.isEnabled()).isTrue();
+    assertThat(config.isCaptureRequestPayload()).isTrue();
+    assertThat(config.isCaptureRequestHeaders()).isTrue();
+    assertThat(config.getMaxPayloadSize()).isEqualTo(20480);
+    assertThat(config.getExcludedEndpoints()).isEqualTo("/health,/metrics");
+  }
+
+  @Test
+  public void testOnClusterConfigChangeWithPartialConfigs() {
+    // Given
+    ClusterConfig clusterConfig =
+        createClusterConfig(Map.of("pinot.audit.enabled", "true", 
"pinot.audit.payload.size.max.bytes", "5000"));
+
+    AuditConfigManager manager = new AuditConfigManager();
+
+    // When
+    manager.onClusterConfigChange(clusterConfig, null);
+
+    // Then
+    AuditConfig config = manager.getCurrentConfig();
+    assertThat(config.isEnabled()).isTrue();
+    assertThat(config.getMaxPayloadSize()).isEqualTo(5000);
+    // Verify defaults for unspecified configs
+    assertThat(config.isCaptureRequestPayload()).isFalse();
+    assertThat(config.isCaptureRequestHeaders()).isFalse();
+    assertThat(config.getExcludedEndpoints()).isEmpty();
+  }
+
+  @Test
+  public void testOnClusterConfigChangeWithNoAuditConfigs() {
+    // Given
+    ClusterConfig clusterConfig = createClusterConfig(Map.of());
+    AuditConfigManager manager = new AuditConfigManager();
+
+    // When
+    manager.onClusterConfigChange(clusterConfig, null);
+
+    // Then
+    AuditConfig config = manager.getCurrentConfig();
+    assertThat(config.isEnabled()).isFalse();
+    assertThat(config.isCaptureRequestPayload()).isFalse();
+    assertThat(config.isCaptureRequestHeaders()).isFalse();
+    assertThat(config.getMaxPayloadSize()).isEqualTo(10240);
+    assertThat(config.getExcludedEndpoints()).isEmpty();
+  }
+
+  @Test
+  public void testConfigUpdateOverwritesPrevious() {
+    // Given
+    AuditConfigManager manager = new AuditConfigManager();
+
+    // Set initial config
+    ClusterConfig initialConfig =
+        createClusterConfig(Map.of("pinot.audit.enabled", "true", 
"pinot.audit.payload.size.max.bytes", "15000"));
+    manager.onClusterConfigChange(initialConfig, null);
+    assertThat(manager.getCurrentConfig().isEnabled()).isTrue();
+    
assertThat(manager.getCurrentConfig().getMaxPayloadSize()).isEqualTo(15000);
+
+    // When - Update with new config
+    ClusterConfig updatedConfig =
+        createClusterConfig(Map.of("pinot.audit.enabled", "false", 
"pinot.audit.payload.size.max.bytes", "25000"));
+    manager.onClusterConfigChange(updatedConfig, null);
+
+    // Then
+    assertThat(manager.getCurrentConfig().isEnabled()).isFalse();
+    
assertThat(manager.getCurrentConfig().getMaxPayloadSize()).isEqualTo(25000);
+  }
+
+  @Test
+  public void testBuildFromClusterConfigDirectly() {
+    // Given
+    ClusterConfig clusterConfig = createClusterConfig(
+        Map.of("pinot.audit.enabled", "true", 
"pinot.audit.capture.request.payload.enabled", "false",
+            "pinot.audit.capture.request.headers", "true"));
+
+    // When
+    AuditConfig config = 
AuditConfigManager.buildFromClusterConfig(clusterConfig);
+
+    // Then
+    assertThat(config.isEnabled()).isTrue();
+    assertThat(config.isCaptureRequestPayload()).isFalse();
+    assertThat(config.isCaptureRequestHeaders()).isTrue();
+    // Verify defaults for unspecified fields
+    assertThat(config.getMaxPayloadSize()).isEqualTo(10240);
+    assertThat(config.getExcludedEndpoints()).isEmpty();
+  }
+
+  // Helper method to create ClusterConfig with given properties
+  private ClusterConfig createClusterConfig(Map<String, String> properties) {
+    ClusterConfig clusterConfig = new ClusterConfig("testCluster");
+    Map<String, String> allProperties = new HashMap<>(properties);
+    // Add some non-audit configs to verify filtering works
+    allProperties.put("some.other.config", "value");
+    allProperties.put("another.config", "123");
+    clusterConfig.getRecord().setSimpleFields(allProperties);
+    return clusterConfig;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 72d8737f93a..d175a57c246 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -220,6 +220,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected ResourceUtilizationManager _resourceUtilizationManager;
   protected RebalancePreChecker _rebalancePreChecker;
   protected TableRebalanceManager _tableRebalanceManager;
+  protected AuditConfigManager _auditConfigManager;
 
   @Override
   public void init(PinotConfiguration pinotConfiguration)
@@ -542,6 +543,15 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     // TODO: Need to put this inside HelixResourceManager when 
HelixControllerLeadershipManager is removed.
     
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
 
+    // Initialize audit config manager and register as cluster config listener
+    LOGGER.info("Initializing audit config manager");
+    _auditConfigManager = new AuditConfigManager();
+    try {
+      
_helixParticipantManager.addClusterfigChangeListener(_auditConfigManager);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to register to cluster config", e);
+    }
+
     SegmentCompletionConfig segmentCompletionConfig = new 
SegmentCompletionConfig(_config);
 
     _segmentCompletionManager =
@@ -630,7 +640,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
         
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
         bind(AuditRequestProcessor.class).to(AuditRequestProcessor.class);
-        bind(AuditConfigManager.class).to(AuditConfigManager.class);
+        bind(_auditConfigManager).to(AuditConfigManager.class);
 
         String loggerRootDir = 
_config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
         if (loggerRootDir != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to