MINOR: update JavaDocs for Kafka Streams DSL helpers - also deprecate ZK config for Streams
Author: Matthias J. Sax <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #2459 from mjsax/javaDocImprovements8 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c42654b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c42654b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c42654b Branch: refs/heads/trunk Commit: 4c42654b1eecebae272dfe5ce018b85ad4db7709 Parents: aef6927 Author: Matthias J. Sax <[email protected]> Authored: Fri Jan 27 16:48:44 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Fri Jan 27 16:48:44 2017 -0800 ---------------------------------------------------------------------- .../kafka/streams/KafkaClientSupplier.java | 42 +- .../org/apache/kafka/streams/KafkaStreams.java | 393 ++++++----- .../java/org/apache/kafka/streams/KeyValue.java | 30 +- .../org/apache/kafka/streams/StreamsConfig.java | 648 ++++++++++--------- .../apache/kafka/streams/StreamsMetrics.java | 116 ++-- .../kafka/streams/kstream/KGroupedStream.java | 6 +- .../kafka/streams/kstream/KStreamBuilder.java | 483 +++++++++----- .../processor/WallclockTimestampExtractor.java | 1 + .../processor/internals/StreamsKafkaClient.java | 12 +- .../GlobalKTableIntegrationTest.java | 1 - .../integration/JoinIntegrationTest.java | 1 - .../KTableKTableJoinIntegrationTest.java | 1 - 12 files changed, 1043 insertions(+), 691 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java index e0312f9..ed3d488 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java @@ -17,31 +17,43 @@ package org.apache.kafka.streams; -import java.util.Map; - import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.StreamThread; + +import java.util.Map; +/** + * {@code KafkaClientSupplier} can be used to provide custom Kafka clients to a {@link KafkaStreams} instance. + * + * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.processor.TopologyBuilder, StreamsConfig, KafkaClientSupplier) + */ public interface KafkaClientSupplier { /** - * Creates an instance of Producer which is used to produce records. - * @param config producer config which supplied by {@link StreamsConfig} given to {@link KafkaStreams} - * @return an instance of kafka Producer + * Create a {@link Producer} which is used to write records to sink topics. + * + * @param config {@link StreamsConfig#getProducerConfigs(String) producer config} which is supplied by the + * {@link StreamsConfig} given to the {@link KafkaStreams} instance + * @return an instance of Kafka producer */ - Producer<byte[], byte[]> getProducer(Map<String, Object> config); + Producer<byte[], byte[]> getProducer(final Map<String, Object> config); /** - * Creates an instance of Consumer which is used to consume records of source topics. - * @param config consumer config which supplied by {@link StreamsConfig} given to {@link KafkaStreams} - * @return an instance of kafka Consumer + * Create a {@link Consumer} which is used to read records of source topics. 
+ * + * @param config {@link StreamsConfig#getConsumerConfigs(StreamThread, String, String) consumer config} which is + * supplied by the {@link StreamsConfig} given to the {@link KafkaStreams} instance + * @return an instance of Kafka consumer */ - Consumer<byte[], byte[]> getConsumer(Map<String, Object> config); + Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config); /** - * Creates an instance of Consumer which is used to consume records of internal topics. - * @param config restore consumer config which supplied by {@link StreamsConfig} given to - * {@link KafkaStreams} - * @return an instance of kafka Consumer + * Create a {@link Consumer} which is used to read records to restore {@link StateStore}s. + * + * @param config {@link StreamsConfig#getRestoreConsumerConfigs(String) restore consumer config} which is supplied + * by the {@link StreamsConfig} given to the {@link KafkaStreams} + * @return an instance of Kafka consumer */ - Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config); + Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 27c8d22..ef0e72d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -17,6 +17,9 @@ package org.apache.kafka.streams; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import 
org.apache.kafka.common.annotation.InterfaceStability; @@ -28,6 +31,12 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -67,43 +76,44 @@ import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; /** - * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and - * sends output to zero or more output topics. + * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and + * sends output to zero, one, or more output topics. * <p> - * The computational logic can be specified either by using the {@link TopologyBuilder} class to define the a DAG topology of - * {@link org.apache.kafka.streams.processor.Processor}s or by using the {@link org.apache.kafka.streams.kstream.KStreamBuilder} - * class which provides the high-level {@link org.apache.kafka.streams.kstream.KStream} DSL to define the transformation. + * The computational logic can be specified either by using the {@link TopologyBuilder} to define a DAG topology of + * {@link Processor}s or by using the {@link KStreamBuilder} which provides the high-level DSL to define transformations. * <p> - * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. 
One stream instance can contain one or - * more threads specified in the configs for the processing work. + * One {@code KafkaStreams} instance can contain one or more threads specified in the configs for the processing work. * <p> - * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes - * on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work - * based on the assignment of the input topic partitions so that all partitions are being - * consumed. If instances are added or failed, all instances will rebalance the partition assignment among themselves - * to balance processing load. - * <p> - * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer} - * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output. + * A {@code KafkaStreams} instance can co-ordinate with any other instances with the same + * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} (whether in the same process, on other processes on this + * machine, or on remote machines) as a single (possibly distributed) stream processing application. + * These instances will divide up the work based on the assignment of the input topic partitions so that all partitions + * are being consumed. + * If instances are added or fail, all (remaining) instances will rebalance the partition assignment among themselves + * to balance processing load and ensure that all input topic partitions are processed. * <p> + * Internally a {@link KafkaStreams} instance contains a normal {@link KafkaProducer} and {@link KafkaConsumer} instance + * that is used for reading input and writing output. 
* <p> * A simple example might look like this: - * <pre> - * Map<String, Object> props = new HashMap<>(); - * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); - * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - * props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - * props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - * StreamsConfig config = new StreamsConfig(props); + * <pre>{@code + * Map<String, Object> props = new HashMap<>(); + * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); + * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + * props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + * props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + * StreamsConfig config = new StreamsConfig(props); + * + * KStreamBuilder builder = new KStreamBuilder(); + * builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic"); * - * KStreamBuilder builder = new KStreamBuilder(); - * builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic"); + * KafkaStreams streams = new KafkaStreams(builder, config); + * streams.start(); + * }</pre> * - * KafkaStreams streams = new KafkaStreams(builder, config); - * streams.start(); - * </pre> + * @see KStreamBuilder + * @see TopologyBuilder */ - @InterfaceStability.Unstable public class KafkaStreams { @@ -163,15 +173,15 @@ public class KafkaStreams { private final Set<Integer> validTransitions = new HashSet<>(); - State(final Integer...validTransitions) { + State(final Integer... 
validTransitions) { this.validTransitions.addAll(Arrays.asList(validTransitions)); } public boolean isRunning() { - return this.equals(RUNNING) || this.equals(REBALANCING); + return equals(RUNNING) || equals(REBALANCING); } public boolean isCreatedOrRunning() { - return isRunning() || this.equals(CREATED); + return isRunning() || equals(CREATED); } public boolean isValidTransition(final State newState) { return validTransitions.contains(newState.ordinal()); @@ -179,33 +189,34 @@ public class KafkaStreams { } private volatile State state = KafkaStreams.State.CREATED; private StateListener stateListener = null; - private final StreamStateListener streamStateListener = new StreamStateListener(); + /** - * Listen to state change events + * Listen to {@link State} change events. */ public interface StateListener { /** - * Called when state changes - * @param newState current state - * @param oldState previous state + * Called when state changes. + * + * @param newState new state + * @param oldState previous state */ void onChange(final State newState, final State oldState); } /** - * An app can set {@link StateListener} so that the app is notified when state changes - * @param listener + * An app can set a single {@link StateListener} so that the app is notified when state changes. 
+ * @param listener a new state listener */ public void setStateListener(final StateListener listener) { - this.stateListener = listener; + stateListener = listener; } - private synchronized void setState(State newState) { - State oldState = state; + private synchronized void setState(final State newState) { + final State oldState = state; if (!state.isValidTransition(newState)) { - log.warn("Unexpected state transition from " + state + " to " + newState); + log.warn("Unexpected state transition from {} to {}.", oldState, newState); } state = newState; if (stateListener != null) { @@ -213,31 +224,35 @@ public class KafkaStreams { } } - /** - * @return The state this instance is in + * Return the current {@link State} of this {@code KafkaStreams} instance. + * + * @return the current state of this Kafka Streams instance */ public synchronized State state() { return state; } /** - * Get read-only handle on global metrics registry + * Get read-only handle on global metrics registry. + * * @return Map of all metrics. */ public Map<MetricName, ? 
extends Metric> metrics() { - return Collections.unmodifiableMap(this.metrics.metrics()); + return Collections.unmodifiableMap(metrics.metrics()); } private class StreamStateListener implements StreamThread.StateListener { @Override - public synchronized void onChange(final StreamThread thread, final StreamThread.State newState, final StreamThread.State oldState) { + public synchronized void onChange(final StreamThread thread, + final StreamThread.State newState, + final StreamThread.State oldState) { threadState.put(thread.getId(), newState); if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.ASSIGNING_PARTITIONS) { setState(State.REBALANCING); } else if (newState == StreamThread.State.RUNNING) { - for (StreamThread.State state : threadState.values()) { + for (final StreamThread.State state : threadState.values()) { if (state != StreamThread.State.RUNNING) { return; } @@ -246,35 +261,38 @@ public class KafkaStreams { } } } + /** - * Construct the stream instance. + * Create a {@code KafkaStreams} instance. * * @param builder the processor topology builder specifying the computational logic - * @param props properties for the {@link StreamsConfig} + * @param props properties for {@link StreamsConfig} */ public KafkaStreams(final TopologyBuilder builder, final Properties props) { this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier()); } /** - * Construct the stream instance. + * Create a {@code KafkaStreams} instance. * * @param builder the processor topology builder specifying the computational logic - * @param config the stream configs + * @param config the Kafka Streams configuration */ public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config) { this(builder, config, new DefaultKafkaClientSupplier()); } /** - * Construct the stream instance. + * Create a {@code KafkaStreams} instance. 
* * @param builder the processor topology builder specifying the computational logic - * @param config the stream configs - * @param clientSupplier the kafka clients supplier which provides underlying producer and consumer clients - * for this {@link KafkaStreams} instance + * @param config the Kafka Streams configuration + * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients + * for the new {@code KafkaStreams} instance */ - public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier) { + public KafkaStreams(final TopologyBuilder builder, + final StreamsConfig config, + final KafkaClientSupplier clientSupplier) { // create the metrics final Time time = Time.SYSTEM; @@ -338,7 +356,7 @@ public class KafkaStreams { time, streamsMetadataState, cacheSizeBytes); - threads[i].setStateListener(streamStateListener); + threads[i].setStateListener(new StreamStateListener()); threadState.put(threads[i].getId(), threads[i].state()); storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } @@ -360,7 +378,15 @@ public class KafkaStreams { return new HostInfo(host, port); } - private void checkBrokerVersionCompatibility() { + /** + * Check if the used brokers have version 0.10.1.x or higher. + * <p> + * Note, for <em>pre</em> 0.10.x brokers the broker version cannot be checked and the client will hang and retry + * until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}. + * + * @throws StreamsException if brokers have version 0.10.0.x + */ + private void checkBrokerVersionCompatibility() throws StreamsException { final StreamsKafkaClient client = new StreamsKafkaClient(config); client.checkBrokerCompatibility(); @@ -374,12 +400,17 @@ public class KafkaStreams { } /** - * Start the stream instance by starting all its threads. - * + * Start the {@code KafkaStreams} instance by starting all its threads. 
+ * <p> + * Note, for brokers with version {@code 0.9.x} or lower, the broker version cannot be checked. + * There will be no error and the client will hang and retry to verify the broker version until it + * {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}. + * @throws IllegalStateException if process was already started + * @throws StreamsException if the Kafka brokers have version 0.10.0.x */ - public synchronized void start() { - log.debug("Starting Kafka Stream process"); + public synchronized void start() throws IllegalStateException, StreamsException { + log.debug("Starting Kafka Stream process."); if (state == KafkaStreams.State.CREATED) { checkBrokerVersionCompatibility(); @@ -400,9 +431,7 @@ public class KafkaStreams { } /** - * Shutdown this stream instance by signaling all the threads to stop, - * and then wait for them to join. - * + * Shutdown this {@code KafkaStreams} instance by signaling all the threads to stop, and then wait for them to join. * This will block until all threads have stopped. */ public void close() { @@ -410,17 +439,17 @@ public class KafkaStreams { } /** - * Shutdown this stream instance by signaling all the threads to stop, - * and then wait up to the timeout for the threads to join. - * - * A timeout of 0 means to wait forever + * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the + * threads to join. + * A {@code timeout} of 0 means to wait forever. 
* - * @param timeout how long to wait for {@link StreamThread}s to shutdown - * @param timeUnit unit of time used for timeout - * @return true if all threads were successfully stopped + * @param timeout how long to wait for the threads to shutdown + * @param timeUnit unit of time used for timeout + * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached + * before all threads stopped */ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - log.debug("Stopping Kafka Stream process"); + log.debug("Stopping Kafka Stream process."); if (state.isCreatedOrRunning()) { setState(KafkaStreams.State.PENDING_SHUTDOWN); // save the current thread so that if it is a stream thread @@ -428,42 +457,42 @@ public class KafkaStreams { final Thread shutdown = new Thread(new Runnable() { @Override public void run() { - // signal the threads to stop and wait - for (final StreamThread thread : threads) { - // avoid deadlocks by stopping any further state reports - // from the thread since we're shutting down - thread.setStateListener(null); - thread.close(); - } - if (globalStreamThread != null) { - globalStreamThread.close(); - if (!globalStreamThread.stillRunning()) { - try { - globalStreamThread.join(); - } catch (InterruptedException e) { - Thread.interrupted(); - } - } - } - for (final StreamThread thread : threads) { + // signal the threads to stop and wait + for (final StreamThread thread : threads) { + // avoid deadlocks by stopping any further state reports + // from the thread since we're shutting down + thread.setStateListener(null); + thread.close(); + } + if (globalStreamThread != null) { + globalStreamThread.close(); + if (!globalStreamThread.stillRunning()) { try { - if (!thread.stillRunning()) { - thread.join(); - } - } catch (final InterruptedException ex) { + globalStreamThread.join(); + } catch (final InterruptedException e) { Thread.interrupted(); } } - - metrics.close(); - log.info("Stopped 
Kafka Streams process"); } + for (final StreamThread thread : threads) { + try { + if (!thread.stillRunning()) { + thread.join(); + } + } catch (final InterruptedException ex) { + Thread.interrupted(); + } + } + + metrics.close(); + log.info("Stopped Kafka Streams process."); + } }, "kafka-streams-close-thread"); shutdown.setDaemon(true); shutdown.start(); try { shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit)); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { Thread.interrupted(); } setState(KafkaStreams.State.NOT_RUNNING); @@ -473,9 +502,10 @@ public class KafkaStreams { } /** - * Produces a string representation containing useful information about Kafka Streams - * Such as thread IDs, task IDs and a representation of the topology. This is useful - * in debugging scenarios. + * Produce a string representation containing useful information about this {@code KafkaStreams} instance such as + * thread IDs, task IDs, and a representation of the topology DAG including {@link StateStore}s (cf. + * {@link TopologyBuilder} and {@link KStreamBuilder}). + * * @return A string representation of the Kafka Streams instance. */ @Override @@ -484,13 +514,19 @@ public class KafkaStreams { } /** - * Produces a string representation containing useful information about Kafka Streams - * such as thread IDs, task IDs and a representation of the topology starting with the given indent. This is useful - * in debugging scenarios. + * Produce a string representation containing useful information about this {@code KafkaStreams} instance such as + * thread IDs, task IDs, and a representation of the topology DAG including {@link StateStore}s (cf. + * {@link TopologyBuilder} and {@link KStreamBuilder}). + * + * @param indent the top-level indent for each line * @return A string representation of the Kafka Streams instance. 
*/ public String toString(final String indent) { - final StringBuilder sb = new StringBuilder(indent + "KafkaStreams processID:" + processId + "\n"); + final StringBuilder sb = new StringBuilder() + .append(indent) + .append("KafkaStreams processID: ") + .append(processId) + .append("\n"); for (final StreamThread thread : threads) { sb.append(thread.toString(indent + "\t")); } @@ -500,11 +536,16 @@ public class KafkaStreams { } /** - * Cleans up local state store directory ({@code state.dir}), by deleting all data with regard to the application-id. + * Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all + * data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}. + * <p> + * May only be called either before this {@code KafkaStreams} instance is {@link KafkaStreams#start() started} or + * after the instance is {@link KafkaStreams#close() closed}. * <p> - * May only be called either before instance is started or after instance is closed. + * Calling this method triggers a restore of local {@link StateStore}s on the next {@link KafkaStreams#start() + * application start}. * - * @throws IllegalStateException if instance is currently running + * @throws IllegalStateException if the instance is currently running */ public void cleanUp() { if (state.isRunning()) { @@ -515,7 +556,7 @@ public class KafkaStreams { final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG); final String localApplicationDir = stateDir + File.separator + appId; - log.debug("Removing local Kafka Streams application data in {} for application {}", + log.debug("Removing local Kafka Streams application data in {} for application {}.", localApplicationDir, appId); @@ -524,9 +565,10 @@ public class KafkaStreams { } /** - * Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception. 
+ * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. * - * @param eh the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler. + * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler */ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { for (final StreamThread thread : threads) { @@ -538,25 +580,34 @@ public class KafkaStreams { } } - /** - * Find all of the instances of {@link StreamsMetadata} in the {@link KafkaStreams} application that this instance belongs to - * + * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same + * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to + * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance. + * <p> * Note: this is a point in time view and it may change due to partition reassignment. 
- * @return collection containing all instances of {@link StreamsMetadata} in the {@link KafkaStreams} application that this instance belongs to + * + * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application */ public Collection<StreamsMetadata> allMetadata() { validateIsRunning(); return streamsMetadataState.getAllMetadata(); } - /** - * Find instances of {@link StreamsMetadata} that contains the given storeName - * + * Find all currently running {@code KafkaStreams} instances (potentially remotely) that + * <ul> + * <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all + * instances that belong to the same Kafka Streams application)</li> + * <li>and that contain a {@link StateStore} with the given {@code storeName}</li> + * </ul> + * and return {@link StreamsMetadata} for each discovered instance. + * <p> * Note: this is a point in time view and it may change due to partition reassignment. - * @param storeName the storeName to find metadata for - * @return A collection containing instances of {@link StreamsMetadata} that have the provided storeName + * + * @param storeName the {@code storeName} to find metadata for + * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of + * this application */ public Collection<StreamsMetadata> allMetadataForStore(final String storeName) { validateIsRunning(); @@ -564,22 +615,35 @@ public class KafkaStreams { } /** - * Find the {@link StreamsMetadata} instance that contains the given storeName - * and the corresponding hosted store instance contains the given key. This will use - * the {@link org.apache.kafka.streams.processor.internals.DefaultStreamPartitioner} to - * locate the partition. 
If a custom partitioner has been used please use - * {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)} - * - * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore}, - * this method provides a way of finding which host it would exist on. + * Find the currently running {@code KafkaStreams} instance (potentially remotely) that + * <ul> + * <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all + * instances that belong to the same Kafka Streams application)</li> + * <li>and that contain a {@link StateStore} with the given {@code storeName}</li> + * <li>and the {@link StateStore} contains the given {@code key}</li> + * </ul> + * and return {@link StreamsMetadata} for it. + * <p> + * This will use the default Kafka Streams partitioner to locate the partition. + * If a {@link StreamPartitioner custom partitioner} has been + * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig}, + * {@link KStream#through(StreamPartitioner, String)}, or {@link KTable#through(StreamPartitioner, String, String)}, + * or if the original {@link KTable}'s input {@link KStreamBuilder#table(String, String) topic} is partitioned + * differently, please use {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)}. + * <p> + * Note: + * <ul> + * <li>this is a point in time view and it may change due to partition reassignment</li> + * <li>the key may not exist in the {@link StateStore}; this method provides a way of finding which host it + * <em>would</em> exist on</li> + * </ul> * - * Note: this is a point in time view and it may change due to partition reassignment. 
- * @param storeName Name of the store - * @param key Key to use to for partition - * @param keySerializer Serializer for the key - * @param <K> key type - * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} - * if streams is (re-)initializing + * @param storeName the {@code storeName} to find metadata for + * @param key the key to find metadata for + * @param keySerializer serializer for the key + * @param <K> key type + * @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provided {@code storeName} and + * {@code key} of this application or {@link StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing */ public <K> StreamsMetadata metadataForKey(final String storeName, final K key, @@ -589,19 +653,28 @@ public class KafkaStreams { } /** - * Find the {@link StreamsMetadata} instance that contains the given storeName - * and the corresponding hosted store instance contains the given key - * - * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore}, - * this method provides a way of finding which host it would exist on. + * Find the currently running {@code KafkaStreams} instance (potentially remotely) that + * <ul> + * <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all + * instances that belong to the same Kafka Streams application)</li> + * <li>and that contain a {@link StateStore} with the given {@code storeName}</li> + * <li>and the {@link StateStore} contains the given {@code key}</li> + * </ul> + * and return {@link StreamsMetadata} for it. 
+ * <p> + * Note: + * <ul> + * <li>this is a point in time view and it may change due to partition reassignment</li> + * <li>the key may not exist in the {@link StateStore}; this method provides a way of finding which host it + * <em>would</em> exist on</li> + * </ul> * - * Note: this is a point in time view and it may change due to partition reassignment. - * @param storeName Name of the store - * @param key Key to use to for partition - * @param partitioner Partitioner for the store - * @param <K> key type - * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} - * if streams is (re-)initializing + * @param storeName the {@code storeName} to find metadata for + * @param key the key to find metadata for + * @param partitioner the partitioner to be used to locate the host for the key + * @param <K> key type + * @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provided {@code storeName} and + * {@code key} of this application or {@link StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing */ public <K> StreamsMetadata metadataForKey(final String storeName, final K key, @@ -610,17 +683,17 @@ public class KafkaStreams { return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner); } - /** - * Get a facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances - * with the provided storeName and accepted by {@link QueryableStoreType#accepts(StateStore)}. 
- * The returned object can be used to query the {@link org.apache.kafka.streams.processor.StateStore} instances - * @param storeName name of the store to find - * @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} - * @param <T> return type - * @return A facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances - * @throws org.apache.kafka.streams.errors.InvalidStateStoreException if the streams are (re-)initializing or - * a store with storeName and queryableStoreType doesnt' exist. + * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's + * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}. + * The returned object can be used to query the {@link StateStore} instances. + * + * @param storeName name of the store to find + * @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} + * @param <T> return type + * @return A facade wrapping the local {@link StateStore} instances + * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and + * {@code queryableStoreType} doesn't exist */ public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) { validateIsRunning(); @@ -629,7 +702,7 @@ public class KafkaStreams { private void validateIsRunning() { if (!state.isRunning()) { - throw new IllegalStateException("KafkaStreams is not running. State is " + state); + throw new IllegalStateException("KafkaStreams is not running.
State is " + state + "."); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java index 64b38cd..0c1d2af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java +++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java @@ -21,8 +21,7 @@ import java.util.Objects; /** * A key-value pair defined for a single Kafka Streams record. - * If the record comes directly from a Kafka topic then its - * key / value are defined as the message key / value. + * If the record comes directly from a Kafka topic then its key/value are defined as the message key/value. * * @param <K> Key type * @param <V> Value type @@ -37,10 +36,10 @@ public class KeyValue<K, V> { /** * Create a new key-value pair. * - * @param key the key - * @param value the value + * @param key the key + * @param value the value */ - public KeyValue(K key, V value) { + public KeyValue(final K key, final V value) { this.key = key; this.value = value; } @@ -48,22 +47,23 @@ public class KeyValue<K, V> { /** * Create a new key-value pair. 
* - * @param key the key - * @param value the value - * @param <K> the type of the key - * @param <V> the type of the value - * @return a new key value pair + * @param key the key + * @param value the value + * @param <K> the type of the key + * @param <V> the type of the value + * @return a new key-value pair */ - public static <K, V> KeyValue<K, V> pair(K key, V value) { + public static <K, V> KeyValue<K, V> pair(final K key, final V value) { return new KeyValue<>(key, value); } + @Override public String toString() { return "KeyValue(" + key + ", " + value + ")"; } @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (this == obj) return true; @@ -71,9 +71,9 @@ public class KeyValue<K, V> { return false; } - KeyValue other = (KeyValue) obj; - return (this.key == null ? other.key == null : this.key.equals(other.key)) - && (this.value == null ? other.value == null : this.value.equals(other.value)); + final KeyValue other = (KeyValue) obj; + return (key == null ? other.key == null : key.equals(other.key)) + && (value == null ? 
other.value == null : value.equals(other.value)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d7d6566..57db027 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -29,8 +31,8 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -43,303 +45,345 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; /** - * Configuration for Kafka Streams. Documentation for these configurations can be found in the <a - * href="http://kafka.apache.org/documentation.html#streamsconfigs">Kafka documentation</a> + * Configuration for a {@link KafkaStreams} instance. 
+ * Can also be used to configure the Kafka Streams internal {@link KafkaConsumer} and {@link KafkaProducer}. + * To avoid consumer/producer property conflicts, you should prefix those properties using + * {@link #consumerPrefix(String)} and {@link #producerPrefix(String)}, respectively. + * <p> + * Example: + * <pre>{@code + * // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer + * Properties streamsProperties = new Properties(); + * streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000); + * // or + * streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000); + * + * // suggested: + * Properties streamsProperties = new Properties(); + * // sets "metadata.max.age.ms" to 1 minute for consumer only + * streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000); + * // sets "metadata.max.age.ms" to 1 minute for producer only + * streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000); + * + * StreamsConfig streamsConfig = new StreamsConfig(streamsProperties); + * }</pre> + * Kafka Streams requires at least the properties {@link #APPLICATION_ID_CONFIG "application.id"} and + * {@link #BOOTSTRAP_SERVERS_CONFIG "bootstrap.servers"} to be set. + * Furthermore, it is not allowed to enable {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"} that + * is disabled by Kafka Streams by default. + * + * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.processor.TopologyBuilder, StreamsConfig) + * @see ConsumerConfig + * @see ProducerConfig */ public class StreamsConfig extends AbstractConfig { private static final ConfigDef CONFIG; - // Prefix used to isolate consumer configs from producer configs. + /** + * Prefix used to isolate {@link KafkaConsumer consumer} configs from {@link KafkaProducer producer} configs.
+ * It is recommended to use {@link #consumerPrefix(String)} to add this prefix to {@link ConsumerConfig consumer + * properties}. + */ public static final String CONSUMER_PREFIX = "consumer."; // Prefix used to isolate producer configs from consumer configs. + /** + * Prefix used to isolate {@link KafkaProducer producer} configs from {@link KafkaConsumer consumer} configs. + * It is recommended to use {@link #producerPrefix(String)} to add this prefix to {@link ProducerConfig producer + * properties}. + */ public static final String PRODUCER_PREFIX = "producer."; - /** <code>state.dir</code> */ + /** {@code state.dir} */ public static final String STATE_DIR_CONFIG = "state.dir"; private static final String STATE_DIR_DOC = "Directory location for state store."; - /** <code>zookeeper.connect<code/> */ + /** + * {@code zookeeper.connect} + * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored. + */ + @Deprecated public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management."; - /** <code>commit.interval.ms</code> */ + /** {@code commit.interval.ms} */ public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor."; - /** <code>poll.ms</code> */ + /** {@code poll.ms} */ public static final String POLL_MS_CONFIG = "poll.ms"; private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input."; - /** <code>num.stream.threads</code> */ + /** {@code num.stream.threads} */ public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; - /** <code>num.standby.replicas</code> */ + /** {@code num.standby.replicas} */ public static
final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas"; private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; - /** <code>buffered.records.per.partition</code> */ + /** {@code buffered.records.per.partition} */ public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition."; - /** <code>state.cleanup.delay</code> */ + /** {@code state.cleanup.delay} */ public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated."; - /** <code>timestamp.extractor</code> */ + /** {@code timestamp.extractor} */ public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface."; - /** <code>partition.grouper</code> */ + /** {@code partition.grouper} */ public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface."; - /** <code>application.id</code> */ + /** {@code application.id} */ public static final String APPLICATION_ID_CONFIG = "application.id"; - public static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; + private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. 
It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; - /** <code>replication.factor</code> */ + /** {@code replication.factor} */ public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; - public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application."; + private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application."; - /** <code>key.serde</code> */ + /** {@code key.serde} */ public static final String KEY_SERDE_CLASS_CONFIG = "key.serde"; - public static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface."; + private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface."; - /** <code>value.serde</code> */ + /** {@code value.serde} */ public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde"; - public static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface."; + private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface."; - /**<code>user.endpoint</code> */ + /**{@code user.endpoint} */ public static final String APPLICATION_SERVER_CONFIG = "application.server"; - public static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application"; + private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of 
state stores within a single KafkaStreams application"; - /** <code>metrics.sample.window.ms</code> */ + /** {@code metrics.sample.window.ms} */ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; - /** <code>metrics.num.samples</code> */ + /** {@code metrics.num.samples} */ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; - /** <code>metrics.record.level</code> */ + /** {@code metrics.record.level} */ public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; - /** <code>metric.reporters</code> */ + /** {@code metric.reporters} */ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; - /** <code>bootstrap.servers</code> */ + /** {@code bootstrap.servers} */ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; - /** <code>client.id</code> */ + /** {@code client.id} */ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; - /** <code>rocksdb.config.setter</code> */ + /** {@code rocksdb.config.setter} */ public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter"; - public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface"; + private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface"; - /** <code>windowstore.changelog.additional.retention.ms</code> */ + /** {@code windowstore.changelog.additional.retention.ms} */ public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; - public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows 
maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day"; + private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day"; - /** <code>cache.max.bytes.buffering</code> */ + /** {@code cache.max.bytes.buffering} */ public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering"; - public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; + private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; - public static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC; + private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC; public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL; public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; - public static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC; + private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC; public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; - public static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; + private static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; - public static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; + private 
static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG; - public static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC; + private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC; public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG; - public static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC; + private static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC; public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; - public static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC; + private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC; public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - public static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; static { - CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value - Type.STRING, - Importance.HIGH, - StreamsConfig.APPLICATION_ID_DOC) - .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value - Type.LIST, - Importance.HIGH, - CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) - .define(CLIENT_ID_CONFIG, - Type.STRING, - "", - Importance.HIGH, - CommonClientConfigs.CLIENT_ID_DOC) - .define(ZOOKEEPER_CONNECT_CONFIG, - Type.STRING, - "", - Importance.HIGH, - StreamsConfig.ZOOKEEPER_CONNECT_DOC) - .define(STATE_DIR_CONFIG, - Type.STRING, - "/tmp/kafka-streams", - Importance.MEDIUM, - STATE_DIR_DOC) - .define(REPLICATION_FACTOR_CONFIG, - Type.INT, - 1, - Importance.MEDIUM, - REPLICATION_FACTOR_DOC) - 
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, - Type.CLASS, - FailOnInvalidTimestamp.class.getName(), - Importance.MEDIUM, - TIMESTAMP_EXTRACTOR_CLASS_DOC) - .define(PARTITION_GROUPER_CLASS_CONFIG, - Type.CLASS, - DefaultPartitionGrouper.class.getName(), - Importance.MEDIUM, - PARTITION_GROUPER_CLASS_DOC) - .define(KEY_SERDE_CLASS_CONFIG, - Type.CLASS, - Serdes.ByteArraySerde.class.getName(), - Importance.MEDIUM, - KEY_SERDE_CLASS_DOC) - .define(VALUE_SERDE_CLASS_CONFIG, - Type.CLASS, - Serdes.ByteArraySerde.class.getName(), - Importance.MEDIUM, - VALUE_SERDE_CLASS_DOC) - .define(COMMIT_INTERVAL_MS_CONFIG, - Type.LONG, - 30000, - Importance.LOW, - COMMIT_INTERVAL_MS_DOC) - .define(POLL_MS_CONFIG, - Type.LONG, - 100, - Importance.LOW, - POLL_MS_DOC) - .define(NUM_STREAM_THREADS_CONFIG, - Type.INT, - 1, - Importance.LOW, - NUM_STREAM_THREADS_DOC) - .define(NUM_STANDBY_REPLICAS_CONFIG, - Type.INT, - 0, - Importance.LOW, - NUM_STANDBY_REPLICAS_DOC) - .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, - Type.INT, - 1000, - Importance.LOW, - BUFFERED_RECORDS_PER_PARTITION_DOC) - .define(STATE_CLEANUP_DELAY_MS_CONFIG, - Type.LONG, - 60000, - Importance.LOW, - STATE_CLEANUP_DELAY_MS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, - Type.LIST, - "", - Importance.LOW, - CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) - .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, - Type.LONG, - 30000, - atLeast(0), - Importance.LOW, - CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) - .define(METRICS_NUM_SAMPLES_CONFIG, - Type.INT, - 2, - atLeast(1), - Importance.LOW, - CommonClientConfigs.METRICS_NUM_SAMPLES_DOC) - .define(METRICS_RECORDING_LEVEL_CONFIG, - Type.STRING, - Sensor.RecordingLevel.INFO.toString(), - in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), - Importance.LOW, - CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC) - .define(APPLICATION_SERVER_CONFIG, - Type.STRING, - "", - Importance.LOW, - APPLICATION_SERVER_DOC) - 
.define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, - Type.CLASS, - null, - Importance.LOW, - ROCKSDB_CONFIG_SETTER_CLASS_DOC) - .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, - Type.LONG, - 24 * 60 * 60 * 1000, - Importance.MEDIUM, - WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC) - .define(CACHE_MAX_BYTES_BUFFERING_CONFIG, - Type.LONG, - 10 * 1024 * 1024L, - atLeast(0), - Importance.LOW, - CACHE_MAX_BYTES_BUFFERING_DOC) - .define(SECURITY_PROTOCOL_CONFIG, - Type.STRING, - DEFAULT_SECURITY_PROTOCOL, - Importance.MEDIUM, - SECURITY_PROTOCOL_DOC) - .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, - ConfigDef.Type.LONG, - 9 * 60 * 1000, - ConfigDef.Importance.MEDIUM, - CONNECTIONS_MAX_IDLE_MS_DOC) - .define(RETRY_BACKOFF_MS_CONFIG, - ConfigDef.Type.LONG, - 100L, - atLeast(0L), - ConfigDef.Importance.LOW, - RETRY_BACKOFF_MS_DOC) - .define(METADATA_MAX_AGE_CONFIG, - ConfigDef.Type.LONG, - 5 * 60 * 1000, - atLeast(0), - ConfigDef.Importance.LOW, - METADATA_MAX_AGE_DOC) - .define(RECONNECT_BACKOFF_MS_CONFIG, - ConfigDef.Type.LONG, - 50L, - atLeast(0L), - ConfigDef.Importance.LOW, - RECONNECT_BACKOFF_MS_DOC) - .define(SEND_BUFFER_CONFIG, - ConfigDef.Type.INT, - 128 * 1024, - atLeast(0), - ConfigDef.Importance.MEDIUM, - SEND_BUFFER_DOC) - .define(RECEIVE_BUFFER_CONFIG, - ConfigDef.Type.INT, - 32 * 1024, - atLeast(0), - ConfigDef.Importance.MEDIUM, - RECEIVE_BUFFER_DOC) - .define(REQUEST_TIMEOUT_MS_CONFIG, - ConfigDef.Type.INT, - 40 * 1000, - atLeast(0), - ConfigDef.Importance.MEDIUM, - REQUEST_TIMEOUT_MS_DOC); + CONFIG = new ConfigDef() + .define(APPLICATION_ID_CONFIG, // required with no default value + Type.STRING, + Importance.HIGH, + APPLICATION_ID_DOC) + .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value + Type.LIST, + Importance.HIGH, + CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) + .define(CLIENT_ID_CONFIG, + Type.STRING, + "", + Importance.HIGH, + CommonClientConfigs.CLIENT_ID_DOC) + .define(ZOOKEEPER_CONNECT_CONFIG, + Type.STRING, + "", + 
Importance.HIGH, + ZOOKEEPER_CONNECT_DOC) + .define(STATE_DIR_CONFIG, + Type.STRING, + "/tmp/kafka-streams", + Importance.MEDIUM, + STATE_DIR_DOC) + .define(REPLICATION_FACTOR_CONFIG, + Type.INT, + 1, + Importance.MEDIUM, + REPLICATION_FACTOR_DOC) + .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + Type.CLASS, + FailOnInvalidTimestamp.class.getName(), + Importance.MEDIUM, + TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(PARTITION_GROUPER_CLASS_CONFIG, + Type.CLASS, + DefaultPartitionGrouper.class.getName(), + Importance.MEDIUM, + PARTITION_GROUPER_CLASS_DOC) + .define(KEY_SERDE_CLASS_CONFIG, + Type.CLASS, + Serdes.ByteArraySerde.class.getName(), + Importance.MEDIUM, + KEY_SERDE_CLASS_DOC) + .define(VALUE_SERDE_CLASS_CONFIG, + Type.CLASS, + Serdes.ByteArraySerde.class.getName(), + Importance.MEDIUM, + VALUE_SERDE_CLASS_DOC) + .define(COMMIT_INTERVAL_MS_CONFIG, + Type.LONG, + 30000, + Importance.LOW, + COMMIT_INTERVAL_MS_DOC) + .define(POLL_MS_CONFIG, + Type.LONG, + 100, + Importance.LOW, + POLL_MS_DOC) + .define(NUM_STREAM_THREADS_CONFIG, + Type.INT, + 1, + Importance.LOW, + NUM_STREAM_THREADS_DOC) + .define(NUM_STANDBY_REPLICAS_CONFIG, + Type.INT, + 0, + Importance.LOW, + NUM_STANDBY_REPLICAS_DOC) + .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, + Type.INT, + 1000, + Importance.LOW, + BUFFERED_RECORDS_PER_PARTITION_DOC) + .define(STATE_CLEANUP_DELAY_MS_CONFIG, + Type.LONG, + 60000, + Importance.LOW, + STATE_CLEANUP_DELAY_MS_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, + Type.LIST, + "", + Importance.LOW, + CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) + .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, + Type.LONG, + 30000, + atLeast(0), + Importance.LOW, + CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) + .define(METRICS_NUM_SAMPLES_CONFIG, + Type.INT, + 2, + atLeast(1), + Importance.LOW, + CommonClientConfigs.METRICS_NUM_SAMPLES_DOC) + .define(METRICS_RECORDING_LEVEL_CONFIG, + Type.STRING, + Sensor.RecordingLevel.INFO.toString(), + in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString()), + Importance.LOW, + CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC) + .define(APPLICATION_SERVER_CONFIG, + Type.STRING, + "", + Importance.LOW, + APPLICATION_SERVER_DOC) + .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, + Type.CLASS, + null, + Importance.LOW, + ROCKSDB_CONFIG_SETTER_CLASS_DOC) + .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, + Type.LONG, + 24 * 60 * 60 * 1000, + Importance.MEDIUM, + WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC) + .define(CACHE_MAX_BYTES_BUFFERING_CONFIG, + Type.LONG, + 10 * 1024 * 1024L, + atLeast(0), + Importance.LOW, + CACHE_MAX_BYTES_BUFFERING_DOC) + .define(SECURITY_PROTOCOL_CONFIG, + Type.STRING, + DEFAULT_SECURITY_PROTOCOL, + Importance.MEDIUM, + SECURITY_PROTOCOL_DOC) + .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, + ConfigDef.Type.LONG, + 9 * 60 * 1000, + ConfigDef.Importance.MEDIUM, + CONNECTIONS_MAX_IDLE_MS_DOC) + .define(RETRY_BACKOFF_MS_CONFIG, + ConfigDef.Type.LONG, + 100L, + atLeast(0L), + ConfigDef.Importance.LOW, + RETRY_BACKOFF_MS_DOC) + .define(METADATA_MAX_AGE_CONFIG, + ConfigDef.Type.LONG, + 5 * 60 * 1000, + atLeast(0), + ConfigDef.Importance.LOW, + METADATA_MAX_AGE_DOC) + .define(RECONNECT_BACKOFF_MS_CONFIG, + ConfigDef.Type.LONG, + 50L, + atLeast(0L), + ConfigDef.Importance.LOW, + RECONNECT_BACKOFF_MS_DOC) + .define(SEND_BUFFER_CONFIG, + ConfigDef.Type.INT, + 128 * 1024, + atLeast(0), + ConfigDef.Importance.MEDIUM, + SEND_BUFFER_DOC) + .define(RECEIVE_BUFFER_CONFIG, + ConfigDef.Type.INT, + 32 * 1024, + atLeast(0), + ConfigDef.Importance.MEDIUM, + RECEIVE_BUFFER_DOC) + .define(REQUEST_TIMEOUT_MS_CONFIG, + ConfigDef.Type.INT, + 40 * 1000, + atLeast(0), + ConfigDef.Importance.MEDIUM, + REQUEST_TIMEOUT_MS_DOC); } // this is the list of configs for underlying clients @@ -347,7 +391,7 @@ public class StreamsConfig extends AbstractConfig { private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES; static { - Map<String, Object> 
tempProducerDefaultOverrides = new HashMap<>(); + final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(); tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100"); PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); @@ -356,7 +400,7 @@ public class StreamsConfig extends AbstractConfig { private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES; static { - Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); + final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); @@ -369,160 +413,177 @@ public class StreamsConfig extends AbstractConfig { } /** - * Prefix a property with {@link StreamsConfig#CONSUMER_PREFIX}. This is used to isolate consumer configs - * from producer configs - * @param consumerProp - * @return CONSUMER_PREFIX + consumerProp + * Prefix a property with {@link #CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig consumer configs} + * from {@link ProducerConfig producer configs}. + * + * @param consumerProp the consumer property to be masked + * @return {@link #CONSUMER_PREFIX} + {@code consumerProp} */ public static String consumerPrefix(final String consumerProp) { return CONSUMER_PREFIX + consumerProp; } /** - * Prefix a property with {@link StreamsConfig#PRODUCER_PREFIX}. This is used to isolate producer configs - * from consumer configs - * @param producerProp - * @return PRODUCER_PREFIX + consumerProp + * Prefix a property with {@link #PRODUCER_PREFIX}. This is used to isolate {@link ProducerConfig producer configs} + * from {@link ConsumerConfig consumer configs}. 
+ * + * @param producerProp the producer property to be masked + * @return PRODUCER_PREFIX + {@code producerProp} */ public static String producerPrefix(final String producerProp) { return PRODUCER_PREFIX + producerProp; } /** - * Returns a copy of the config definition. + * Return a copy of the config definition. + * + * @return a copy of the config definition */ public static ConfigDef configDef() { return new ConfigDef(CONFIG); } - public StreamsConfig(Map<?, ?> props) { - super(CONFIG, props); - } - /** - * Get the configs specific to the Consumer. Properties using the prefix {@link StreamsConfig#CONSUMER_PREFIX} - * will be used in favor over their non-prefixed versions except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} - * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster - * @param streamThread the {@link StreamThread} creating a consumer - * @param groupId consumer groupId - * @param clientId clientId - * @return Map of the Consumer configuration. - * @throws ConfigException + * Create a new {@code StreamsConfig} using the given properties. 
+ * + * @param props properties that specify Kafka Streams and internal consumer/producer configuration */ - public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException { - - final Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES); + public StreamsConfig(final Map<?, ?> props) { + super(CONFIG, props); + } + private Map<String, Object> getCommonConsumerConfigs() throws ConfigException { final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames()); // disable auto commit and throw exception if there is user overridden values, // this is necessary for streams commit semantics if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG - + ", as the streams client will always turn off auto committing."); + + ", as the streams client will always turn off auto committing."); } + final Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES); consumerProps.putAll(clientProvidedProps); // bootstrap.servers should be from StreamsConfig - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG)); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); + // remove deprecate ZK config + consumerProps.remove(ZOOKEEPER_CONNECT_CONFIG); + + return consumerProps; + } + + /** + * Get the configs to the {@link KafkaConsumer consumer}. + * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their non-prefixed versions + * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed + * version as we only support reading/writing from/to the same Kafka Cluster. 
+ * + * @param streamThread the {@link StreamThread} creating a consumer + * @param groupId consumer groupId + * @param clientId clientId + * @return Map of the consumer configuration. + * @throws ConfigException if {@code "enable.auto.commit"} was set by the user, regardless of its value + */ + public Map<String, Object> getConsumerConfigs(final StreamThread streamThread, + final String groupId, + final String clientId) throws ConfigException { + final Map<String, Object> consumerProps = getCommonConsumerConfigs(); + // add client id with stream client id prefix, and group id consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); // add configs required for stream partition assignor - consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread); - consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); - consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); + consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread); + consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); + consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName()); - consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); - if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) { - consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG)); - } + consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG)); + return consumerProps; } - /** - * Get the 
consumer config for the restore-consumer. Properties using the prefix {@link StreamsConfig#CONSUMER_PREFIX} - * will be used in favor over their non-prefixed versions except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} - * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster - * @param clientId clientId - * @return Map of the Consumer configuration - * @throws ConfigException + * Get the configs for the {@link KafkaConsumer restore-consumer}. + * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their non-prefixed versions + * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed + * version as we only support reading/writing from/to the same Kafka Cluster. + * + * @param clientId clientId + * @return Map of the consumer configuration. + * @throws ConfigException if {@code "enable.auto.commit"} was set by the user, regardless of its value */ - public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException { - Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES); - - final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames()); - - // disable auto commit and throw exception if there is user overridden values, - // this is necessary for streams commit semantics - if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { - throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG - + ", as the streams client will always turn off auto committing."); - } - - consumerProps.putAll(clientProvidedProps); - - // bootstrap.servers should be from StreamsConfig - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG)); + public Map<String, Object> getRestoreConsumerConfigs(final String 
clientId) throws ConfigException { + final Map<String, Object> consumerProps = getCommonConsumerConfigs(); // no need to set group id for a restore consumer consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG); - // add client id with stream client id prefix consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer"); return consumerProps; } - /** - * Get the configs for the Producer. Properties using the prefix {@link StreamsConfig#PRODUCER_PREFIX} - * will be used in favor over their non-prefixed versions except in the case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG} - * where we always use the non-prefixed version as we only support reading/writing from/to the same Kafka Cluster - * @param clientId clientId - * @return Map of the Consumer configuration - * @throws ConfigException + * Get the configs for the {@link KafkaProducer producer}. + * Properties using the prefix {@link #PRODUCER_PREFIX} will be used in favor over their non-prefixed versions + * except in the case of {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed + * version as we only support reading/writing from/to the same Kafka Cluster. + * + * @param clientId clientId + * @return Map of the producer configuration. 
+ */ - public Map<String, Object> getProducerConfigs(String clientId) { + public Map<String, Object> getProducerConfigs(final String clientId) { // generate producer configs from original properties and overridden maps final Map<String, Object> props = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES); props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames())); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG)); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); return props; } - private Map<String, Object> getClientPropsWithPrefix(final String prefix, final Set<String> configNames) { + private Map<String, Object> getClientPropsWithPrefix(final String prefix, + final Set<String> configNames) { final Map<String, Object> props = clientProps(configNames, originals()); - props.putAll(this.originalsWithPrefix(prefix)); + props.putAll(originalsWithPrefix(prefix)); return props; } + /** + * Return a {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde + * class}. 
+ * + * @return a configured instance of key Serde class + */ public Serde keySerde() { try { - Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class); + final Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class); serde.configure(originals(), true); return serde; - } catch (Exception e) { - throw new StreamsException(String.format("Failed to configure key serde %s", get(StreamsConfig.KEY_SERDE_CLASS_CONFIG)), e); + } catch (final Exception e) { + throw new StreamsException(String.format("Failed to configure key serde %s", get(KEY_SERDE_CLASS_CONFIG)), e); } } + /** + * Return a {@link Serde#configure(Map, boolean) configured} instance of {@link #VALUE_SERDE_CLASS_CONFIG value + * Serde class}. + * + * @return a configured instance of value Serde class + */ public Serde valueSerde() { try { - Serde<?> serde = getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class); + final Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class); serde.configure(originals(), false); return serde; - } catch (Exception e) { - throw new StreamsException(String.format("Failed to configure value serde %s", get(StreamsConfig.VALUE_SERDE_CLASS_CONFIG)), e); + } catch (final Exception e) { + throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e); } } @@ -530,14 +591,15 @@ public class StreamsConfig extends AbstractConfig { * Override any client properties in the original configs with overrides * * @param configNames The given set of configuration names. - * @param originals The original configs to be filtered. + * @param originals The original configs to be filtered. 
* @return client config with any overrides */ - private Map<String, Object> clientProps(Set<String> configNames, Map<String, Object> originals) { + private Map<String, Object> clientProps(final Set<String> configNames, + final Map<String, Object> originals) { // iterate all client config names, filter out non-client configs from the original // property map and use the overridden values when they are not specified by users - Map<String, Object> parsed = new HashMap<>(); - for (String configName: configNames) { + final Map<String, Object> parsed = new HashMap<>(); + for (final String configName: configNames) { if (originals.containsKey(configName)) { parsed.put(configName, originals.get(configName)); } @@ -546,7 +608,7 @@ public class StreamsConfig extends AbstractConfig { return parsed; } - public static void main(String[] args) { + public static void main(final String[] args) { System.out.println(CONFIG.toHtmlTable()); } }
