upgrade Kafka to 0.10.1.0
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cf5d4948 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cf5d4948 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cf5d4948 Branch: refs/heads/KYLIN-2131 Commit: cf5d4948d20a37cccf02bfd23fe6f38e4448e807 Parents: b8a6118 Author: Billy Liu <billy...@apache.org> Authored: Sat Dec 17 10:08:37 2016 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Sun Dec 18 14:15:13 2016 +0800 ---------------------------------------------------------------------- pom.xml | 2 +- .../kafka/config/KafkaConsumerProperties.java | 23 +++++++++++++++----- .../config/KafkaConsumerPropertiesTest.java | 5 ----- 3 files changed, 19 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 51479c8..c837b10 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ <!-- HBase versions --> <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version> - <kafka.version>0.10.0.0</kafka.version> + <kafka.version>0.10.1.0</kafka.version> <!-- Hadoop deps, keep compatible with hadoop2.version --> <zookeeper.version>3.4.6</zookeeper.version> http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java index 29589d5..5bc1a82 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConsumerProperties.java @@ -21,13 +21,17 @@ package org.apache.kylin.source.kafka.config; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigCannotInitException; import org.apache.kylin.common.util.OptionsHelper; @@ -72,16 +76,25 @@ public class KafkaConsumerProperties { } public static Properties getProperties(Configuration configuration) { + Set<String> configNames = new HashSet<String>(); + try { + configNames = ConsumerConfig.configNames(); + } catch (Exception e) { + // the Kafka configNames api is supported on 0.10.1.0+, in case NoSuchMethodException + String[] configNamesArray = ("metric.reporters, metadata.max.age.ms, partition.assignment.strategy, reconnect.backoff.ms," + "sasl.kerberos.ticket.renew.window.factor, max.partition.fetch.bytes, bootstrap.servers, ssl.keystore.type," + " enable.auto.commit, sasl.mechanism, interceptor.classes, exclude.internal.topics, ssl.truststore.password," + " client.id, ssl.endpoint.identification.algorithm, max.poll.records, check.crcs, request.timeout.ms, heartbeat.interval.ms," + " auto.commit.interval.ms, receive.buffer.bytes, ssl.truststore.type, ssl.truststore.location, ssl.keystore.password, fetch.min.bytes," + " fetch.max.bytes, send.buffer.bytes, max.poll.interval.ms, value.deserializer, group.id, retry.backoff.ms," + + " ssl.secure.random.implementation, sasl.kerberos.kinit.cmd, sasl.kerberos.service.name, sasl.kerberos.ticket.renew.jitter, ssl.trustmanager.algorithm, ssl.key.password, fetch.max.wait.ms, sasl.kerberos.min.time.before.relogin, connections.max.idle.ms, session.timeout.ms, metrics.num.samples, key.deserializer, ssl.protocol, ssl.provider, ssl.enabled.protocols, ssl.keystore.location, ssl.cipher.suites, security.protocol, ssl.keymanager.algorithm, metrics.sample.window.ms, auto.offset.reset").split(","); + configNames.addAll(Arrays.asList(configNamesArray)); + } + Properties result = new Properties(); for (Iterator<Map.Entry<String, String>> it = configuration.iterator(); it.hasNext();) { Map.Entry<String, String> entry = it.next(); String key = entry.getKey(); String value = entry.getValue(); - result.put(key, value); + if (configNames.contains(key)) { + result.put(key, value); + } } - // TODO: Not filter non-kafka properties, no issue, but some annoying logs - // Tried to leverage Kafka API to find non used properties, but the API is - // not open to public return result; } @@ -115,7 +128,7 @@ public class KafkaConsumerProperties { return properties; } - public String getKafkaConsumerHadoopJobConf(){ + public String getKafkaConsumerHadoopJobConf() { File kafkaConsumerFile = getKafkaConsumerFile(); return OptionsHelper.convertToFileURL(kafkaConsumerFile.getAbsolutePath()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/cf5d4948/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java index 378ec73..8edb84d 100644 --- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/config/KafkaConsumerPropertiesTest.java @@ -7,16 +7,11 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URL; -import java.util.Arrays; import java.util.Properties; import javax.xml.parsers.ParserConfigurationException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.junit.After; import org.junit.Before;