This is an automated email from the ASF dual-hosted git repository.

rmani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d8ff16  RANGER-3001:Update Ranger KafkaClient to use Kafka AdminClient API instead of Zookeeper
1d8ff16 is described below

commit 1d8ff1603674b967a4372e8963136945432e98f5
Author: Ramesh Mani <[email protected]>
AuthorDate: Mon Oct 12 09:17:34 2020 -0700

    RANGER-3001:Update Ranger KafkaClient to use Kafka AdminClient API instead of Zookeeper
---
 .../services/kafka/client/ServiceKafkaClient.java  | 119 +++++++++++++++------
 .../kafka/client/ServiceKafkaConnectionMgr.java    |  67 ++++++++++--
 2 files changed, 147 insertions(+), 39 deletions(-)

diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 91a7d27..1144081 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -19,25 +19,25 @@
 
 package org.apache.ranger.services.kafka.client;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.log4j.Logger;
 import org.apache.ranger.plugin.client.BaseClient;
 import org.apache.ranger.plugin.service.ResourceLookupContext;
 import org.apache.ranger.plugin.util.TimedEventUtil;
 
-import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
-import scala.Option;
-import scala.collection.Iterator;
-
 public class ServiceKafkaClient {
        private static final Logger LOG = 
Logger.getLogger(ServiceKafkaClient.class);
 
@@ -45,21 +45,32 @@ public class ServiceKafkaClient {
                TOPIC
        }
 
-       String serviceName = null;
-       String zookeeperConnect = null;
+       String serviceName;
+       Map<String,String > configs;
        private static final String errMessage = " You can still save the 
repository and start creating "
                        + "policies, but you would not be able to use 
autocomplete for "
                        + "resource names. Check server logs for more info.";
 
-       private static final String TOPIC_KEY = "topic";
-       private static final long LOOKUP_TIMEOUT_SEC = 5;
-
-       public ServiceKafkaClient(String serviceName, String zookeeperConnect) {
+       private static final String TOPIC_KEY                           = 
"topic";
+       private static final long   LOOKUP_TIMEOUT_SEC          = 5;
+       private static final String KEY_SASL_MECHANISM          = 
"sasl.mechanism";
+       private static final String KEY_SASL_JAAS_CONFIG        = 
"sasl.jaas.config";
+       private static final String KEY_KAFKA_KEYTAB            = 
"kafka.keytab";
+       private static final String KEY_KAFKA_PRINCIPAL         = 
"kafka.principal";
+       private static final String JAAS_KRB5_MODULE            = 
"com.sun.security.auth.module.Krb5LoginModule required";
+       private static final String JAAS_USE_KEYTAB                     = 
"useKeyTab=true";
+       private static final String JAAS_KEYTAB                         = 
"keyTab=\"";
+       private static final String JAAS_STOKE_KEY                      = 
"storeKey=true";
+       private static final String JAAS_SERVICE_NAME           = 
"serviceName=kafka";
+       private static final String JAAS_USER_TICKET_CACHE      = 
"useTicketCache=false";
+       private static final String JAAS_PRINCIPAL                      = 
"principal=\"";
+
+       public ServiceKafkaClient(String serviceName, Map<String,String> 
configs) {
                this.serviceName = serviceName;
-               this.zookeeperConnect = zookeeperConnect;
+               this.configs = configs;
        }
 
-       public Map<String, Object> connectionTest() throws Exception {
+       public Map<String, Object> connectionTest() {
                String errMsg = errMessage;
                Map<String, Object> responseData = new HashMap<String, 
Object>();
                try {
@@ -69,7 +80,7 @@ public class ServiceKafkaClient {
                        String successMsg = "ConnectionTest Successful";
                        BaseClient.generateResponseDataMap(true, successMsg,
                                        successMsg, null, null, responseData);
-               } catch (IOException e) {
+               } catch (Exception e) {
                        LOG.error("Error connecting to Kafka. kafkaClient=" + 
this, e);
                        String failureMsg = "Unable to connect to Kafka 
instance."
                                        + e.getMessage();
@@ -84,22 +95,41 @@ public class ServiceKafkaClient {
 
                int sessionTimeout = 5000;
                int connectionTimeout = 10000;
-               ZooKeeperClient zookeeperClient = new 
ZooKeeperClient(zookeeperConnect, sessionTimeout, connectionTimeout,
-                               1, Time.SYSTEM, "kafka.server", 
"SessionExpireListener", Option.empty());
-               try (KafkaZkClient kafkaZkClient = new 
KafkaZkClient(zookeeperClient, true, Time.SYSTEM)) {
-                       Iterator<String> iter = 
kafkaZkClient.getAllTopicsInCluster().iterator();
-                       while (iter.hasNext()) {
-                               String topic = iter.next();
-                               if (ignoreTopicList == null || 
!ignoreTopicList.contains(topic)) {
-                                       ret.add(topic);
+               AdminClient adminClient = null;
+
+               try {
+                       Properties props = new Properties();
+                       props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+                       props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, 
configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG));
+                       props.put(KEY_SASL_MECHANISM, 
configs.get(KEY_SASL_MECHANISM));
+                       props.put(KEY_SASL_JAAS_CONFIG, getJAASConfig(configs));
+                       props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 
getIntProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, sessionTimeout));
+                       
props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 
getIntProperty(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 
connectionTimeout));
+                       adminClient = KafkaAdminClient.create(props);
+                       ListTopicsResult listTopicsResult = 
adminClient.listTopics();
+                       if (listTopicsResult != null) {
+                               Collection<TopicListing> topicListings = 
listTopicsResult.listings().get();
+                               for (TopicListing topicListing : topicListings) 
{
+                                       String topicName = topicListing.name();
+                                       if (ignoreTopicList == null || 
!ignoreTopicList.contains(topicName)) {
+                                               ret.add(topicName);
+                                       }
                                }
                        }
+               } catch (Exception e) {
+                       throw e;
+               } finally {
+                       if (adminClient != null) {
+                               adminClient.close();
+                       }
                }
                return ret;
        }
 
+
+
        /**
-        * @param serviceName
+        * @param context
         * @param context
         * @return
         */
@@ -124,11 +154,11 @@ public class ServiceKafkaClient {
                                topicList = resourceMap.get(TOPIC_KEY);
                        }
                        switch (resource.trim().toLowerCase()) {
-                       case TOPIC_KEY:
-                               lookupResource = RESOURCE_TYPE.TOPIC;
-                               break;
-                       default:
-                               break;
+                               case TOPIC_KEY:
+                                       lookupResource = RESOURCE_TYPE.TOPIC;
+                                       break;
+                               default:
+                                       break;
                        }
                }
 
@@ -182,7 +212,34 @@ public class ServiceKafkaClient {
        @Override
        public String toString() {
                return "ServiceKafkaClient [serviceName=" + serviceName
-                               + ", zookeeperConnect=" + zookeeperConnect + 
"]";
+                               + ", configs=" + configs + "]";
+       }
+
+       private Integer getIntProperty(String key, int defaultValue) {
+               if (key == null) {
+                       return defaultValue;
+               }
+               String rtrnVal = configs.get(key);
+               if (rtrnVal == null) {
+                       return defaultValue;
+               }
+               return Integer.valueOf(rtrnVal);
+       }
+
+       private String getJAASConfig(Map<String,String> configs){
+               String jaasConfig =  new StringBuilder()
+                               .append(JAAS_KRB5_MODULE).append(" ")
+                               .append(JAAS_USE_KEYTAB).append(" ")
+                               
.append(JAAS_KEYTAB).append(configs.get(KEY_KAFKA_KEYTAB)).append("\"").append("
 ")
+                               .append(JAAS_STOKE_KEY).append(" ")
+                               .append(JAAS_USER_TICKET_CACHE).append(" ")
+                               .append(JAAS_SERVICE_NAME).append(" ")
+                               
.append(JAAS_PRINCIPAL).append(configs.get(KEY_KAFKA_PRINCIPAL)).append("\";")
+                               .toString();
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("KafkaClient JAAS: " + jaasConfig);
+               }
+               return jaasConfig;
        }
 
 }
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
index 9e0d6b4..60c55cc 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
@@ -19,20 +19,25 @@
 
 package org.apache.ranger.services.kafka.client;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import java.util.Map;
 
 public class ServiceKafkaConnectionMgr {
+       private static final String SEPARATOR                   = ",";
+       private static final String KEY_SASL_MECHANISM  = "sasl.mechanism";
+       private static final String KEY_KAFKA_KEYTAB    = "kafka.keytab";
+       private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal";
 
        static public ServiceKafkaClient getKafkaClient(String serviceName,
-                       Map<String, String> configs) throws Exception {
-               String zookeeperConnect = configs.get("zookeeper.connect");
-               if (zookeeperConnect != null) {
-                       ServiceKafkaClient serviceKafkaClient = new 
ServiceKafkaClient(
-                                       serviceName, zookeeperConnect);
-                       return serviceKafkaClient;
+                                                                               
                        Map<String, String> configs) throws Exception {
+               String error = getServiceConfigValidationErrors(configs);
+               if (StringUtils.isNotBlank(error)){
+                       error =  "JAAS configuration missing or not correct in 
Ranger Kafka Service..." + error;
+                       throw new Exception(error);
                }
-               throw new Exception("Required properties are not set for "
-                               + serviceName + ". URL or Zookeeper information 
not provided.");
+               ServiceKafkaClient serviceKafkaClient = new 
ServiceKafkaClient(serviceName, configs);
+               return serviceKafkaClient;
        }
 
        /**
@@ -47,4 +52,50 @@ public class ServiceKafkaConnectionMgr {
                return serviceKafkaClient.connectionTest();
        }
 
+       private static String  getServiceConfigValidationErrors(Map<String, 
String> configs) {
+               StringBuilder ret = new StringBuilder();
+
+               String bootstrap_servers = 
configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
+               String security_protocol = 
configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+               String sasl_mechanism = configs.get(KEY_SASL_MECHANISM);
+               String kafka_keytab = configs.get(KEY_KAFKA_KEYTAB);
+               String kafka_principal = configs.get(KEY_KAFKA_PRINCIPAL);
+
+               if (StringUtils.isEmpty(bootstrap_servers)) {
+                       ret.append(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
+               }
+
+               if (StringUtils.isEmpty(security_protocol)) {
+                       if (StringUtils.isNotBlank(ret.toString())) {
+                               
ret.append(SEPARATOR).append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+                       } else {
+                               
ret.append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+                       }
+               }
+
+               if (StringUtils.isEmpty(sasl_mechanism)) {
+                       if (StringUtils.isNotBlank(ret.toString())) {
+                               
ret.append(SEPARATOR).append(KEY_SASL_MECHANISM);
+                       } else {
+                               ret.append(KEY_SASL_MECHANISM);
+                       }
+               }
+
+               if (StringUtils.isEmpty(kafka_keytab)) {
+                       if (StringUtils.isNotBlank(ret.toString())) {
+                               ret.append(SEPARATOR).append(KEY_KAFKA_KEYTAB);
+                       } else {
+                               ret.append(KEY_KAFKA_KEYTAB);
+                       }
+               }
+
+               if (StringUtils.isEmpty(kafka_principal)) {
+                       if (StringUtils.isNotBlank(ret.toString())) {
+                               
ret.append(SEPARATOR).append(KEY_KAFKA_PRINCIPAL);
+                       } else {
+                               ret.append(KEY_KAFKA_PRINCIPAL);
+                       }
+               }
+               return ret.toString();
+       }
 }

Reply via email to