Repository: kafka Updated Branches: refs/heads/trunk 3bb38d37b -> bb629f224
KAFKA-3929: Add prefix for underlying clients configs in StreamConfig Add prefixes for consumer and producer configs to StreamsConfig, but be backward compatible. Author: Damian Guy <[email protected]> Reviewers: Eno Thereska, Guozhang Wang Closes #1649 from dguy/kafka-3929 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb629f22 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb629f22 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb629f22 Branch: refs/heads/trunk Commit: bb629f2243c4462db2a863793c190d734f11f3c6 Parents: 3bb38d3 Author: Damian Guy <[email protected]> Authored: Tue Aug 2 14:14:52 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Aug 2 14:14:52 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/StreamsConfig.java | 83 ++++++++++++++++++-- .../apache/kafka/streams/StreamsConfigTest.java | 64 +++++++++++++++ 2 files changed, 139 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bb629f22/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index a68de4f..b624e0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -47,6 +47,12 @@ public class StreamsConfig extends AbstractConfig { private static final ConfigDef CONFIG; + // Prefix used to isolate consumer configs from producer configs. + public static final String CONSUMER_PREFIX = "consumer."; + + // Prefix used to isolate producer configs from consumer configs. + public static final String PRODUCER_PREFIX = "producer."; + /** <code>state.dir</code> */ public static final String STATE_DIR_CONFIG = "state.dir"; private static final String STATE_DIR_DOC = "Directory location for state store."; @@ -122,6 +128,7 @@ public class StreamsConfig extends AbstractConfig { public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface"; + static { CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value Type.STRING, @@ -251,23 +258,55 @@ public class StreamsConfig extends AbstractConfig { public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__"; } + /** + * Prefix a property with {@link StreamsConfig#CONSUMER_PREFIX}. This is used to isolate consumer configs + * from producer configs + * @param consumerProp + * @return CONSUMER_PREFIX + consumerProp + */ + public static String consumerPrefix(final String consumerProp) { + return CONSUMER_PREFIX + consumerProp; + } + + /** + * Prefix a property with {@link StreamsConfig#PRODUCER_PREFIX}. This is used to isolate producer configs + * from consumer configs + * @param producerProp + * @return PRODUCER_PREFIX + consumerProp + */ + public static String producerPrefix(final String producerProp) { + return PRODUCER_PREFIX + producerProp; + } + public StreamsConfig(Map<?, ?> props) { super(CONFIG, props); } + /** + * Get the configs specific to the Consumer. Properties using the prefix {@link StreamsConfig#CONSUMER_PREFIX} + * will be used in favor over their non-prefixed versions except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} + * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster + * @param streamThread the {@link StreamThread} creating a consumer + * @param groupId consumer groupId + * @param clientId clientId + * @return Map of the Consumer configuration. + * @throws ConfigException + */ public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException { - Map<String, Object> originals = this.originals(); + final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX); // disable auto commit and throw exception if there is user overridden values, // this is necessary for streams commit semantics - if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ", as the streams client will always turn off auto committing."); } // generate consumer configs from original properties and overridden maps - Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES); + Map<String, Object> props = clientProps(ConsumerConfig.configNames(), consumerProps, CONSUMER_DEFAULT_OVERRIDES); + // bootstrap.servers should be from StreamsConfig + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG)); // add client id with stream client id prefix, and group id props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); @@ -283,18 +322,30 @@ public class StreamsConfig extends AbstractConfig { return props; } + + /** + * Get the consumer config for the restore-consumer. Properties using the prefix {@link StreamsConfig#CONSUMER_PREFIX} + * will be used in favor over their non-prefixed versions except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} + * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster + * @param clientId clientId + * @return Map of the Consumer configuration + * @throws ConfigException + */ public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException { - Map<String, Object> originals = this.originals(); + Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX); // disable auto commit and throw exception if there is user overridden values, // this is necessary for streams commit semantics - if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ", as the streams client will always turn off auto committing."); } // generate consumer configs from original properties and overridden maps - Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES); + Map<String, Object> props = clientProps(ConsumerConfig.configNames(), consumerProps, CONSUMER_DEFAULT_OVERRIDES); + + // bootstrap.servers should be from StreamsConfig + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG)); // no need to set group id for a restore consumer props.remove(ConsumerConfig.GROUP_ID_CONFIG); @@ -305,16 +356,32 @@ public class StreamsConfig extends AbstractConfig { return props; } + + /** + * Get the configs for the Producer. Properties using the prefix {@link StreamsConfig#PRODUCER_PREFIX} + * will be used in favor over their non-prefixed versions except in the case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG} + * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster + * @param clientId clientId + * @return Map of the Consumer configuration + * @throws ConfigException + */ public Map<String, Object> getProducerConfigs(String clientId) { // generate producer configs from original properties and overridden maps - Map<String, Object> props = clientProps(ProducerConfig.configNames(), this.originals(), PRODUCER_DEFAULT_OVERRIDES); - + Map<String, Object> props = clientProps(ProducerConfig.configNames(), getClientPropsWithPrefix(PRODUCER_PREFIX), PRODUCER_DEFAULT_OVERRIDES); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG)); // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); return props; } + private Map<String, Object> getClientPropsWithPrefix(final String prefix) { + // To be backward compatible we first get all the originals. + final Map<String, Object> props = this.originals(); + props.putAll(this.originalsWithPrefix(prefix)); + return props; + } + public Serde keySerde() { Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class); serde.configure(originals(), true); http://git-wip-us.apache.org/repos/asf/kafka/blob/bb629f22/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 3d4a9cc..30306f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; +import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -108,4 +110,66 @@ public class StreamsConfigTest { assertEquals(expectedBootstrapServers, actualBootstrapServers); } + @Test + public void shouldSupportPrefixedConsumerConfigs() throws Exception { + props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); + assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); + } + + @Test + public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception { + props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); + assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); + } + + + @Test + public void shouldSupportPrefixedProducerConfigs() throws Exception { + props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10); + props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map<String, Object> configs = streamsConfig.getProducerConfigs("client"); + assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)); + assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); + } + + @Test + public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception { + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); + assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); + } + + @Test + public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception { + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("groupId"); + assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); + } + + @Test + public void shouldSupportNonPrefixedProducerConfigs() throws Exception { + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10); + props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map<String, Object> configs = streamsConfig.getProducerConfigs("client"); + assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)); + assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); + } + + }
