upgrade Kafka to 0.10.1.0

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cf5d4948
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cf5d4948
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cf5d4948

Branch: refs/heads/KYLIN-2131
Commit: cf5d4948d20a37cccf02bfd23fe6f38e4448e807
Parents: b8a6118
Author: Billy Liu <billy...@apache.org>
Authored: Sat Dec 17 10:08:37 2016 +0800
Committer: Billy Liu <billy...@apache.org>
Committed: Sun Dec 18 14:15:13 2016 +0800

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 .../kafka/config/KafkaConsumerProperties.java   | 23 +++++++++++++++-----
 .../config/KafkaConsumerPropertiesTest.java     |  5 -----
 3 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51479c8..c837b10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
 
         <!-- HBase versions -->
         <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
-        <kafka.version>0.10.0.0</kafka.version>
+        <kafka.version>0.10.1.0</kafka.version>
 
         <!-- Hadoop deps, keep compatible with hadoop2.version -->
         <zookeeper.version>3.4.6</zookeeper.version>

http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 29589d5..5bc1a82 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -21,13 +21,17 @@ package org.apache.kylin.source.kafka.config;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigCannotInitException;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -72,16 +76,25 @@ public class KafkaConsumerProperties {
     }
 
     public static Properties getProperties(Configuration configuration) {
+        Set<String> configNames = new HashSet<String>();
+        try {
+            configNames = ConsumerConfig.configNames();
+        } catch (Exception e) {
+            // the Kafka configNames api is supported on 0.10.1.0+, in case 
NoSuchMethodException
+            String[] configNamesArray = ("metric.reporters, 
metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + 
"sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, 
bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, 
interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " 
client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, 
request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, 
receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, 
ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, 
send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, 
retry.backoff.ms,"
+                    + " ssl.secure.random.implementation, 
sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, 
sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, 
ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, 
connections.max.idle.ms, session.timeout.ms, metrics.num.samples, 
key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, 
ssl.keystore.location, ssl.cipher.suites, security.protocol, 
ssl.keymanager.algorithm, metrics.sample.window.ms, 
auto.offset.reset").split(",");
+            configNames.addAll(Arrays.asList(configNamesArray));
+        }
+
         Properties result = new Properties();
         for (Iterator<Map.Entry<String, String>> it = 
configuration.iterator(); it.hasNext();) {
             Map.Entry<String, String> entry = it.next();
             String key = entry.getKey();
             String value = entry.getValue();
-            result.put(key, value);
+            if (configNames.contains(key)) {
+                result.put(key, value);
+            }
         }
-        // TODO: Not filter non-kafka properties, no issue, but some annoying 
logs
-        // Tried to leverage Kafka API to find non used properties, but the 
API is
-        // not open to public
         return result;
     }
 
@@ -115,7 +128,7 @@ public class KafkaConsumerProperties {
         return properties;
     }
 
-    public String getKafkaConsumerHadoopJobConf(){
+    public String getKafkaConsumerHadoopJobConf() {
         File kafkaConsumerFile = getKafkaConsumerFile();
         return 
OptionsHelper.convertToFileURL(kafkaConsumerFile.getAbsolutePath());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
index 378ec73..8edb84d 100644
--- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java
@@ -7,16 +7,11 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.util.Arrays;
 import java.util.Properties;
 
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Before;

Reply via email to