somandal commented on code in PR #16616: URL: https://github.com/apache/pinot/pull/16616#discussion_r2285860930
########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java: ########## @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.audit; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.UriInfo; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class for extracting audit information from Jersey HTTP requests. + * Handles all the complex logic for IP address extraction, user identification, + * and request payload capture for audit logging purposes. + * Uses dynamic configuration to control audit behavior. 
+ */ +@Singleton +public class AuditRequestProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AuditRequestProcessor.class); + private static final String ANONYMOUS = "anonymous"; + + @Inject + private AuditConfigManager _configManager; + + public AuditEvent processRequest(ContainerRequestContext requestContext, String remoteAddr) { + // Check if auditing is enabled (if config manager is available) + if (!isEnabled()) { + return null; + } + + try { + UriInfo uriInfo = requestContext.getUriInfo(); + String endpoint = uriInfo.getPath(); + + // Check endpoint exclusions + if (_configManager.isEndpointExcluded(endpoint)) { + return null; + } + + String method = requestContext.getMethod(); + String originIpAddress = extractClientIpAddress(requestContext, remoteAddr); + String userId = extractUserId(requestContext); + + // Capture request payload based on configuration + Object requestPayload = captureRequestPayload(requestContext); + + // Log the audit event (service ID will be extracted from headers, not config) + return new AuditEvent(extractServiceId(requestContext), endpoint, method, originIpAddress, userId, + requestPayload); + } catch (Exception e) { + // Graceful degradation: Never let audit logging failures affect the main request + LOG.warn("Failed to process audit logging for request", e); + } + return null; + } + + public boolean isEnabled() { + return _configManager.isEnabled(); + } + + /** + * Extracts the client IP address from the request. + * Checks common proxy headers before falling back to remote address. 
+ * + * @param requestContext the container request context + * @param remoteAddr the remote address from the underlying request + * @return the client IP address + */ + private String extractClientIpAddress(ContainerRequestContext requestContext, String remoteAddr) { + try { + // Check for proxy headers first + String xForwardedFor = requestContext.getHeaderString("X-Forwarded-For"); + if (StringUtils.isNotBlank(xForwardedFor)) { + // X-Forwarded-For can contain multiple IPs, take the first one + return xForwardedFor.split(",")[0].trim(); + } + + String xRealIp = requestContext.getHeaderString("X-Real-IP"); + if (StringUtils.isNotBlank(xRealIp)) { + return xRealIp.trim(); + } + + // Fall back to remote address + return remoteAddr; + } catch (Exception e) { + LOG.debug("Failed to extract client IP address", e); + return "unknown"; + } + } + + /** + * Extracts user ID from request headers. + * Looks for common authentication headers. + * + * @param requestContext the container request context + * @return the user ID or "anonymous" if not found + */ + private String extractUserId(ContainerRequestContext requestContext) { + try { + // Check for common user identification headers + String authHeader = requestContext.getHeaderString("Authorization"); + if (StringUtils.isNotBlank(authHeader)) { + // For basic auth, extract username; for bearer tokens, use a placeholder + if (authHeader.startsWith("Basic ")) { + // Could decode basic auth to get username, but for security keep it as placeholder + return "basic-auth-user"; + } else if (authHeader.startsWith("Bearer ")) { + return "bearer-token-user"; + } + } + + // Check for custom user headers + String userHeader = requestContext.getHeaderString("X-User-ID"); + if (StringUtils.isNotBlank(userHeader)) { + return userHeader.trim(); + } + + userHeader = requestContext.getHeaderString("X-Username"); + if (StringUtils.isNotBlank(userHeader)) { + return userHeader.trim(); + } + + return ANONYMOUS; + } catch (Exception e) { + 
LOG.debug("Failed to extract user ID", e); + return ANONYMOUS; + } + } + + /** + * Extracts service ID from request headers. + * Service ID should be provided by the client in headers, not from configuration. + * + * @param requestContext the container request context + * @return the service ID or "unknown" if not found + */ + private String extractServiceId(ContainerRequestContext requestContext) { + try { + // Check for custom service ID headers + String serviceId = requestContext.getHeaderString("X-Service-ID"); + if (StringUtils.isNotBlank(serviceId)) { + return serviceId.trim(); + } + + serviceId = requestContext.getHeaderString("X-Service-Name"); + if (StringUtils.isNotBlank(serviceId)) { + return serviceId.trim(); + } + + return "unknown"; + } catch (Exception e) { + LOG.debug("Failed to extract service ID", e); + return "unknown"; + } + } + + /** + * Captures the request payload for audit logging based on configuration. + * Uses dynamic configuration to control what data is captured. + * + * @param requestContext the request context + * @return the captured request payload + */ + private Object captureRequestPayload(ContainerRequestContext requestContext) { + // Get current configuration (fallback to defaults if no config manager) + AuditConfig config = _configManager != null ? 
_configManager.getCurrentConfig() : new AuditConfig(); + + Map<String, Object> payload = new HashMap<>(); + + try { + // Always capture query parameters (lightweight) + UriInfo uriInfo = requestContext.getUriInfo(); + MultivaluedMap<String, String> queryParams = uriInfo.getQueryParameters(); + if (!queryParams.isEmpty()) { + Map<String, Object> queryMap = new HashMap<>(); + for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) { + List<String> values = entry.getValue(); + if (values.size() == 1) { + queryMap.put(entry.getKey(), values.get(0)); + } else { + queryMap.put(entry.getKey(), values); + } + } + payload.put("queryParameters", queryMap); + } + + // Conditionally capture request body based on configuration + if (config.isCaptureRequestPayload() && requestContext.hasEntity()) { + String requestBody = readRequestBody(requestContext, config.getMaxPayloadSize()); + if (StringUtils.isNotBlank(requestBody)) { + payload.put("body", requestBody); + } + } + + // Conditionally capture headers based on configuration + if (config.isCaptureRequestHeaders()) { + MultivaluedMap<String, String> headers = requestContext.getHeaders(); + if (!headers.isEmpty()) { + Map<String, String> headerMap = new HashMap<>(); + for (Map.Entry<String, List<String>> entry : headers.entrySet()) { + String headerName = entry.getKey().toLowerCase(); + // Skip sensitive headers + if (!headerName.contains("auth") && !headerName.contains("password") && !headerName.contains("token") + && !headerName.contains("secret")) { + List<String> values = entry.getValue(); + if (!values.isEmpty()) { + headerMap.put(entry.getKey(), values.get(0)); + } + } + } + if (!headerMap.isEmpty()) { + payload.put("headers", headerMap); + } + } + } + } catch (Exception e) { + LOG.debug("Failed to capture request payload", e); + payload.put("error", "Failed to capture payload: " + e.getMessage()); + } + + return payload.isEmpty() ? 
null : payload; + } + + /** + * Reads the request body from the entity input stream. + * Restores the input stream for downstream processing. + * Limits the amount of data read based on configuration. + * + * @param requestContext the request context + * @param maxPayloadSize maximum bytes to read from the request body + * @return the request body as string (potentially truncated) + */ + private String readRequestBody(ContainerRequestContext requestContext, int maxPayloadSize) { + try { + InputStream entityStream = requestContext.getEntityStream(); + if (entityStream == null) { + return null; + } + + // Read the stream content with size limit + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + byte[] data = new byte[8192]; + int bytesRead; + int totalBytesRead = 0; + boolean truncated = false; + + while ((bytesRead = entityStream.read(data, 0, data.length)) != -1) { + if (totalBytesRead + bytesRead > maxPayloadSize) { + // Truncate to max payload size + int remainingBytes = maxPayloadSize - totalBytesRead; + if (remainingBytes > 0) { + buffer.write(data, 0, remainingBytes); + } + truncated = true; + break; + } + buffer.write(data, 0, bytesRead); + totalBytesRead += bytesRead; + } + + byte[] requestBodyBytes = buffer.toByteArray(); + String requestBody = new String(requestBodyBytes, StandardCharsets.UTF_8); + + // Add truncation indicator if needed + if (truncated) { + requestBody += " [TRUNCATED - exceeds " + maxPayloadSize + " byte limit]"; + } + + // Restore the input stream for downstream processing + // Need to read the entire original stream to restore it properly + if (truncated) { + // Read remaining bytes to fully consume the original stream + while (entityStream.read(data) != -1) { + // Consume remaining bytes + } + // Create new stream with original data (this is complex, for now keep the truncated version) + requestContext.setEntityStream(new ByteArrayInputStream(requestBodyBytes)); + } else { + requestContext.setEntityStream(new 
ByteArrayInputStream(requestBodyBytes)); + } + + return requestBody; + } catch (IOException e) { + LOG.debug("Failed to read request body", e); + return "Failed to read request body: " + e.getMessage(); Review Comment: nit: can avoid duplication by storing the string in a variable and reusing it ########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java: ########## @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.audit; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.UriInfo; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class for extracting audit information from Jersey HTTP requests. 
+ * Handles all the complex logic for IP address extraction, user identification, + * and request payload capture for audit logging purposes. + * Uses dynamic configuration to control audit behavior. + */ +@Singleton +public class AuditRequestProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AuditRequestProcessor.class); + private static final String ANONYMOUS = "anonymous"; + + @Inject + private AuditConfigManager _configManager; + + public AuditEvent processRequest(ContainerRequestContext requestContext, String remoteAddr) { + // Check if auditing is enabled (if config manager is available) + if (!isEnabled()) { + return null; + } + + try { + UriInfo uriInfo = requestContext.getUriInfo(); + String endpoint = uriInfo.getPath(); + + // Check endpoint exclusions + if (_configManager.isEndpointExcluded(endpoint)) { + return null; + } + + String method = requestContext.getMethod(); + String originIpAddress = extractClientIpAddress(requestContext, remoteAddr); + String userId = extractUserId(requestContext); + + // Capture request payload based on configuration + Object requestPayload = captureRequestPayload(requestContext); + + // Log the audit event (service ID will be extracted from headers, not config) + return new AuditEvent(extractServiceId(requestContext), endpoint, method, originIpAddress, userId, + requestPayload); + } catch (Exception e) { + // Graceful degradation: Never let audit logging failures affect the main request + LOG.warn("Failed to process audit logging for request", e); + } + return null; + } + + public boolean isEnabled() { + return _configManager.isEnabled(); + } + + /** + * Extracts the client IP address from the request. + * Checks common proxy headers before falling back to remote address. 
+ * + * @param requestContext the container request context + * @param remoteAddr the remote address from the underlying request + * @return the client IP address + */ + private String extractClientIpAddress(ContainerRequestContext requestContext, String remoteAddr) { + try { + // Check for proxy headers first + String xForwardedFor = requestContext.getHeaderString("X-Forwarded-For"); + if (StringUtils.isNotBlank(xForwardedFor)) { + // X-Forwarded-For can contain multiple IPs, take the first one + return xForwardedFor.split(",")[0].trim(); + } + + String xRealIp = requestContext.getHeaderString("X-Real-IP"); + if (StringUtils.isNotBlank(xRealIp)) { + return xRealIp.trim(); + } + + // Fall back to remote address + return remoteAddr; + } catch (Exception e) { + LOG.debug("Failed to extract client IP address", e); + return "unknown"; + } + } + + /** + * Extracts user ID from request headers. + * Looks for common authentication headers. + * + * @param requestContext the container request context + * @return the user ID or "anonymous" if not found + */ + private String extractUserId(ContainerRequestContext requestContext) { + try { + // Check for common user identification headers + String authHeader = requestContext.getHeaderString("Authorization"); + if (StringUtils.isNotBlank(authHeader)) { + // For basic auth, extract username; for bearer tokens, use a placeholder + if (authHeader.startsWith("Basic ")) { + // Could decode basic auth to get username, but for security keep it as placeholder + return "basic-auth-user"; + } else if (authHeader.startsWith("Bearer ")) { + return "bearer-token-user"; + } + } + + // Check for custom user headers + String userHeader = requestContext.getHeaderString("X-User-ID"); + if (StringUtils.isNotBlank(userHeader)) { + return userHeader.trim(); + } + + userHeader = requestContext.getHeaderString("X-Username"); + if (StringUtils.isNotBlank(userHeader)) { + return userHeader.trim(); + } + + return ANONYMOUS; + } catch (Exception e) { + 
LOG.debug("Failed to extract user ID", e); + return ANONYMOUS; + } + } + + /** + * Extracts service ID from request headers. + * Service ID should be provided by the client in headers, not from configuration. + * + * @param requestContext the container request context + * @return the service ID or "unknown" if not found + */ + private String extractServiceId(ContainerRequestContext requestContext) { + try { + // Check for custom service ID headers + String serviceId = requestContext.getHeaderString("X-Service-ID"); + if (StringUtils.isNotBlank(serviceId)) { + return serviceId.trim(); + } + + serviceId = requestContext.getHeaderString("X-Service-Name"); + if (StringUtils.isNotBlank(serviceId)) { + return serviceId.trim(); + } + + return "unknown"; + } catch (Exception e) { + LOG.debug("Failed to extract service ID", e); + return "unknown"; + } + } + + /** + * Captures the request payload for audit logging based on configuration. + * Uses dynamic configuration to control what data is captured. + * + * @param requestContext the request context + * @return the captured request payload + */ + private Object captureRequestPayload(ContainerRequestContext requestContext) { + // Get current configuration (fallback to defaults if no config manager) + AuditConfig config = _configManager != null ? _configManager.getCurrentConfig() : new AuditConfig(); Review Comment: do we expect `_configManager` to be null? if so, won't other functions in this class need null handling as well? ########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java: ########## @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.audit; + +import java.util.HashSet; +import java.util.Set; +import javax.inject.Singleton; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Thread-safe configuration manager for audit logging settings. + * Handles dynamic configuration updates from cluster configuration changes. + * Self-registers with the provided cluster config provider. + */ +@Singleton +public final class AuditConfigManager { + + private static final Logger LOG = LoggerFactory.getLogger(AuditConfigManager.class); + + private final AuditConfig _currentConfig = new AuditConfig(); + + /** + * Checks if the given endpoint should be excluded from audit logging. + * Supports simple wildcard matching with '*' character. + */ + public static boolean isEndpointExcluded(String endpoint, String excludedEndpointsString) { Review Comment: do we expect use cases where callers pass their own `excludedEndpointsString` rather than using the one in the `AuditConfig`? ########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java: ########## @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.audit; + +import java.util.HashSet; +import java.util.Set; +import javax.inject.Singleton; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Thread-safe configuration manager for audit logging settings. + * Handles dynamic configuration updates from cluster configuration changes. + * Self-registers with the provided cluster config provider. + */ +@Singleton +public final class AuditConfigManager { + + private static final Logger LOG = LoggerFactory.getLogger(AuditConfigManager.class); + + private final AuditConfig _currentConfig = new AuditConfig(); Review Comment: is my understanding correct that this is just doing this for now as the code is disabled? in the future this will be registered with the config change listener with Helix and then the AuditConfig may be changeable. And that this will also be initialized with the controller config on start up? ########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfig.java: ########## @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.audit; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + + +/** + * Pure data class for audit logging configuration. + * Uses Jackson annotations for automatic JSON mapping. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public final class AuditConfig { Review Comment: do you want to add a constructor for this class? ########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java: ########## @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.audit; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.UriInfo; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class for extracting audit information from Jersey HTTP requests. + * Handles all the complex logic for IP address extraction, user identification, + * and request payload capture for audit logging purposes. + * Uses dynamic configuration to control audit behavior. + */ +@Singleton +public class AuditRequestProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AuditRequestProcessor.class); + private static final String ANONYMOUS = "anonymous"; + + @Inject + private AuditConfigManager _configManager; + + public AuditEvent processRequest(ContainerRequestContext requestContext, String remoteAddr) { + // Check if auditing is enabled (if config manager is available) + if (!isEnabled()) { + return null; + } + + try { + UriInfo uriInfo = requestContext.getUriInfo(); + String endpoint = uriInfo.getPath(); + + // Check endpoint exclusions + if (_configManager.isEndpointExcluded(endpoint)) { + return null; + } + + String method = requestContext.getMethod(); + String originIpAddress = extractClientIpAddress(requestContext, remoteAddr); + String userId = extractUserId(requestContext); + + // Capture request payload based on configuration + Object requestPayload = captureRequestPayload(requestContext); + + // Log the audit event (service ID will be extracted from headers, not config) + return new AuditEvent(extractServiceId(requestContext), endpoint, 
method, originIpAddress, userId, + requestPayload); + } catch (Exception e) { + // Graceful degradation: Never let audit logging failures affect the main request + LOG.warn("Failed to process audit logging for request", e); + } + return null; + } + + public boolean isEnabled() { + return _configManager.isEnabled(); + } + + /** + * Extracts the client IP address from the request. + * Checks common proxy headers before falling back to remote address. + * + * @param requestContext the container request context + * @param remoteAddr the remote address from the underlying request + * @return the client IP address + */ + private String extractClientIpAddress(ContainerRequestContext requestContext, String remoteAddr) { + try { + // Check for proxy headers first + String xForwardedFor = requestContext.getHeaderString("X-Forwarded-For"); + if (StringUtils.isNotBlank(xForwardedFor)) { + // X-Forwarded-For can contain multiple IPs, take the first one + return xForwardedFor.split(",")[0].trim(); + } + + String xRealIp = requestContext.getHeaderString("X-Real-IP"); + if (StringUtils.isNotBlank(xRealIp)) { + return xRealIp.trim(); + } + + // Fall back to remote address + return remoteAddr; + } catch (Exception e) { + LOG.debug("Failed to extract client IP address", e); + return "unknown"; + } + } + + /** + * Extracts user ID from request headers. + * Looks for common authentication headers. 
+ * + * @param requestContext the container request context + * @return the user ID or "anonymous" if not found + */ + private String extractUserId(ContainerRequestContext requestContext) { + try { + // Check for common user identification headers + String authHeader = requestContext.getHeaderString("Authorization"); + if (StringUtils.isNotBlank(authHeader)) { + // For basic auth, extract username; for bearer tokens, use a placeholder + if (authHeader.startsWith("Basic ")) { + // Could decode basic auth to get username, but for security keep it as placeholder + return "basic-auth-user"; + } else if (authHeader.startsWith("Bearer ")) { + return "bearer-token-user"; + } + } + + // Check for custom user headers + String userHeader = requestContext.getHeaderString("X-User-ID"); + if (StringUtils.isNotBlank(userHeader)) { + return userHeader.trim(); + } + + userHeader = requestContext.getHeaderString("X-Username"); + if (StringUtils.isNotBlank(userHeader)) { + return userHeader.trim(); + } + + return ANONYMOUS; + } catch (Exception e) { + LOG.debug("Failed to extract user ID", e); + return ANONYMOUS; + } + } + + /** + * Extracts service ID from request headers. + * Service ID should be provided by the client in headers, not from configuration. + * + * @param requestContext the container request context + * @return the service ID or "unknown" if not found + */ + private String extractServiceId(ContainerRequestContext requestContext) { + try { + // Check for custom service ID headers + String serviceId = requestContext.getHeaderString("X-Service-ID"); + if (StringUtils.isNotBlank(serviceId)) { + return serviceId.trim(); + } + + serviceId = requestContext.getHeaderString("X-Service-Name"); + if (StringUtils.isNotBlank(serviceId)) { + return serviceId.trim(); + } + + return "unknown"; + } catch (Exception e) { + LOG.debug("Failed to extract service ID", e); + return "unknown"; + } + } + + /** + * Captures the request payload for audit logging based on configuration. 
+ * Uses dynamic configuration to control what data is captured. + * + * @param requestContext the request context + * @return the captured request payload + */ + private Object captureRequestPayload(ContainerRequestContext requestContext) { + // Get current configuration (fallback to defaults if no config manager) + AuditConfig config = _configManager != null ? _configManager.getCurrentConfig() : new AuditConfig(); + + Map<String, Object> payload = new HashMap<>(); + + try { + // Always capture query parameters (lightweight) + UriInfo uriInfo = requestContext.getUriInfo(); + MultivaluedMap<String, String> queryParams = uriInfo.getQueryParameters(); + if (!queryParams.isEmpty()) { + Map<String, Object> queryMap = new HashMap<>(); + for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) { + List<String> values = entry.getValue(); + if (values.size() == 1) { + queryMap.put(entry.getKey(), values.get(0)); + } else { + queryMap.put(entry.getKey(), values); + } + } + payload.put("queryParameters", queryMap); + } + + // Conditionally capture request body based on configuration + if (config.isCaptureRequestPayload() && requestContext.hasEntity()) { + String requestBody = readRequestBody(requestContext, config.getMaxPayloadSize()); + if (StringUtils.isNotBlank(requestBody)) { + payload.put("body", requestBody); + } + } + + // Conditionally capture headers based on configuration + if (config.isCaptureRequestHeaders()) { + MultivaluedMap<String, String> headers = requestContext.getHeaders(); + if (!headers.isEmpty()) { + Map<String, String> headerMap = new HashMap<>(); + for (Map.Entry<String, List<String>> entry : headers.entrySet()) { + String headerName = entry.getKey().toLowerCase(); + // Skip sensitive headers + if (!headerName.contains("auth") && !headerName.contains("password") && !headerName.contains("token") + && !headerName.contains("secret")) { + List<String> values = entry.getValue(); + if (!values.isEmpty()) { + headerMap.put(entry.getKey(), 
values.get(0)); + } + } + } + if (!headerMap.isEmpty()) { + payload.put("headers", headerMap); + } + } + } + } catch (Exception e) { + LOG.debug("Failed to capture request payload", e); + payload.put("error", "Failed to capture payload: " + e.getMessage()); + } + + return payload.isEmpty() ? null : payload; + } + + /** + * Reads the request body from the entity input stream. + * Restores the input stream for downstream processing. + * Limits the amount of data read based on configuration. + * + * @param requestContext the request context + * @param maxPayloadSize maximum bytes to read from the request body + * @return the request body as string (potentially truncated) + */ + private String readRequestBody(ContainerRequestContext requestContext, int maxPayloadSize) { + try { + InputStream entityStream = requestContext.getEntityStream(); + if (entityStream == null) { + return null; + } + + // Read the stream content with size limit + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + byte[] data = new byte[8192]; + int bytesRead; + int totalBytesRead = 0; + boolean truncated = false; + + while ((bytesRead = entityStream.read(data, 0, data.length)) != -1) { + if (totalBytesRead + bytesRead > maxPayloadSize) { + // Truncate to max payload size + int remainingBytes = maxPayloadSize - totalBytesRead; + if (remainingBytes > 0) { + buffer.write(data, 0, remainingBytes); + } + truncated = true; + break; + } + buffer.write(data, 0, bytesRead); + totalBytesRead += bytesRead; + } + + byte[] requestBodyBytes = buffer.toByteArray(); + String requestBody = new String(requestBodyBytes, StandardCharsets.UTF_8); + + // Add truncation indicator if needed + if (truncated) { + requestBody += " [TRUNCATED - exceeds " + maxPayloadSize + " byte limit]"; + } Review Comment: nit: can this be moved into the next if block which checks for "if (truncated)" anyways? 
########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditConfigManager.java: ########## @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.audit; + +import java.util.HashSet; +import java.util.Set; +import javax.inject.Singleton; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Thread-safe configuration manager for audit logging settings. + * Handles dynamic configuration updates from cluster configuration changes. + * Self-registers with the provided cluster config provider. + */ +@Singleton +public final class AuditConfigManager { + + private static final Logger LOG = LoggerFactory.getLogger(AuditConfigManager.class); + + private final AuditConfig _currentConfig = new AuditConfig(); + + /** + * Checks if the given endpoint should be excluded from audit logging. + * Supports simple wildcard matching with '*' character. 
+ */ + public static boolean isEndpointExcluded(String endpoint, String excludedEndpointsString) { + if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(excludedEndpointsString)) { + return false; + } + + Set<String> excludedEndpoints = parseExcludedEndpoints(excludedEndpointsString); + if (excludedEndpoints.isEmpty()) { + return false; + } + + // Check for exact matches first + if (excludedEndpoints.contains(endpoint)) { + return true; + } + + // Check for wildcard matches + for (String excluded : excludedEndpoints) { + if (excluded.contains("*")) { + if (matchesWildcard(endpoint, excluded)) { + return true; + } + } + } + + return false; + } + + private static Set<String> parseExcludedEndpoints(String excludedEndpointsString) { + Set<String> excludedEndpoints = new HashSet<>(); + if (StringUtils.isNotBlank(excludedEndpointsString)) { + String[] endpoints = excludedEndpointsString.split(","); + for (String endpoint : endpoints) { + String trimmed = endpoint.trim(); + if (StringUtils.isNotBlank(trimmed)) { + excludedEndpoints.add(trimmed); + } + } + } + return excludedEndpoints; + } + + private static boolean matchesWildcard(String endpoint, String pattern) { Review Comment: does this mean that the wildcard can't be somewhere in the middle? like "end*point"? Maybe have a comment somewhere that calls out allowed patterns? Just wondering if it makes sense to use the `Pattern` class here to compile the pattern and then search for the match? It might be overkill if we don't expect matches other than what's mentioned here. ########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java: ########## @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.audit; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.UriInfo; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class for extracting audit information from Jersey HTTP requests. + * Handles all the complex logic for IP address extraction, user identification, + * and request payload capture for audit logging purposes. + * Uses dynamic configuration to control audit behavior. 
+ */ +@Singleton +public class AuditRequestProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AuditRequestProcessor.class); + private static final String ANONYMOUS = "anonymous"; + + @Inject + private AuditConfigManager _configManager; + + public AuditEvent processRequest(ContainerRequestContext requestContext, String remoteAddr) { + // Check if auditing is enabled (if config manager is available) + if (!isEnabled()) { + return null; + } + + try { + UriInfo uriInfo = requestContext.getUriInfo(); + String endpoint = uriInfo.getPath(); + + // Check endpoint exclusions + if (_configManager.isEndpointExcluded(endpoint)) { + return null; + } + + String method = requestContext.getMethod(); + String originIpAddress = extractClientIpAddress(requestContext, remoteAddr); + String userId = extractUserId(requestContext); + + // Capture request payload based on configuration + Object requestPayload = captureRequestPayload(requestContext); + + // Log the audit event (service ID will be extracted from headers, not config) + return new AuditEvent(extractServiceId(requestContext), endpoint, method, originIpAddress, userId, + requestPayload); + } catch (Exception e) { + // Graceful degradation: Never let audit logging failures affect the main request + LOG.warn("Failed to process audit logging for request", e); + } + return null; + } + + public boolean isEnabled() { + return _configManager.isEnabled(); + } + + /** + * Extracts the client IP address from the request. + * Checks common proxy headers before falling back to remote address. 
+ * + * @param requestContext the container request context + * @param remoteAddr the remote address from the underlying request + * @return the client IP address + */ + private String extractClientIpAddress(ContainerRequestContext requestContext, String remoteAddr) { + try { + // Check for proxy headers first + String xForwardedFor = requestContext.getHeaderString("X-Forwarded-For"); + if (StringUtils.isNotBlank(xForwardedFor)) { + // X-Forwarded-For can contain multiple IPs, take the first one + return xForwardedFor.split(",")[0].trim(); + } + + String xRealIp = requestContext.getHeaderString("X-Real-IP"); + if (StringUtils.isNotBlank(xRealIp)) { + return xRealIp.trim(); + } + + // Fall back to remote address + return remoteAddr; + } catch (Exception e) { + LOG.debug("Failed to extract client IP address", e); + return "unknown"; + } + } + + /** + * Extracts user ID from request headers. + * Looks for common authentication headers. + * + * @param requestContext the container request context + * @return the user ID or "anonymous" if not found + */ + private String extractUserId(ContainerRequestContext requestContext) { + try { + // Check for common user identification headers + String authHeader = requestContext.getHeaderString("Authorization"); + if (StringUtils.isNotBlank(authHeader)) { + // For basic auth, extract username; for bearer tokens, use a placeholder + if (authHeader.startsWith("Basic ")) { + // Could decode basic auth to get username, but for security keep it as placeholder + return "basic-auth-user"; + } else if (authHeader.startsWith("Bearer ")) { + return "bearer-token-user"; Review Comment: nit: create static variables for these? ########## pinot-common/src/main/java/org/apache/pinot/common/audit/AuditRequestProcessor.java: ########## @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.audit; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.UriInfo; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class for extracting audit information from Jersey HTTP requests. + * Handles all the complex logic for IP address extraction, user identification, + * and request payload capture for audit logging purposes. + * Uses dynamic configuration to control audit behavior. 
+ */ +@Singleton +public class AuditRequestProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AuditRequestProcessor.class); + private static final String ANONYMOUS = "anonymous"; + + @Inject + private AuditConfigManager _configManager; + + public AuditEvent processRequest(ContainerRequestContext requestContext, String remoteAddr) { + // Check if auditing is enabled (if config manager is available) + if (!isEnabled()) { + return null; + } + + try { + UriInfo uriInfo = requestContext.getUriInfo(); + String endpoint = uriInfo.getPath(); + + // Check endpoint exclusions + if (_configManager.isEndpointExcluded(endpoint)) { + return null; + } + + String method = requestContext.getMethod(); + String originIpAddress = extractClientIpAddress(requestContext, remoteAddr); + String userId = extractUserId(requestContext); + + // Capture request payload based on configuration + Object requestPayload = captureRequestPayload(requestContext); + + // Log the audit event (service ID will be extracted from headers, not config) + return new AuditEvent(extractServiceId(requestContext), endpoint, method, originIpAddress, userId, + requestPayload); + } catch (Exception e) { + // Graceful degradation: Never let audit logging failures affect the main request + LOG.warn("Failed to process audit logging for request", e); + } + return null; + } + + public boolean isEnabled() { + return _configManager.isEnabled(); + } + + /** + * Extracts the client IP address from the request. + * Checks common proxy headers before falling back to remote address. 
+ * + * @param requestContext the container request context + * @param remoteAddr the remote address from the underlying request + * @return the client IP address + */ + private String extractClientIpAddress(ContainerRequestContext requestContext, String remoteAddr) { + try { + // Check for proxy headers first + String xForwardedFor = requestContext.getHeaderString("X-Forwarded-For"); + if (StringUtils.isNotBlank(xForwardedFor)) { + // X-Forwarded-For can contain multiple IPs, take the first one + return xForwardedFor.split(",")[0].trim(); + } + + String xRealIp = requestContext.getHeaderString("X-Real-IP"); + if (StringUtils.isNotBlank(xRealIp)) { + return xRealIp.trim(); + } + + // Fall back to remote address + return remoteAddr; + } catch (Exception e) { + LOG.debug("Failed to extract client IP address", e); + return "unknown"; + } + } + + /** + * Extracts user ID from request headers. + * Looks for common authentication headers. + * + * @param requestContext the container request context + * @return the user ID or "anonymous" if not found + */ + private String extractUserId(ContainerRequestContext requestContext) { + try { + // Check for common user identification headers + String authHeader = requestContext.getHeaderString("Authorization"); + if (StringUtils.isNotBlank(authHeader)) { + // For basic auth, extract username; for bearer tokens, use a placeholder + if (authHeader.startsWith("Basic ")) { + // Could decode basic auth to get username, but for security keep it as placeholder + return "basic-auth-user"; + } else if (authHeader.startsWith("Bearer ")) { + return "bearer-token-user"; + } + } + + // Check for custom user headers + String userHeader = requestContext.getHeaderString("X-User-ID"); + if (StringUtils.isNotBlank(userHeader)) { + return userHeader.trim(); + } + + userHeader = requestContext.getHeaderString("X-Username"); + if (StringUtils.isNotBlank(userHeader)) { + return userHeader.trim(); + } + + return ANONYMOUS; + } catch (Exception e) { + 
LOG.debug("Failed to extract user ID", e); Review Comment: Should this be logged at WARN level instead of DEBUG? The same question applies to the other functions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
