[08/11] kylin git commit: upgrade Kafka to 0.10.1.0
upgrade Kafka to 0.10.1.0 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cf5d4948 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cf5d4948 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cf5d4948 Branch: refs/heads/KYLIN-2131 Commit: cf5d4948d20a37cccf02bfd23fe6f38e4448e807 Parents: b8a6118 Author: Billy Liu Authored: Sat Dec 17 10:08:37 2016 +0800 Committer: Billy Liu Committed: Sun Dec 18 14:15:13 2016 +0800 -- pom.xml | 2 +- .../kafka/config/KafkaConsumerProperties.java | 23 +++- .../config/KafkaConsumerPropertiesTest.java | 5 - 3 files changed, 19 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/pom.xml -- diff --git a/pom.xml b/pom.xml index 51479c8..c837b10 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ 0.98.8-hadoop2 -0.10.0.0 +0.10.1.0 3.4.6 http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java index 29589d5..5bc1a82 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java @@ -21,13 +21,17 @@ package org.apache.kylin.source.kafka.config; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.kafka.clients.consumer.ConsumerConfig; import 
org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigCannotInitException; import org.apache.kylin.common.util.OptionsHelper; @@ -72,16 +76,25 @@ public class KafkaConsumerProperties { } public static Properties getProperties(Configuration configuration) { +Set configNames = new HashSet(); +try { +configNames = ConsumerConfig.configNames(); +} catch (Exception e) { +// the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException +String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms," ++ " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(","); +configNames.addAll(Arrays.asList(configNamesArray)); +} + Properties result = new Properties(); for (Iterator > it = configuration.iterator(); it.hasNext();) { Map.Entry entry = it.next(); String key = entry.getKey(); String value = entry.getValue(); -result.put(key, value); +if 
(configNames.contains(key)) { +result.put(key, value); +} } -// TODO: Not filter non-kafka properties, no issue, but some annoying logs -// Tried to leverage Kafka API to find non used properties, but the API is -// not open to public
kylin git commit: upgrade Kafka to 0.10.1.0
Repository: kylin Updated Branches: refs/heads/KYLIN-2131 44e959992 -> a0c734d44 upgrade Kafka to 0.10.1.0 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a0c734d4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a0c734d4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a0c734d4 Branch: refs/heads/KYLIN-2131 Commit: a0c734d44583bb15d8ddfb18d9110b38230ae55c Parents: 44e9599 Author: Billy Liu Authored: Sat Dec 17 10:08:37 2016 +0800 Committer: Billy Liu Committed: Sat Dec 17 10:08:37 2016 +0800 -- pom.xml | 2 +- .../kafka/config/KafkaConsumerProperties.java | 23 +++- .../config/KafkaConsumerPropertiesTest.java | 5 - 3 files changed, 19 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/a0c734d4/pom.xml -- diff --git a/pom.xml b/pom.xml index 51479c8..c837b10 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ 0.98.8-hadoop2 -0.10.0.0 +0.10.1.0 3.4.6 http://git-wip-us.apache.org/repos/asf/kylin/blob/a0c734d4/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java index 29589d5..5bc1a82 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java @@ -21,13 +21,17 @@ package org.apache.kylin.source.kafka.config; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import 
org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigCannotInitException; import org.apache.kylin.common.util.OptionsHelper; @@ -72,16 +76,25 @@ public class KafkaConsumerProperties { } public static Properties getProperties(Configuration configuration) { +Set configNames = new HashSet(); +try { +configNames = ConsumerConfig.configNames(); +} catch (Exception e) { +// the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException +String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms," ++ " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(","); +configNames.addAll(Arrays.asList(configNamesArray)); +} + Properties result = new Properties(); for (Iterator > it = configuration.iterator(); it.hasNext();) { Map.Entry entry = it.next(); String key = entry.getKey(); String 
value = entry.getValue(); -result.put(key, value); +if (configNames.contains(key)) { +result.put(key, value); +} } -// TODO: Not filter non-kafka properties, no issue, but some annoying logs -// Tried to leverage