mneethiraj commented on code in PR #886:
URL: https://github.com/apache/ranger/pull/886#discussion_r3156346565
##########
audit-server/audit-common/src/main/java/org/apache/ranger/audit/server/AuditServerConstants.java:
##########
@@ -77,18 +77,18 @@ private AuditServerConstants() {}
     public static final int DEFAULT_BUFFER_PARTITIONS = 9; // For plugin-based partitioning
     // Hadoop security configuration for UGI
-    public static final String PROP_HADOOP_AUTHENTICATION_TYPE = "hadoop.security.authentication";
-    public static final String PROP_HADOOP_AUTH_TYPE_KERBEROS = "kerberos";
-    public static final String PROP_HADOOP_KERBEROS_NAME_RULES = "hadoop.security.auth_to_local";
+    public static final String PROP_HADOOP_AUTHENTICATION_TYPE = "hadoop.security.authentication";
+    public static final String PROP_HADOOP_AUTH_TYPE_KERBEROS = "kerberos";
+    public static final String PROP_HADOOP_KERBEROS_NAME_RULES = "hadoop.security.auth_to_local";
     // Kafka Topic and defaults
-    public static final String DEFAULT_TOPIC = "ranger_audits";
-    public static final String DEFAULT_SASL_MECHANISM = "PLAIN";
-    public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
-    public static final String DEFAULT_SERVICE_NAME = "kafka";
-    public static final String DEFAULT_RANGER_AUDIT_HDFS_CONSUMER_GROUP = "ranger_audit_hdfs_consumer_group";
-    public static final String DEFAULT_RANGER_AUDIT_SOLR_CONSUMER_GROUP = "ranger_audit_solr_consumer_group";
-    public static final String PROP_SECURITY_PROTOCOL_VALUE = "SASL";
+    public static final String DEFAULT_TOPIC = "ranger_audits";
+    public static final String DEFAULT_SASL_MECHANISM = "PLAIN";
+    public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
+    public static final String DEFAULT_SERVICE_NAME = "kafka";
+    public static final String DEFAULT_RANGER_AUDIT_HDFS_DISPATCHER_GROUP = "ranger_audit_hdfs_dispatcher_group";
Review Comment:
Consider moving following constants to dispatcher specific files:
- PROP_SOLR_DEST_PREFIX
- PROP_HDFS_DEST_PREFIX
- DEFAULT_RANGER_AUDIT_HDFS_DISPATCHER_GROUP
- DEFAULT_RANGER_AUDIT_SOLR_DISPATCHER_GROUP
- DESTINATION_HDFS
- DESTINATION_SOLR
##########
audit-server/audit-common/src/main/java/org/apache/ranger/audit/utils/AuditMessageQueueUtils.java:
##########
@@ -121,9 +114,10 @@ public String createAuditsTopicIfNotExists(Properties props, String propPrefix)
                 ret = updateExistingTopicPartitions(admin, topicName, partitions, replicationFactor);
             }
         } catch (Exception ex) {
-            if (currentAttempt <= maxRetries) {
+            if (currentAttempt < maxAttempts) {
                 LOG.warn("AuditMessageQueueUtils:createAuditsTopicIfNotExists(): Failed to connect to Kafka on attempt {}/{}. Retrying in {} ms. Error: {}",
-                        currentAttempt, maxRetries + 1, retryDelayMs, ex.getMessage());
+                        currentAttempt, maxAttempts, retryDelayMs, ex.getMessage());
Review Comment:
Consider including exception details in the log, i.e., not just the message:
```
LOG.warn("AuditMessageQueueUtils:createAuditsTopicIfNotExists(): Failed to connect to Kafka on attempt {}/{}. Retrying in {} ms.", currentAttempt, maxAttempts, retryDelayMs, ex);
```
##########
audit-server/audit-common/src/main/java/org/apache/ranger/audit/utils/AuditServerUtils.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.utils;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class AuditServerUtils {
+ private AuditServerUtils() {
+ }
+
+    public static boolean waitUntilTopicReady(Admin admin, String topic, Duration totalWait) throws Exception {
+        long endTime = System.nanoTime() + totalWait.toNanos();
+        long baseSleepMs = 100L;
+        long maxSleepMs = 2000L;
+
+        while (System.nanoTime() < endTime) {
+            try {
+                DescribeTopicsResult describeTopicsResult = admin.describeTopics(Collections.singleton(topic));
+                TopicDescription topicDescription = describeTopicsResult.values().get(topic).get();
+                boolean allHaveLeader = topicDescription.partitions().stream().allMatch(partitionInfo -> partitionInfo.leader() != null);
+                boolean allHaveISR = topicDescription.partitions().stream().allMatch(partitionInfo -> !partitionInfo.isr().isEmpty());
+
+                if (allHaveLeader && allHaveISR) {
+                    return true;
+                }
+            } catch (Exception e) {
+                // If topic hasn't propagated yet, you'll see UnknownTopicOrPartitionException
+                // continue to wait for topic availability
+                if (!(rootCause(e) instanceof UnknownTopicOrPartitionException)) {
+                    throw e;
+                }
+            }
+
+            // Sleep until the created topic is available for metadata fetch
+            baseSleepMs = Math.min(maxSleepMs, baseSleepMs * 2);
+
+            long sleep = baseSleepMs + ThreadLocalRandom.current().nextLong(0, baseSleepMs / 2 + 1);
+
Review Comment:
Consider adding an info log here to help troubleshooting:
```
LOG.info("topic {} is not ready yet. will retry after {}ms", topic, sleep);
```
Also, consider adding similar logs after lines 47 and 65 as well.
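The retry loop in `waitUntilTopicReady()` sleeps with capped exponential backoff plus jitter before re-checking topic metadata. A minimal stdlib-only sketch of that backoff calculation, with the info log the review asks for (class and method names here are illustrative, not the PR's API):

```java
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
    // Doubles the base delay up to a cap, mirroring the waitUntilTopicReady() loop above.
    public static long nextBaseSleepMs(long baseSleepMs, long maxSleepMs) {
        return Math.min(maxSleepMs, baseSleepMs * 2);
    }

    // Adds up to 50% jitter so concurrent waiters don't poll in lockstep.
    public static long sleepWithJitterMs(long baseSleepMs) {
        return baseSleepMs + ThreadLocalRandom.current().nextLong(0, baseSleepMs / 2 + 1);
    }

    public static void main(String[] args) {
        long base = 100L;
        for (int attempt = 1; attempt <= 6; attempt++) {
            base = nextBaseSleepMs(base, 2000L);
            long sleep = sleepWithJitterMs(base);
            // the info log suggested in the review
            System.out.printf("topic %s is not ready yet. will retry after %dms%n", "ranger_audits", sleep);
        }
        System.out.println(base); // capped at 2000
    }
}
```

Note the original loop doubles `baseSleepMs` before the first sleep, so the effective minimum delay is 200 ms, not 100 ms.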
##########
audit-server/audit-dispatcher/dispatcher-app/src/main/java/org/apache/ranger/audit/rest/AuditDispatcherHealthREST.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Health check REST endpoint for Audit Dispatcher Service
+ */
+@Path("/health")
+@Component
+@Scope("request")
+public class AuditDispatcherHealthREST {
+    private static final Logger LOG = LoggerFactory.getLogger(AuditDispatcherHealthREST.class);
+
+ /**
+ * Simple ping endpoint to verify the service is running
+ */
+ @GET
+ @Path("/ping")
+ @Produces("application/json")
+ public Response ping() {
+ Map<String, String> resp = new HashMap<>();
+ resp.put("status", "UP");
+ resp.put("service", "audit-dispatcher");
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ return Response.ok(mapper.writeValueAsString(resp)).build();
+ } catch (Exception e) {
+ LOG.error("Error creating ping response", e);
+ return Response.serverError().build();
+ }
+ }
+
+ /**
+ * Detailed health check endpoint that verifies internal components
+ */
+ @GET
+ @Path("/status")
+ @Produces("application/json")
+ public Response status() {
+ Map<String, Object> resp = new HashMap<>();
+ try {
+            String dispatcherType = System.getProperty("ranger.audit.dispatcher.type");
+            resp.put("service", "audit-dispatcher-" + (dispatcherType != null ? dispatcherType : "unknown"));
+
+            if (dispatcherType != null && !dispatcherType.trim().isEmpty()) {
+                String auditDispatcherType = dispatcherType.toLowerCase();
+                boolean isActive = AuditDispatcherTracker.getInstance().getActiveDispatchers().stream()
Review Comment:
Instead of looking for `dispatcherType` in the class name, consider the following:
```
boolean isActive = AuditDispatcherTracker.getInstance().getActiveDispatchers().containsKey(dispatcherType);
```
##########
audit-server/audit-common/src/main/java/org/apache/ranger/audit/server/AuditServerConstants.java:
##########
@@ -97,18 +97,18 @@ private AuditServerConstants() {}
     public static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 30000; // 30 seconds
     public static final int DEFAULT_MAX_POLL_RECORDS = 500; // Kafka default batch size
-    // Kafka consumer rebalancing timeouts (for subscribe mode)
-    public static final int DEFAULT_SESSION_TIMEOUT_MS = 60000; // 60 seconds - failure detection
-    public static final int DEFAULT_MAX_POLL_INTERVAL_MS = 300000; // 5 minutes - max processing time
-    public static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 10000; // 10 seconds - heartbeat frequency
+    // Kafka dispatcher rebalancing timeouts (for subscribe mode)
+    public static final int DEFAULT_SESSION_TIMEOUT_MS = 60000; // 60 seconds - failure detection
+    public static final int DEFAULT_MAX_POLL_INTERVAL_MS = 300000; // 5 minutes - max processing time
+    public static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 10000; // 10 seconds - heartbeat frequency
-    // Kafka consumer partition assignment strategy
-    public static final String DEFAULT_PARTITION_ASSIGNMENT_STRATEGY = "org.apache.kafka.clients.consumer.RangeAssignor";
+    // Kafka dispatcher partition assignment strategy
+    public static final String DEFAULT_PARTITION_ASSIGNMENT_STRATEGY = "org.apache.kafka.clients.consumer.RangeAssignor";
     // Destination
-    public static final String DESTINATION_HDFS = "HDFS";
-    public static final String DESTINATION_SOLR = "SOLR";
+    public static final String DESTINATION_HDFS = "HDFS";
+    public static final String DESTINATION_SOLR = "SOLR";
-    // Consumer Registry Configuration
-    public static final String PROP_CONSUMER_CLASSES = "consumer.classes";
+    // Dispatcher Registry Configuration
+    public static final String PROP_DISPATCHER_CLASSES = "dispatcher.classes";
Review Comment:
What is the use case for multiple dispatcher classes? If there is none, consider renaming to `PROP_DISPATCHER_CLASS = "dispatcher.class"`.
##########
audit-server/audit-dispatcher/dispatcher-solr/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditSolrDispatcher.java:
##########
Review Comment:
Consider marking `processMessageBatch()` as `private`, as it is called only from this class.
##########
audit-server/audit-common/src/main/java/org/apache/ranger/audit/utils/AuditMessageQueueUtils.java:
##########
@@ -228,9 +211,9 @@ public String getJAASConfig(Properties props, String propPrefix) {
     public String updateExistingTopicPartitions(AdminClient admin, String topicName, int partitions, short replicationFactor) {
Review Comment:
Consider marking `updateExistingTopicPartitions()` as `private`, as this method is only called from this class.
##########
audit-server/audit-dispatcher/dispatcher-app/src/main/java/org/apache/ranger/audit/rest/AuditDispatcherHealthREST.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Health check REST endpoint for Audit Dispatcher Service
+ */
+@Path("/health")
+@Component
+@Scope("request")
+public class AuditDispatcherHealthREST {
+    private static final Logger LOG = LoggerFactory.getLogger(AuditDispatcherHealthREST.class);
+
+ /**
+ * Simple ping endpoint to verify the service is running
+ */
+ @GET
+ @Path("/ping")
+ @Produces("application/json")
+ public Response ping() {
+ Map<String, String> resp = new HashMap<>();
+ resp.put("status", "UP");
+ resp.put("service", "audit-dispatcher");
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ return Response.ok(mapper.writeValueAsString(resp)).build();
+ } catch (Exception e) {
+ LOG.error("Error creating ping response", e);
+ return Response.serverError().build();
+ }
+ }
+
+ /**
+ * Detailed health check endpoint that verifies internal components
+ */
+ @GET
+ @Path("/status")
+ @Produces("application/json")
+ public Response status() {
+ Map<String, Object> resp = new HashMap<>();
+ try {
+            String dispatcherType = System.getProperty("ranger.audit.dispatcher.type");
Review Comment:
`ranger.audit.dispatcher.type` is referenced in several places; consider defining a constant (in `AuditServerConstants.java`?)
##########
audit-server/audit-dispatcher/dispatcher-solr/src/main/java/org/apache/ranger/audit/dispatcher/SolrDispatcherManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcher;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.server.AuditServerConstants;
+import org.apache.ranger.audit.utils.AuditServerLogFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+
+import java.util.Properties;
+
+/**
+ * Spring component that manages the lifecycle of Solr dispatcher threads.
+ * Manager that manages the lifecycle of Solr dispatcher threads.
+ * - Initializes the dispatcher tracker
+ * - Creates Solr dispatcher instances
+ * - Starts dispatcher threads
+ * - Handles graceful shutdown
+ */
+public class SolrDispatcherManager {
+    private static final Logger LOG = LoggerFactory.getLogger(SolrDispatcherManager.class);
+    private static final String CONFIG_DISPATCHER_TYPE = "ranger.audit.dispatcher.type";
+
+    private final AuditDispatcherTracker tracker = AuditDispatcherTracker.getInstance();
+ private AuditDispatcher dispatcher;
+ private Thread dispatcherThread;
+
+ @PostConstruct
Review Comment:
Is `@PostConstruct` necessary given this method is called from
`AuditDispatcherApplication.initializeDispatcherManager()` via reflection?
##########
audit-server/audit-dispatcher/dispatcher-app/src/main/java/org/apache/ranger/audit/dispatcher/AuditDispatcherApplication.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.server.AuditConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class AuditDispatcherApplication {
+    private static final Logger LOG = LoggerFactory.getLogger(AuditDispatcherApplication.class);
+    private static final String APP_NAME = "audit-dispatcher";
+    private static final String CONFIG_PREFIX = "ranger.audit.dispatcher.";
+    private static final String COMMON_CONFIG_FILE = "ranger-audit-dispatcher-site.xml";
+
+ private AuditDispatcherApplication() {
+ }
+
+ public static void main(String[] args) {
+ AuditConfig config = new AuditConfig();
+ config.addResourceIfReadable(COMMON_CONFIG_FILE);
+        LOG.info("Loaded common configuration from classpath: {}", COMMON_CONFIG_FILE);
+
+ String dispatcherType = System.getProperty(CONFIG_PREFIX + "type");
+ if (dispatcherType == null) {
+ dispatcherType = config.get(CONFIG_PREFIX + "type");
+ }
+
+ // Load dispatcher-specific configuration from classpath
+ if (dispatcherType != null) {
+            String specificConfig = "ranger-audit-dispatcher-" + dispatcherType + "-site.xml";
+            config.addResourceIfReadable(specificConfig);
+            LOG.info("Loaded dispatcher-specific configuration from classpath: {}", specificConfig);
+ } else {
+            LOG.warn("No dispatcher type specified. Service might fail to start correctly.");
Review Comment:
Looking at line 63 below, it seems `dispatcherType` can't be null. If so, an exception should be thrown from here, terminating the application.
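A fail-fast version of the type resolution could look like the sketch below, under the assumption (per the comment above) that a missing type is always fatal; the class and method names are illustrative, not the PR's code:

```java
import java.util.Properties;

public class DispatcherTypeSketch {
    private static final String CONFIG_PREFIX = "ranger.audit.dispatcher.";

    // Resolves the dispatcher type from the system property first, then the
    // loaded configuration, and throws instead of logging a warning when absent.
    public static String resolveDispatcherType(Properties config) {
        String type = System.getProperty(CONFIG_PREFIX + "type");
        if (type == null) {
            type = config.getProperty(CONFIG_PREFIX + "type");
        }
        if (type == null || type.trim().isEmpty()) {
            throw new IllegalStateException("No dispatcher type specified via -D" + CONFIG_PREFIX + "type or configuration; terminating");
        }
        return type.trim();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(CONFIG_PREFIX + "type", "hdfs");
        System.out.println(resolveDispatcherType(config)); // hdfs
    }
}
```

Failing at startup surfaces the misconfiguration immediately, instead of letting the service come up without any dispatcher running.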
##########
audit-server/audit-common/src/main/java/org/apache/ranger/audit/utils/AuditMessageQueueUtils.java:
##########
@@ -43,35 +43,26 @@
public class AuditMessageQueueUtils {
Review Comment:
Consider making all methods in the `AuditMessageQueueUtils` class `static`, as they don't share any state in the class, and mark the constructor as `private` to prevent instantiation.
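The static-utility shape this comment describes, as a minimal sketch (the real `AuditMessageQueueUtils` methods are elided; the helper below is a hypothetical stand-in that just illustrates the pattern):

```java
public final class QueueUtilsSketch {
    // Private constructor prevents instantiation; all behavior is static.
    private QueueUtilsSketch() {
        throw new AssertionError("utility class; do not instantiate");
    }

    // Stand-in for a stateless helper like createAuditsTopicIfNotExists():
    // no instance fields are touched, so it can be static.
    public static String topicNameOrDefault(String configured, String defaultTopic) {
        return (configured == null || configured.trim().isEmpty()) ? defaultTopic : configured.trim();
    }

    public static void main(String[] args) {
        System.out.println(topicNameOrDefault(null, "ranger_audits")); // ranger_audits
    }
}
```

Making the class `final` with a throwing private constructor documents the intent and blocks both subclassing and reflective instantiation.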
##########
audit-server/audit-dispatcher/dispatcher-app/src/main/java/org/apache/ranger/audit/rest/AuditDispatcherHealthREST.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Health check REST endpoint for Audit Dispatcher Service
+ */
+@Path("/health")
+@Component
+@Scope("request")
+public class AuditDispatcherHealthREST {
+    private static final Logger LOG = LoggerFactory.getLogger(AuditDispatcherHealthREST.class);
+
+ /**
+ * Simple ping endpoint to verify the service is running
+ */
+ @GET
+ @Path("/ping")
+ @Produces("application/json")
+ public Response ping() {
+ Map<String, String> resp = new HashMap<>();
+ resp.put("status", "UP");
+ resp.put("service", "audit-dispatcher");
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
Review Comment:
Consider using a `static final` instance of `ObjectMapper`, instead of creating an instance on each call to `/ping` and `/status`.
##########
audit-server/audit-dispatcher/dispatcher-app/src/main/java/org/apache/ranger/audit/rest/AuditDispatcherHealthREST.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Health check REST endpoint for Audit Dispatcher Service
+ */
+@Path("/health")
+@Component
+@Scope("request")
+public class AuditDispatcherHealthREST {
+    private static final Logger LOG = LoggerFactory.getLogger(AuditDispatcherHealthREST.class);
+
+ /**
+ * Simple ping endpoint to verify the service is running
+ */
+ @GET
+ @Path("/ping")
+ @Produces("application/json")
+ public Response ping() {
+ Map<String, String> resp = new HashMap<>();
+ resp.put("status", "UP");
+ resp.put("service", "audit-dispatcher");
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ return Response.ok(mapper.writeValueAsString(resp)).build();
+ } catch (Exception e) {
+ LOG.error("Error creating ping response", e);
+ return Response.serverError().build();
+ }
+ }
+
+ /**
+ * Detailed health check endpoint that verifies internal components
+ */
+ @GET
+ @Path("/status")
+ @Produces("application/json")
+ public Response status() {
+ Map<String, Object> resp = new HashMap<>();
+ try {
+            String dispatcherType = System.getProperty("ranger.audit.dispatcher.type");
+            resp.put("service", "audit-dispatcher-" + (dispatcherType != null ? dispatcherType : "unknown"));
+
+            if (dispatcherType != null && !dispatcherType.trim().isEmpty()) {
+                String auditDispatcherType = dispatcherType.toLowerCase();
+                boolean isActive = AuditDispatcherTracker.getInstance().getActiveDispatchers().stream()
+                        .filter(d -> d != null)
+                        .anyMatch(d -> d.getClass().getName().toLowerCase().contains(auditDispatcherType));
+
+ if (isActive) {
+ resp.put("status", "UP");
+ } else {
+ resp.put("status", "DOWN");
+                    resp.put("reason", dispatcherType + " Dispatcher is not active");
+ }
+ } else {
+ resp.put("status", "DOWN");
+                resp.put("reason", "Unknown dispatcher type: " + dispatcherType);
Review Comment:
`Unknown dispatcher type:` => `Dispatcher type not provided`
##########
audit-server/audit-dispatcher/dispatcher-hdfs/src/main/resources/conf/ranger-audit-dispatcher-hdfs-site.xml:
##########
@@ -0,0 +1,140 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<configuration>
+ <!-- HDFS DISPATCHER SERVICE CONFIGURATION -->
+ <property>
+ <name>ranger.audit.dispatcher.hdfs.class</name>
+ <value>org.apache.ranger.audit.dispatcher.HdfsDispatcherManager</value>
+ </property>
+
+
+
+ <property>
+ <name>log.dir</name>
+ <value>${audit.dispatcher.hdfs.log.dir}</value>
+ <description>Log directory for HDFS dispatcher service</description>
+ </property>
+
+ <!-- KAFKA DISPATCHER CONFIGURATION -->
+ <property>
+ <name>xasecure.audit.destination.kafka.host</name>
+ <value>ranger-audit-dispatcher-hdfs.rangernw</value>
+ <description>
+            - In Docker: Use full service name with domain (e.g., ranger-audit-server.rangernw)
+            - In VM: Use the actual FQDN (e.g., ranger.example.com)
+ </property>
+
+ <property>
+ <name>xasecure.audit.destination.kafka.kerberos.principal</name>
+ <value>rangerauditserver/[email protected]</value>
+ <description>
+            rangerauditserver user kerberos principal for authentication into kafka
+ </description>
+ </property>
+
+ <property>
+ <name>xasecure.audit.destination.kafka.kerberos.keytab</name>
+ <value>/etc/keytabs/rangerauditserver.keytab</value>
+ <description>
+ keytab of the rangerauditserver principal
+ </description>
+ </property>
+ <property>
+ <name>xasecure.audit.destination.kafka.bootstrap.servers</name>
+ <value>ranger-kafka:9092</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.destination.kafka.topic.name</name>
+ <value>ranger_audits</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.destination.kafka.security.protocol</name>
+ <value>SASL_PLAINTEXT</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.destination.kafka.sasl.mechanism</name>
+ <value>GSSAPI</value>
+ </property>
+
+ <!-- HDFS Dispatcher Threading Configuration -->
+ <property>
+ <name>xasecure.audit.destination.kafka.dispatcher.thread.count</name>
+ <value>3</value>
+ <description>Number of HDFS dispatcher worker threads</description>
+ </property>
+
+ <!-- Offset Management -->
+ <property>
+        <name>xasecure.audit.destination.kafka.dispatcher.offset.commit.strategy</name>
+ <value>batch</value>
+ <description>batch or manual</description>
+ </property>
+
+ <property>
+        <name>xasecure.audit.destination.kafka.dispatcher.offset.commit.interval.ms</name>
+ <value>30000</value>
+ <description>Used only if strategy is 'manual'</description>
+ </property>
+
+ <property>
+        <name>xasecure.audit.destination.kafka.dispatcher.max.poll.records</name>
+ <value>500</value>
+ <description>Maximum records per poll</description>
+ </property>
+
+ <!-- Dispatcher Classes (ONLY HDFS) -->
+ <property>
+ <name>xasecure.audit.destination.kafka.dispatcher.classes</name>
+        <value>org.apache.ranger.audit.dispatcher.kafka.AuditHDFSDispatcher</value>
+ </property>
+
+ <!-- HDFS DESTINATION CONFIGURATION - MUST BE ENABLED -->
+ <property>
+ <name>xasecure.audit.destination.hdfs</name>
+ <value>true</value>
+ <description>MUST be true for HDFS dispatcher to work</description>
Review Comment:
Instead of requiring the config file to have `xasecure.audit.destination.hdfs=true`, consider internally setting this property to true when the dispatcher class is `AuditHDFSDispatcher`.
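Deriving the destination flag from the configured dispatcher class might look like the sketch below (the property names come from the XML above; the enabling logic and class name are an assumed illustration, not the PR's code):

```java
import java.util.Properties;

public class HdfsFlagSketch {
    private static final String DISPATCHER_CLASSES = "xasecure.audit.destination.kafka.dispatcher.classes";
    private static final String HDFS_DEST_ENABLED = "xasecure.audit.destination.hdfs";

    // If the HDFS dispatcher class is configured, force the HDFS destination
    // flag on so operators don't have to keep two properties in sync.
    public static void enableHdfsDestinationIfNeeded(Properties props) {
        String classes = props.getProperty(DISPATCHER_CLASSES, "");
        if (classes.contains("AuditHDFSDispatcher")) {
            props.setProperty(HDFS_DEST_ENABLED, "true");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(DISPATCHER_CLASSES, "org.apache.ranger.audit.dispatcher.kafka.AuditHDFSDispatcher");
        enableHdfsDestinationIfNeeded(props);
        System.out.println(props.getProperty(HDFS_DEST_ENABLED)); // true
    }
}
```

This removes a misconfiguration mode where the HDFS dispatcher is selected but silently disabled because the destination flag was left false.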
##########
audit-server/audit-dispatcher/dispatcher-hdfs/src/main/java/org/apache/ranger/audit/dispatcher/HdfsDispatcherManager.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcher;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.server.AuditServerConstants;
+import org.apache.ranger.audit.utils.AuditServerLogFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Spring component that manages the lifecycle of HDFS dispatcher threads.
+ * Manager that handles the lifecycle of HDFS dispatcher threads.
+ * - Initializes the dispatcher tracker
+ * - Creates HDFS dispatcher instances
+ * - Starts dispatcher threads
+ * - Handles graceful shutdown
+ */
+public class HdfsDispatcherManager {
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsDispatcherManager.class);
+    private static final String CONFIG_DISPATCHER_TYPE = "ranger.audit.dispatcher.type";
+
+    private final AuditDispatcherTracker tracker = AuditDispatcherTracker.getInstance();
+ private AuditDispatcher dispatcher;
+ private Thread dispatcherThread;
+
+ public void init(Properties props) {
+ LOG.info("==> HdfsDispatcherManager.init()");
+
+ String dispatcherType = System.getProperty(CONFIG_DISPATCHER_TYPE);
+        if (dispatcherType != null && !dispatcherType.equalsIgnoreCase("hdfs")) {
Review Comment:
Consider replacing `"hdfs"` with a constant in lines 52 and 112.
```
private static final String DISPATCHER_TYPE_HDFS = "hdfs";
```
##########
audit-server/audit-dispatcher/dispatcher-hdfs/src/main/java/org/apache/ranger/audit/dispatcher/kafka/AuditHDFSDispatcher.java:
##########
@@ -224,178 +217,250 @@ private void initializeRangerUGI(Properties props, String propPrefix) throws Exc
     * Add Hadoop configuration properties from core-site.xml and hdfs-site.xml to props.
     */
    private void addHadoopConfigToProps(Properties props) {
-        LOG.info("==> AuditHDFSConsumer.addHadoopConfigToProps()");
+        LOG.info("==> AuditHDFSDispatcher.addHadoopConfigToProps()");
        try {
-            HdfsConsumerConfig hdfsConfig = HdfsConsumerConfig.getInstance();
-            String configPrefix = "xasecure.audit.destination.hdfs.config.";
-
-            Properties hadoopProps = hdfsConfig.getHadoopPropertiesWithPrefix(configPrefix);
+            String configPrefix = "xasecure.audit.destination.hdfs.config.";
+            Properties hadoopProps = getHadoopPropertiesWithPrefix(configPrefix);
            props.putAll(hadoopProps);
-            LOG.info("<== AuditHDFSConsumer.addHadoopConfigToProps(): Added {} Hadoop configuration properties from HdfsConsumerConfig", hadoopProps.size());
+            LOG.info("<== AuditHDFSDispatcher.addHadoopConfigToProps(): Added {} Hadoop configuration properties", hadoopProps.size());
        } catch (Exception e) {
            LOG.error("Failed to add Hadoop configuration properties to props", e);
        }
    }
-    private void initConsumerConfig(Properties props, String propPrefix) {
-        LOG.info("==> AuditHDFSConsumer.initConsumerConfig()");
+    private void initDispatcherConfig(Properties props, String propPrefix) {
+        LOG.info("==> AuditHDFSDispatcher.initDispatcherConfig()");
-        // Get consumer thread count
-        this.consumerThreadCount = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_CONSUMER_THREAD_COUNT, 1);
-        LOG.info("HDFS consumer thread count: {}", consumerThreadCount);
+        // Get dispatcher thread count
+        this.dispatcherThreadCount = MiscUtil.getIntProperty(props, propPrefix + "." + AuditServerConstants.PROP_DISPATCHER_THREAD_COUNT, 1);
+        LOG.info("HDFS dispatcher thread count: {}", dispatcherThreadCount);
        // Initialize offset management configuration
        initializeOffsetManagement(props, propPrefix);
-        LOG.info("<== AuditHDFSConsumer.initConsumerConfig()");
+        LOG.info("<== AuditHDFSDispatcher.initDispatcherConfig()");
+    }
+
+    /**
+     * Get Hadoop configuration properties (from core-site.xml and hdfs-site.xml) with a specific prefix.
+     * This is needed by HdfsAuditDestination and its parent classes for routing the audits to the HDFS location.
+     * @param prefix The prefix to add to each property name ("xasecure.audit.destination.hdfs.config.")
+     * @return Properties from core-site.xml and hdfs-site.xml with the specified prefix
+     */
+    private Properties getHadoopPropertiesWithPrefix(String prefix) {
+        LOG.debug("==> AuditHDFSDispatcher.getHadoopPropertiesWithPrefix(prefix={})", prefix);
+
+        Properties prefixedProps = new java.util.Properties();
+        int propsAdded = 0;
+
+        try {
+            // Load core-site.xml separately to get pure Hadoop security properties
+            Configuration coreSite = new Configuration(false);
+            coreSite.addResource(CONFIG_CORE_SITE);
+
+            for (java.util.Map.Entry<String, String> entry : coreSite) {
+                String propName = entry.getKey();
+                String propValue = entry.getValue();
+
+                if (propValue != null && !propValue.trim().isEmpty()) {
+                    prefixedProps.setProperty(prefix + propName, propValue);
+                    LOG.trace("Added from core-site.xml: {} = {}", propName, propValue);
+                    propsAdded++;
+                }
+            }
+
+            // Load hdfs-site.xml separately to get pure HDFS client properties
+            Configuration hdfsSite = new Configuration(false);
+            hdfsSite.addResource(CONFIG_HDFS_SITE);
+
+            for (java.util.Map.Entry<String, String> entry : hdfsSite) {
+                String propName = entry.getKey();
+                String propValue = entry.getValue();
+
+                if (propValue != null && !propValue.trim().isEmpty()) {
+                    prefixedProps.setProperty(prefix + propName, propValue);
+                    LOG.trace("Added from hdfs-site.xml: {} = {}", propName, propValue);
+                    propsAdded++;
+                }
+            }
+
+            LOG.debug("<== AuditHDFSDispatcher.getHadoopPropertiesWithPrefix(): Added {} Hadoop properties with prefix '{}'", propsAdded, prefix);
+        } catch (Exception e) {
+            LOG.error("Failed to load Hadoop properties from core-site.xml and hdfs-site.xml", e);
+        }
+
+        return prefixedProps;
+    }
+
+    /**
+     * Get core-site.xml Configuration for UGI initialization.
+     * @return Configuration loaded from core-site.xml
+     */
+    private Configuration getCoreSiteConfiguration() {
+        LOG.debug("==> AuditHDFSDispatcher.getCoreSiteConfiguration()");
+
+        Configuration coreSite = new Configuration(false);
+        coreSite.addResource(CONFIG_CORE_SITE);
+
+        LOG.debug("<== AuditHDFSDispatcher.getCoreSiteConfiguration(): authentication={}", coreSite.get("hadoop.security.authentication"));
+
+        return coreSite;
    }
    private void initializeOffsetManagement(Properties props, String propPrefix) {
-        LOG.info("==> AuditHDFSConsumer.initializeOffsetManagement()");
+        LOG.info("==> AuditHDFSDispatcher.initializeOffsetManagement()");
        // Get offset commit strategy
        this.offsetCommitStrategy = MiscUtil.getStringProperty(props,
-                propPrefix + "." + AuditServerConstants.PROP_CONSUMER_OFFSET_COMMIT_STRATEGY,
+                propPrefix + "." + AuditServerConstants.PROP_DISPATCHER_OFFSET_COMMIT_STRATEGY,
                AuditServerConstants.DEFAULT_OFFSET_COMMIT_STRATEGY);
        // Get offset commit interval (only used for manual strategy)
        this.offsetCommitInterval = MiscUtil.getLongProperty(props,
-                propPrefix + "." + AuditServerConstants.PROP_CONSUMER_OFFSET_COMMIT_INTERVAL,
+                propPrefix + "." + AuditServerConstants.PROP_DISPATCHER_OFFSET_COMMIT_INTERVAL,
                AuditServerConstants.DEFAULT_OFFSET_COMMIT_INTERVAL_MS);
-        AuditServerLogFormatter.builder("HDFS Consumer Offset Management Configuration")
+        AuditServerLogFormatter.builder("HDFS Dispatcher Offset Management Configuration")
                .add("Commit Strategy", offsetCommitStrategy)
                .add("Commit Interval (ms)", offsetCommitInterval + " (used in manual mode only)")
                .logInfo(LOG);
-        LOG.info("<== AuditHDFSConsumer.initializeOffsetManagement()");
+        LOG.info("<== AuditHDFSDispatcher.initializeOffsetManagement()");
    }
    private void startMultithreadedConsumption() {
-        LOG.debug("==> AuditHDFSConsumer.startMultithreadedConsumption()");
+        LOG.debug("==> AuditHDFSDispatcher.startMultithreadedConsumption()");
        if (running.compareAndSet(false, true)) {
-            startConsumerWorkers();
+            startDispatcherWorkers();
        }
-        LOG.debug("<== AuditHDFSConsumer.startMultithreadedConsumption()");
+        LOG.debug("<== AuditHDFSDispatcher.startMultithreadedConsumption()");
    }
-    private void startConsumerWorkers() {
-        LOG.info("==> AuditHDFSConsumer.startConsumerWorkers(): Creating {} consumer workers for horizontal scaling", consumerThreadCount);
+    private void startDispatcherWorkers() {
+        LOG.info("==> AuditHDFSDispatcher.startDispatcherWorkers(): Creating {} dispatcher workers for horizontal scaling", dispatcherThreadCount);
        LOG.info("Each worker will subscribe to topic '{}' and process partitions assigned by Kafka", topicName);
-        // Create thread pool sized for consumer workers
-        consumerThreadPool = Executors.newFixedThreadPool(consumerThreadCount);
-        LOG.info("Created thread pool with {} threads for scalable HDFS consumption", consumerThreadCount);
+        // Create thread pool sized for dispatcher workers
+        dispatcherThreadPool = Executors.newFixedThreadPool(dispatcherThreadCount);
+        LOG.info("Created thread pool with {} threads for scalable HDFS consumption", dispatcherThreadCount);
-        // Create HDFS consumer workers
-        for (int i = 0; i < consumerThreadCount; i++) {
+        // Create HDFS dispatcher workers
+        for (int i = 0; i < dispatcherThreadCount; i++) {
            String workerId = "hdfs-worker-" + i;
-            ConsumerWorker worker = new ConsumerWorker(workerId, new ArrayList<>());
-            consumerWorkers.put(workerId, worker);
-            consumerThreadPool.submit(worker);
+            DispatcherWorker worker = new DispatcherWorker(workerId, new ArrayList<>());
+            dispatcherWorkers.put(workerId, worker);
+            dispatcherThreadPool.submit(worker);
-            LOG.info("Started HDFS consumer worker '{}' - will process ANY appId assigned by Kafka", workerId);
+            LOG.info("Started HDFS dispatcher worker '{}' - will process ANY appId assigned by Kafka", workerId);
        }
-        LOG.info("<== AuditHDFSConsumer.startConsumerWorkers(): All {} workers started in SUBSCRIBE mode", consumerThreadCount);
+        LOG.info("<== AuditHDFSDispatcher.startDispatcherWorkers(): All {} workers started in SUBSCRIBE mode", dispatcherThreadCount);
    }
-    private class ConsumerWorker implements Runnable {
+    private class DispatcherWorker implements Runnable {
Review Comment:
   `AuditHDFSDispatcher.DispatcherWorker` appears to be a duplicate of `AuditSolrDispatcher.DispatcherWorker`. Please review and consider moving the class to `AuditDispatcherBase` in `ranger-audit-dispatcher-common`.
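   The refactoring could be sketched as follows. This is a hypothetical, self-contained toy (the real `AuditDispatcherBase`, queue wiring, and retry handling would differ): the shared worker loop lives in the common base class, and only the destination-specific delivery step stays abstract, so the HDFS and Solr dispatchers no longer each carry a copy of `DispatcherWorker`.
   ```java
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CopyOnWriteArrayList;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Hypothetical sketch: the worker previously duplicated per dispatcher
   // is hoisted into the shared base class.
   abstract class AuditDispatcherBase {
       private final BlockingQueue<String> queue     = new LinkedBlockingQueue<>();
       private final List<String>          delivered = new CopyOnWriteArrayList<>();

       // Destination-specific logic (HDFS write, Solr index, ...) goes here.
       protected abstract String deliver(String event) throws Exception;

       public void submit(String event)     { queue.offer(event); }

       public List<String> getDelivered()   { return delivered; }

       // Shared worker: drains the queue and records what was delivered.
       protected class DispatcherWorker implements Runnable {
           private final String workerId;

           protected DispatcherWorker(String workerId) { this.workerId = workerId; }

           @Override
           public void run() {
               try {
                   String event;
                   // Exit once the queue stays empty for the poll timeout;
                   // real code would loop until a shutdown flag is set.
                   while ((event = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
                       delivered.add(deliver(event));
                   }
               } catch (Exception e) {
                   // real code would log and apply a retry policy
               }
           }
       }

       public Thread startWorker(String workerId) {
           Thread t = new Thread(new DispatcherWorker(workerId), "dispatcher-" + workerId);
           t.start();
           return t;
       }
   }

   // Stand-in for AuditHDFSDispatcher; only the delivery step differs per subclass.
   class HdfsLikeDispatcher extends AuditDispatcherBase {
       @Override
       protected String deliver(String event) {
           return "hdfs:" + event; // stand-in for an HDFS append
       }
   }
   ```
   With this shape, `AuditSolrDispatcher` would only override `deliver(...)` as well, and the worker threading stays in one place.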
##########
audit-server/audit-dispatcher/dispatcher-hdfs/src/main/resources/conf/ranger-audit-dispatcher-hdfs-site.xml:
##########
@@ -0,0 +1,140 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<configuration>
+ <!-- HDFS DISPATCHER SERVICE CONFIGURATION -->
+ <property>
+ <name>ranger.audit.dispatcher.hdfs.class</name>
+ <value>org.apache.ranger.audit.dispatcher.HdfsDispatcherManager</value>
+ </property>
+
+
+
+ <property>
+ <name>log.dir</name>
+ <value>${audit.dispatcher.hdfs.log.dir}</value>
+ <description>Log directory for HDFS dispatcher service</description>
+ </property>
+
+ <!-- KAFKA DISPATCHER CONFIGURATION -->
+ <property>
+ <name>xasecure.audit.destination.kafka.host</name>
Review Comment:
   Should Kafka-related configurations start with `xasecure.audit.destination.` (which is interpreted by the audit framework)? Consider using a prefix like `ranger.audit.dispatcher.`
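   Under that suggestion, a dispatcher-owned Kafka setting might look like the following (the property name below is illustrative only, not an existing configuration key):
   ```xml
   <!-- Illustrative sketch: dispatcher-owned Kafka settings under a
        ranger.audit.dispatcher. prefix, leaving xasecure.audit.destination.*
        to properties interpreted by the audit framework. -->
   <property>
       <name>ranger.audit.dispatcher.kafka.host</name>
       <value>localhost:9092</value>
   </property>
   ```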
##########
audit-server/audit-common/pom.xml:
##########
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ranger</groupId>
+ <artifactId>ranger</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <relativePath>../..</relativePath>
+ </parent>
+
+ <artifactId>ranger-audit-server-common</artifactId>
+ <packaging>jar</packaging>
+ <name>Ranger Audit Server Common</name>
+    <description>Shared classes between audit ingestor and dispatcher</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ranger</groupId>
+ <artifactId>ranger-plugins-common</artifactId>
Review Comment:
   It looks like `SecureClientLogin`, referenced in `AuditMessageQueueUtils`, comes from `ranger-plugins-common`; so this dependency is okay for now. Eventually, we should consider replacing this usage with an alternate implementation like `KerberosAuthNHandler` in the `pdp` module.
##########
audit-server/audit-dispatcher/dispatcher-hdfs/src/main/java/org/apache/ranger/audit/dispatcher/HdfsDispatcherManager.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.dispatcher;
+
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcher;
+import org.apache.ranger.audit.dispatcher.kafka.AuditDispatcherTracker;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.server.AuditServerConstants;
+import org.apache.ranger.audit.utils.AuditServerLogFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Spring component that manages the lifecycle of HDFS dispatcher threads:
+ * - Initializes the dispatcher tracker
+ * - Creates HDFS dispatcher instances
+ * - Starts dispatcher threads
+ * - Handles graceful shutdown
+ */
+public class HdfsDispatcherManager {
+    private static final Logger LOG                    = LoggerFactory.getLogger(HdfsDispatcherManager.class);
+    private static final String CONFIG_DISPATCHER_TYPE = "ranger.audit.dispatcher.type";
+
+    private final AuditDispatcherTracker tracker = AuditDispatcherTracker.getInstance();
+    private AuditDispatcher dispatcher;
+    private Thread dispatcherThread;
+
+    public void init(Properties props) {
+        LOG.info("==> HdfsDispatcherManager.init()");
+
+        String dispatcherType = System.getProperty(CONFIG_DISPATCHER_TYPE);
+        if (dispatcherType != null && !dispatcherType.equalsIgnoreCase("hdfs")) {
+            LOG.info("Skipping HdfsDispatcherManager initialization since dispatcher type is {}", dispatcherType);
+            return;
+        }
+
+        try {
+            if (props == null) {
+                LOG.error("Configuration properties are null");
+                throw new RuntimeException("Failed to load configuration");
+            }
+
+            boolean isEnabled = MiscUtil.getBooleanProperty(props, "xasecure.audit.destination.hdfs", false);
+            if (!isEnabled) {
+                LOG.warn("HDFS destination is disabled (xasecure.audit.destination.hdfs=false). No dispatchers will be created.");
+                return;
+            }
+
+            // Initialize and register HDFS Dispatcher
+            initializeDispatcher(props, AuditServerConstants.PROP_KAFKA_PROP_PREFIX);
+
+            if (dispatcher == null) {
+                LOG.warn("No dispatcher was created! Verify that xasecure.audit.destination.hdfs=true and classes are configured correctly.");
+            } else {
+                LOG.info("Created HDFS dispatcher");
+
+                // Register shutdown hook
+                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                    LOG.info("JVM shutdown detected, stopping HdfsDispatcherManager...");
+                    shutdown();
+                }, "HdfsDispatcherManager-ShutdownHook"));
+
+                // Start dispatcher thread
+                startDispatcher();
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to initialize HdfsDispatcherManager", e);
+            throw new RuntimeException("Failed to initialize HdfsDispatcherManager", e);
+        }
+        LOG.info("<== HdfsDispatcherManager.init()");
+    }
+
+    private void initializeDispatcher(Properties props, String propPrefix) {
+        LOG.info("==> HdfsDispatcherManager.initializeDispatcher()");
+
+        String clsStr = MiscUtil.getStringProperty(props, propPrefix + "." + AuditServerConstants.PROP_DISPATCHER_CLASSES, "org.apache.ranger.audit.dispatcher.kafka.AuditHDFSDispatcher");
+        String[] hdfsDispatcherClasses = clsStr.split(",");
+
+        LOG.info("Initializing {} dispatcher class(es)", hdfsDispatcherClasses.length);
+
+        String hdfsDispatcherClassName = clsStr.split(",")[0].trim();
+        if (hdfsDispatcherClassName.isEmpty()) {
+            LOG.error("Dispatcher class name is empty");
+            return;
+        }
+
+        try {
+            Class<?> dispatcherClass = Class.forName(hdfsDispatcherClassName);
+            dispatcher = (AuditDispatcher) dispatcherClass
+                    .getConstructor(Properties.class, String.class)
+                    .newInstance(props, propPrefix);
+            tracker.addActiveDispatcher("hdfs", dispatcher);
+            LOG.info("Successfully initialized dispatcher class: {}", dispatcherClass.getName());
+        } catch (ClassNotFoundException e) {
+            LOG.error("Dispatcher class not found: {}. Ensure the class is on the classpath.", hdfsDispatcherClassName, e);
+        } catch (Exception e) {
+            LOG.error("Error initializing dispatcher class: {}", hdfsDispatcherClassName, e);
+        }
+
+        LOG.info("<== HdfsDispatcherManager.initializeDispatcher()");
+    }
+
+    /**
+     * Start dispatcher thread
+     */
+    private void startDispatcher() {
+        LOG.info("==> HdfsDispatcherManager.startDispatcher()");
+
+        logDispatcherStartup();
+        try {
+            String dispatcherName = dispatcher.getClass().getSimpleName();
+            Thread dispatcherThread = new Thread(dispatcher, dispatcherName);
+            dispatcherThread.setDaemon(true);
+            dispatcherThread.start();
+            LOG.info("Started {} thread [Thread-ID: {}, Thread-Name: '{}']", dispatcherName, dispatcherThread.getId(), dispatcherThread.getName());
+        } catch (Exception e) {
+            LOG.error("Error starting dispatcher: {}", dispatcher.getClass().getSimpleName(), e);
+        }
+
+        LOG.info("<== HdfsDispatcherManager.startDispatcher()");
+    }
+
+    private void logDispatcherStartup() {
+        LOG.info("################## HDFS DISPATCHER SERVICE STARTUP ######################");
+
+        if (dispatcher == null) {
+            LOG.warn("WARNING: No HDFS dispatchers are enabled!");
+            LOG.warn("Verify: xasecure.audit.destination.hdfs=true in configuration");
+        } else {
+            AuditServerLogFormatter.LogBuilder builder = AuditServerLogFormatter.builder("HDFS Dispatcher Status");
+            String dispatcherType = dispatcher.getClass().getSimpleName();
+            builder.add(dispatcherType, "ENABLED");
+            builder.add("Topic", dispatcher.getTopicName());
+            builder.logInfo(LOG);
+            LOG.info("Starting HDFS dispatcher thread...");
+        }
+
+        LOG.info("########################################################################");
+    }
+
+    public void shutdown() {
+        LOG.info("==> HdfsDispatcherManager.shutdown()");
+
+        // Shutdown dispatcher
+        if (dispatcher != null) {
+            try {
+                LOG.info("Shutting down dispatcher: {}", dispatcher.getClass().getSimpleName());
+                dispatcher.shutdown();
+                LOG.info("Dispatcher shutdown completed: {}", dispatcher.getClass().getSimpleName());
+            } catch (Exception e) {
+                LOG.error("Error shutting down dispatcher: {}", dispatcher.getClass().getSimpleName(), e);
+            }
+        }
+
+        // Wait for thread to terminate
+        if (dispatcherThread != null && dispatcherThread.isAlive()) {
+            try {
+                LOG.info("Waiting for thread to terminate: {}", dispatcherThread.getName());
+                dispatcherThread.join(10000); // Wait up to 10 seconds
+                if (dispatcherThread.isAlive()) {
+                    LOG.warn("Thread did not terminate within 10 seconds: {}", dispatcherThread.getName());
+                }
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting for thread to terminate: {}", dispatcherThread.getName(), e);
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        dispatcher = null;
+        dispatcherThread = null;
+        tracker.clearActiveDispatchers();
Review Comment:
   Instead of clearing all dispatchers, shouldn't this only remove "hdfs", with a call to `tracker.removeDispatcher("hdfs")`? Similarly in `SolrDispatcherManager` as well.
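   The difference matters when both managers share one tracker. A toy stand-in for `AuditDispatcherTracker` (method names follow the review comment; the internals are assumed) makes it concrete: removing only the entry owned by this manager leaves dispatchers registered by other managers intact, whereas clearing the registry drops a still-running Solr dispatcher from tracking.
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Toy stand-in for AuditDispatcherTracker, illustrating per-type removal
   // versus clearing the whole registry.
   class DispatcherTracker {
       private final Map<String, Object> active = new ConcurrentHashMap<>();

       public void addActiveDispatcher(String type, Object dispatcher) {
           active.put(type, dispatcher);
       }

       // Per-type removal: unregisters only this manager's dispatcher.
       public void removeDispatcher(String type) {
           active.remove(type);
       }

       // Registry-wide clear: also unregisters dispatchers owned by other managers.
       public void clearActiveDispatchers() {
           active.clear();
       }

       public boolean isActive(String type) {
           return active.containsKey(type);
       }
   }
   ```
   With `removeDispatcher("hdfs")`, `isActive("solr")` stays true after the HDFS manager shuts down; with `clearActiveDispatchers()` it does not.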
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]