This is an automated email from the ASF dual-hosted git repository. pradeep pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push: new 2258ab0 RANGER-2702: Upgrade Kafka Version in Ranger to 2.4 2258ab0 is described below commit 2258ab08caab0439712bf9bdafeeb9db57a23bfd Author: Andras Katona <akat...@cloudera.com> AuthorDate: Tue Jan 28 13:13:01 2020 +0530 RANGER-2702: Upgrade Kafka Version in Ranger to 2.4 Signed-off-by: Pradeep <prad...@apache.org> --- .../services/kafka/client/ServiceKafkaClient.java | 44 +++++----------------- .../authorizer/KafkaRangerAuthorizerGSSTest.java | 12 +----- .../KafkaRangerAuthorizerSASLSSLTest.java | 12 +----- .../authorizer/KafkaRangerAuthorizerTest.java | 12 +----- .../kafka/authorizer/KafkaTestUtils.java | 19 +++++++++- pom.xml | 2 +- 6 files changed, 32 insertions(+), 69 deletions(-) diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java index 6929257..91a7d27 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java @@ -27,16 +27,16 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import kafka.utils.ZkUtils; -import kafka.utils.ZkUtils$; -import org.I0Itec.zkclient.*; +import org.apache.kafka.common.utils.Time; import org.apache.log4j.Logger; import org.apache.ranger.plugin.client.BaseClient; import org.apache.ranger.plugin.service.ResourceLookupContext; import org.apache.ranger.plugin.util.TimedEventUtil; +import kafka.zk.KafkaZkClient; +import kafka.zookeeper.ZooKeeperClient; +import scala.Option; import scala.collection.Iterator; -import scala.collection.Seq; public class ServiceKafkaClient { private static final Logger LOG = Logger.getLogger(ServiceKafkaClient.class); @@ -82,42 +82,18 @@ public class ServiceKafkaClient { private List<String> getTopicList(List<String> ignoreTopicList) throws Exception { List<String> ret = new ArrayList<String>(); - int sessionTimeout = 5000; - int connectionTimeout = 10000; - ZkClient zkClient = null; - ZkConnection zkConnection = null; - - try { - zkClient = ZkUtils$.MODULE$.createZkClient(zookeeperConnect, sessionTimeout, connectionTimeout); - zkConnection = new ZkConnection(zookeeperConnect, sessionTimeout); - - ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, true); - Seq<String> topicList = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath()); - - Iterator<String> iter = topicList.iterator(); + int sessionTimeout = 5000; + int connectionTimeout = 10000; + ZooKeeperClient zookeeperClient = new ZooKeeperClient(zookeeperConnect, sessionTimeout, connectionTimeout, + 1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty()); + try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, true, Time.SYSTEM)) { + Iterator<String> iter = kafkaZkClient.getAllTopicsInCluster().iterator(); while (iter.hasNext()) { String topic = iter.next(); if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) { ret.add(topic); } } - } finally { - try { - if(zkClient != null) { - zkClient.close(); - } - } catch (Exception ex) { - LOG.error("Error closing zkClient", ex); - } - - try { - if(zkConnection != null) { - zkConnection.close(); - } - - } catch(Exception ex) { - LOG.error("Error closing zkConnection", ex); - } } return ret; } diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java index 43e88b5..0d3665d 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java @@ -30,8 +30,6 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingServer; import org.apache.hadoop.security.UserGroupInformation; @@ -50,12 +48,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; /** * A simple test that starts a Kafka broker, creates "test" and "dev" topics, @@ -149,11 +143,7 @@ public class KafkaRangerAuthorizerGSSTest { kafkaServer.startup(); // Create some topics - ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$); - - final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false); - AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); - AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); + KafkaTestUtils.createSomeTopics(zkServer.getConnectString()); } private static void configureKerby(String baseDir) throws Exception { diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java index 88a3e02..4bcf078 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java @@ -27,8 +27,6 @@ import java.util.Arrays; import java.util.Properties; import java.util.concurrent.Future; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.apache.hadoop.security.UserGroupInformation; @@ -44,12 +42,8 @@ import org.apache.kafka.common.config.SslConfigs; import org.junit.Assert; import org.junit.Test; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; /** * A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a @@ -148,11 +142,7 @@ public class KafkaRangerAuthorizerSASLSSLTest { kafkaServer.startup(); // Create some topics - ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$); - - final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false); - AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); - AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); + KafkaTestUtils.createSomeTopics(zkServer.getConnectString()); } @org.junit.AfterClass diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java index 8d2f0a4..a042fc7 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java @@ -29,8 +29,6 @@ import java.util.Arrays; import java.util.Properties; import java.util.concurrent.Future; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.apache.hadoop.security.UserGroupInformation; @@ -46,12 +44,8 @@ import org.apache.kafka.common.config.SslConfigs; import org.junit.Assert; import org.junit.Test; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; /** * A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a @@ -143,11 +137,7 @@ public class KafkaRangerAuthorizerTest { kafkaServer.startup(); // Create some topics - ZkClient zkClient = new ZkClient(zkServer.getConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$); - - final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServer.getConnectString()), false); - AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); - AdminUtils.createTopic(zkUtils, "dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); + KafkaTestUtils.createSomeTopics(zkServer.getConnectString()); } @org.junit.AfterClass diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java index c71ddd3..b703f95 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java @@ -28,7 +28,9 @@ import java.security.SecureRandom; import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.util.Date; +import java.util.Properties; +import org.apache.kafka.common.utils.Time; import org.bouncycastle.asn1.x500.X500Name; import org.bouncycastle.asn1.x500.style.RFC4519Style; import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; @@ -37,6 +39,12 @@ import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; import org.bouncycastle.operator.ContentSigner; import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; +import kafka.admin.RackAwareMode; +import kafka.zk.AdminZkClient; +import kafka.zk.KafkaZkClient; +import kafka.zookeeper.ZooKeeperClient; +import scala.Option; + public final class KafkaTestUtils { public static String createAndStoreKey(String subjectName, String issuerName, BigInteger serial, String keystorePassword, @@ -73,5 +81,14 @@ public final class KafkaTestUtils { return keystoreFile.getPath(); } - + + static void createSomeTopics(String zkConnectString) { + ZooKeeperClient zookeeperClient = new ZooKeeperClient(zkConnectString, 30000, 30000, + 1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty()); + try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, false, Time.SYSTEM)) { + AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + adminZkClient.createTopic("test", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); + adminZkClient.createTopic("dev", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); + } + } } diff --git a/pom.xml b/pom.xml index e3c5ce3..f4cc712 100644 --- a/pom.xml +++ b/pom.xml @@ -150,7 +150,7 @@ <jsr250.version>1.0</jsr250.version> <jsr305.version>1.3.9</jsr305.version> <junit.version>4.12</junit.version> - <kafka.version>2.0.0</kafka.version> + <kafka.version>2.4.0</kafka.version> <kerby.version>1.0.0</kerby.version> <knox.gateway.version>1.2.0</knox.gateway.version> <kylin.version>2.6.4</kylin.version>