Added Test for KafkaConsumerConfig
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/32c92828 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/32c92828 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/32c92828 Branch: refs/heads/NewKafkaSystemConsumer Commit: 32c92828eaff98f4c2e6691533ece9f502ef1f98 Parents: 2480aa3 Author: Boris S <[email protected]> Authored: Wed Sep 12 14:06:41 2018 -0700 Committer: Boris S <[email protected]> Committed: Wed Sep 12 14:06:41 2018 -0700 ---------------------------------------------------------------------- .../clients/consumer/KafkaConsumerConfig.java | 23 ++-- .../org/apache/samza/config/KafkaConfig.scala | 5 +- .../samza/system/kafka/KafkaConsumerProxy.java | 14 ++- .../consumer/TestKafkaConsumerConfig.java | 121 +++++++++++++++++++ 4 files changed, 149 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java index 1a97ec7..8ada1b4 100644 --- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java @@ -54,7 +54,7 @@ public class KafkaConsumerConfig extends ConsumerConfig { * By default, KafkaConsumer will fetch ALL available messages for all the partitions. * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). 
*/ - private static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100"; + static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100"; private KafkaConsumerConfig(Properties props) { super(props); @@ -83,6 +83,11 @@ public class KafkaConsumerConfig extends ConsumerConfig { //Kafka client configuration + // put overrides + consumerProps.putAll(injectProps); + + // These are values we enforce in Samza, and they cannot be overwritten. + + // Disable consumer auto-commit because Samza controls commits consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); @@ -106,28 +111,24 @@ public class KafkaConsumerConfig extends ConsumerConfig { // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should // default to byte[] - if (!config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { + if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { LOG.info("setting default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); } - if (!config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { + if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { LOG.info("setting default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); } - // NOT SURE THIS IS NEEDED TODO - final String maxPollRecords = - subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); - consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); - - // put overrides - consumerProps.putAll(injectProps); + // Override default max poll config if there is no value + consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + (k) -> 
DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); return new KafkaConsumerConfig(consumerProps); } // group id should be unique per job - private static String getConsumerGroupId(Config config) { + static String getConsumerGroupId(Config config) { JobConfig jobConfig = new JobConfig(config); Option<String> jobIdOption = jobConfig.getJobId(); Option<String> jobNameOption = jobConfig.getName(); http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 26664ea..ef43e72 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -289,7 +289,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { properties } - // kafka config + /** + * @deprecated Use KafkaConsumerConfig + */ + @Deprecated def getKafkaSystemConsumerConfig( systemName: String, clientId: String, groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString, http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 4b99fcc..83e7a58 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -431,12 +431,22 @@ public class KafkaConsumerProxy<K, V> { return failureCause; } - public void stop(long timeout) { + /** + * stop the 
thread and wait for it to stop + @param timeoutMs how long to wait in join + */ + public void stop(long timeoutMs) { LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName()); isRunning = false; try { - consumerPollThread.join(timeout); + consumerPollThread.join(timeoutMs); + // join returns even if the thread didn't finish + // in this case we should interrupt it and wait again + if (consumerPollThread.isAlive()) { + consumerPollThread.interrupt(); + consumerPollThread.join(timeoutMs); + } } catch (InterruptedException e) { LOG.warn("Join in KafkaConsumerProxy has failed", e); consumerPollThread.interrupt(); http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java new file mode 100644 index 0000000..ee300d0 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java @@ -0,0 +1,121 @@ +package org.apache.kafka.clients.consumer; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestKafkaConsumerConfig { + private final Map<String, String> props = new HashMap<>(); + public final static String SYSTEM_NAME = "testSystem"; + public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."; + public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." 
+ SYSTEM_NAME + ".consumer."; + private final static String CLIENT_ID = "clientId"; + + @Before + public void setProps() { + + } + + @Test + public void testDefaultsAndOverrides() { + + Map<String, String> overrides = new HashMap<>(); + overrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored + overrides.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored + overrides.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored + + // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored + props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092"); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092"); + + // should be overridden + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "true"); //ignore + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); // ignore + + + // should be overridden + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "200"); + + Config config = new MapConfig(props); + KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( + config, SYSTEM_NAME, CLIENT_ID, overrides); + + Assert.assertEquals(kafkaConsumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), false); + + Assert.assertEquals( + kafkaConsumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), + Integer.valueOf(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS)); + + Assert.assertEquals( + kafkaConsumerConfig.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).get(0), + RangeAssignor.class.getName()); + + Assert.assertEquals( + kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0), + "useThis:9092"); + Assert.assertEquals( + kafkaConsumerConfig.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG).longValue(), + 100); + + Assert.assertEquals( + kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), + 
ByteArrayDeserializer.class); + + Assert.assertEquals( + kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), + ByteArrayDeserializer.class); + + Assert.assertEquals( + kafkaConsumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG), + CLIENT_ID); + + Assert.assertEquals( + kafkaConsumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG), + KafkaConsumerConfig.getConsumerGroupId(config)); + } + + @Test + // test stuff that should not be overridden + public void testNotOverride() { + + // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used + props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092"); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); + + + Config config = new MapConfig(props); + KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( + config, SYSTEM_NAME, CLIENT_ID, Collections.emptyMap()); + + Assert.assertEquals( + kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0), + "useThis:9092"); + + Assert.assertEquals( + kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), + TestKafkaConsumerConfig.class); + + Assert.assertEquals( + kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), + TestKafkaConsumerConfig.class); + } + + + + @Test(expected = SamzaException.class) + public void testNoBootstrapServers() { + KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( + new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId", Collections.emptyMap()); + + Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + } +}
