This is an automated email from the ASF dual-hosted git repository.

pradeep pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 2258ab0  RANGER-2702: Upgrade Kafka Version in Ranger to 2.4
2258ab0 is described below

commit 2258ab08caab0439712bf9bdafeeb9db57a23bfd
Author: Andras Katona <akat...@cloudera.com>
AuthorDate: Tue Jan 28 13:13:01 2020 +0530

    RANGER-2702: Upgrade Kafka Version in Ranger to 2.4
    
    Signed-off-by: Pradeep <prad...@apache.org>
---
 .../services/kafka/client/ServiceKafkaClient.java  | 44 +++++-----------------
 .../authorizer/KafkaRangerAuthorizerGSSTest.java   | 12 +-----
 .../KafkaRangerAuthorizerSASLSSLTest.java          | 12 +-----
 .../authorizer/KafkaRangerAuthorizerTest.java      | 12 +-----
 .../kafka/authorizer/KafkaTestUtils.java           | 19 +++++++++-
 pom.xml                                            |  2 +-
 6 files changed, 32 insertions(+), 69 deletions(-)

diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 6929257..91a7d27 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -27,16 +27,16 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import kafka.utils.ZkUtils;
-import kafka.utils.ZkUtils$;
-import org.I0Itec.zkclient.*;
+import org.apache.kafka.common.utils.Time;
 import org.apache.log4j.Logger;
 import org.apache.ranger.plugin.client.BaseClient;
 import org.apache.ranger.plugin.service.ResourceLookupContext;
 import org.apache.ranger.plugin.util.TimedEventUtil;
 
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
+import scala.Option;
 import scala.collection.Iterator;
-import scala.collection.Seq;
 
 public class ServiceKafkaClient {
        private static final Logger LOG = 
Logger.getLogger(ServiceKafkaClient.class);
@@ -82,42 +82,18 @@ public class ServiceKafkaClient {
        private List<String> getTopicList(List<String> ignoreTopicList) throws 
Exception {
                List<String> ret = new ArrayList<String>();
 
-               int          sessionTimeout    = 5000;
-        int          connectionTimeout = 10000;
-               ZkClient     zkClient          = null;
-               ZkConnection zkConnection      = null;
-
-               try {
-               zkClient     = 
ZkUtils$.MODULE$.createZkClient(zookeeperConnect, sessionTimeout, 
connectionTimeout);
-               zkConnection = new ZkConnection(zookeeperConnect, 
sessionTimeout);
-
-               ZkUtils      zkUtils           = new ZkUtils(zkClient, 
zkConnection, true);
-               Seq<String>  topicList         = 
zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath());
-
-                       Iterator<String> iter = topicList.iterator();
+               int sessionTimeout = 5000;
+               int connectionTimeout = 10000;
+               ZooKeeperClient zookeeperClient = new 
ZooKeeperClient(zookeeperConnect, sessionTimeout, connectionTimeout,
+                               1, Time.SYSTEM, "kafka.server", 
"SessionExpireListener", Option.empty());
+               try (KafkaZkClient kafkaZkClient = new 
KafkaZkClient(zookeeperClient, true, Time.SYSTEM)) {
+                       Iterator<String> iter = 
kafkaZkClient.getAllTopicsInCluster().iterator();
                        while (iter.hasNext()) {
                                String topic = iter.next();
                                if (ignoreTopicList == null || 
!ignoreTopicList.contains(topic)) {
                                        ret.add(topic);
                                }
                        }
-               } finally {
-                       try {
-                               if(zkClient != null) {
-                                       zkClient.close();
-                               }
-                       } catch (Exception ex) {
-                               LOG.error("Error closing zkClient", ex);
-                       }
-                       
-                       try {
-                               if(zkConnection != null) {
-                                       zkConnection.close();
-                               }
-                               
-                       } catch(Exception ex) {
-                               LOG.error("Error closing zkConnection", ex);
-                       }
                }
                return ret;
        }
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index 43e88b5..0d3665d 100644
--- 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -30,8 +30,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -50,12 +48,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
 
 /**
  * A simple test that starts a Kafka broker, creates "test" and "dev" topics,
@@ -149,11 +143,7 @@ public class KafkaRangerAuthorizerGSSTest {
         kafkaServer.startup();
 
         // Create some topics
-        ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 
30000, ZKStringSerializer$.MODULE$);
-
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new 
ZkConnection(zkServer.getConnectString()), false);
-        AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), 
RackAwareMode.Enforced$.MODULE$);
-        AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), 
RackAwareMode.Enforced$.MODULE$);
+        KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
     }
 
     private static void configureKerby(String baseDir) throws Exception {
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
index 88a3e02..4bcf078 100644
--- 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
@@ -27,8 +27,6 @@ import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -44,12 +42,8 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Assert;
 import org.junit.Test;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
 
 /**
  * A simple test that starts a Kafka broker, creates "test" and "dev" topics, 
sends a message to them and consumes it. We also plug in a 
@@ -148,11 +142,7 @@ public class KafkaRangerAuthorizerSASLSSLTest {
         kafkaServer.startup();
 
         // Create some topics
-        ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 
30000, ZKStringSerializer$.MODULE$);
-
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new 
ZkConnection(zkServer.getConnectString()), false);
-        AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), 
RackAwareMode.Enforced$.MODULE$);
-        AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), 
RackAwareMode.Enforced$.MODULE$);
+        KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
     }
     
     @org.junit.AfterClass
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
index 8d2f0a4..a042fc7 100644
--- 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
@@ -29,8 +29,6 @@ import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -46,12 +44,8 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Assert;
 import org.junit.Test;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
 
 /**
  * A simple test that starts a Kafka broker, creates "test" and "dev" topics, 
sends a message to them and consumes it. We also plug in a 
@@ -143,11 +137,7 @@ public class KafkaRangerAuthorizerTest {
         kafkaServer.startup();
 
         // Create some topics
-        ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 
30000, ZKStringSerializer$.MODULE$);
-
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new 
ZkConnection(zkServer.getConnectString()), false);
-        AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), 
RackAwareMode.Enforced$.MODULE$);
-        AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), 
RackAwareMode.Enforced$.MODULE$);
+        KafkaTestUtils.createSomeTopics(zkServer.getConnectString());
     }
     
     @org.junit.AfterClass
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
index c71ddd3..b703f95 100644
--- 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
@@ -28,7 +28,9 @@ import java.security.SecureRandom;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.Date;
+import java.util.Properties;
 
+import org.apache.kafka.common.utils.Time;
 import org.bouncycastle.asn1.x500.X500Name;
 import org.bouncycastle.asn1.x500.style.RFC4519Style;
 import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
@@ -37,6 +39,12 @@ import 
org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
 import org.bouncycastle.operator.ContentSigner;
 import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
 
+import kafka.admin.RackAwareMode;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
+import scala.Option;
+
 public final class KafkaTestUtils {
     
     public static String createAndStoreKey(String subjectName, String 
issuerName, BigInteger serial, String keystorePassword,
@@ -73,5 +81,14 @@ public final class KafkaTestUtils {
        return keystoreFile.getPath();
        
     }
-    
+
+       static void createSomeTopics(String zkConnectString) {
+               ZooKeeperClient zookeeperClient = new 
ZooKeeperClient(zkConnectString, 30000, 30000,
+                               1, Time.SYSTEM, "kafka.server", 
"SessionExpireListener", Option.empty());
+               try (KafkaZkClient kafkaZkClient = new 
KafkaZkClient(zookeeperClient, false, Time.SYSTEM)) {
+                       AdminZkClient adminZkClient = new 
AdminZkClient(kafkaZkClient);
+                       adminZkClient.createTopic("test", 1, 1, new 
Properties(), RackAwareMode.Enforced$.MODULE$);
+                       adminZkClient.createTopic("dev", 1, 1, new 
Properties(), RackAwareMode.Enforced$.MODULE$);
+               }
+       }
 }
diff --git a/pom.xml b/pom.xml
index e3c5ce3..f4cc712 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,7 +150,7 @@
         <jsr250.version>1.0</jsr250.version>
         <jsr305.version>1.3.9</jsr305.version>
         <junit.version>4.12</junit.version>
-        <kafka.version>2.0.0</kafka.version>
+        <kafka.version>2.4.0</kafka.version>
         <kerby.version>1.0.0</kerby.version>
         <knox.gateway.version>1.2.0</knox.gateway.version>
         <kylin.version>2.6.4</kylin.version>

Reply via email to