[08/11] kylin git commit: upgrade Kafka to 0.10.1.0

2016-12-17 Thread billyliu
upgrade Kafka to 0.10.1.0


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cf5d4948
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cf5d4948
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cf5d4948

Branch: refs/heads/KYLIN-2131
Commit: cf5d4948d20a37cccf02bfd23fe6f38e4448e807
Parents: b8a6118
Author: Billy Liu 
Authored: Sat Dec 17 10:08:37 2016 +0800
Committer: Billy Liu 
Committed: Sun Dec 18 14:15:13 2016 +0800

--
 pom.xml |  2 +-
 .../kafka/config/KafkaConsumerProperties.java   | 23 +++-
 .../config/KafkaConsumerPropertiesTest.java |  5 -
 3 files changed, 19 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 51479c8..c837b10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
 
 
 0.98.8-hadoop2
-0.10.0.0
+0.10.1.0
 
 
 3.4.6

http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
--
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 29589d5..5bc1a82 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -21,13 +21,17 @@ package org.apache.kylin.source.kafka.config;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigCannotInitException;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -72,16 +76,25 @@ public class KafkaConsumerProperties {
 }
 
 public static Properties getProperties(Configuration configuration) {
+Set configNames = new HashSet();
+try {
+configNames = ConsumerConfig.configNames();
+} catch (Exception e) {
+// the Kafka configNames api is supported on 0.10.1.0+, in case 
NoSuchMethodException
+String[] configNamesArray = ("metric.reporters, 
metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + 
"sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, 
bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, 
interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " 
client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, 
request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, 
receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, 
ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, 
send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, 
retry.backoff.ms,"
++ " ssl.secure.random.implementation, 
sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, 
sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, 
ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, 
connections.max.idle.ms, session.timeout.ms, metrics.num.samples, 
key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, 
ssl.keystore.location, ssl.cipher.suites, security.protocol, 
ssl.keymanager.algorithm, metrics.sample.window.ms, 
auto.offset.reset").split(",");
+configNames.addAll(Arrays.asList(configNamesArray));
+}
+
 Properties result = new Properties();
 for (Iterator> it = 
configuration.iterator(); it.hasNext();) {
 Map.Entry entry = it.next();
 String key = entry.getKey();
 String value = entry.getValue();
-result.put(key, value);
+if (configNames.contains(key)) {
+result.put(key, value);
+}
 }
-// TODO: Not filter non-kafka properties, no issue, but some annoying 
logs
-// Tried to leverage Kafka API to find non used properties, but the 
API is
-// not open to public

kylin git commit: upgrade Kafka to 0.10.1.0

2016-12-16 Thread billyliu
Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2131 44e959992 -> a0c734d44


upgrade Kafka to 0.10.1.0


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a0c734d4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a0c734d4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a0c734d4

Branch: refs/heads/KYLIN-2131
Commit: a0c734d44583bb15d8ddfb18d9110b38230ae55c
Parents: 44e9599
Author: Billy Liu 
Authored: Sat Dec 17 10:08:37 2016 +0800
Committer: Billy Liu 
Committed: Sat Dec 17 10:08:37 2016 +0800

--
 pom.xml |  2 +-
 .../kafka/config/KafkaConsumerProperties.java   | 23 +++-
 .../config/KafkaConsumerPropertiesTest.java |  5 -
 3 files changed, 19 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kylin/blob/a0c734d4/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 51479c8..c837b10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
 
 
 0.98.8-hadoop2
-0.10.0.0
+0.10.1.0
 
 
 3.4.6

http://git-wip-us.apache.org/repos/asf/kylin/blob/a0c734d4/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
--
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
index 29589d5..5bc1a82 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java
@@ -21,13 +21,17 @@ package org.apache.kylin.source.kafka.config;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigCannotInitException;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -72,16 +76,25 @@ public class KafkaConsumerProperties {
 }
 
 public static Properties getProperties(Configuration configuration) {
+Set configNames = new HashSet();
+try {
+configNames = ConsumerConfig.configNames();
+} catch (Exception e) {
+// the Kafka configNames api is supported on 0.10.1.0+, in case 
NoSuchMethodException
+String[] configNamesArray = ("metric.reporters, 
metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + 
"sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, 
bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, 
interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " 
client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, 
request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, 
receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, 
ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, 
send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, 
retry.backoff.ms,"
++ " ssl.secure.random.implementation, 
sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, 
sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, 
ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, 
connections.max.idle.ms, session.timeout.ms, metrics.num.samples, 
key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, 
ssl.keystore.location, ssl.cipher.suites, security.protocol, 
ssl.keymanager.algorithm, metrics.sample.window.ms, 
auto.offset.reset").split(",");
+configNames.addAll(Arrays.asList(configNamesArray));
+}
+
 Properties result = new Properties();
 for (Iterator> it = 
configuration.iterator(); it.hasNext();) {
 Map.Entry entry = it.next();
 String key = entry.getKey();
 String value = entry.getValue();
-result.put(key, value);
+if (configNames.contains(key)) {
+result.put(key, value);
+}
 }
-// TODO: Not filter non-kafka properties, no issue, but some annoying 
logs
-// Tried to leverage