Repository: kafka Updated Branches: refs/heads/trunk aef6927a4 -> 4c42654b1
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java index 7b68137..a7bd889 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java @@ -31,84 +31,118 @@ import java.util.Map; public interface StreamsMetrics { /** - * Get read-only handle on global metrics registry + * Get read-only handle on global metrics registry. + * * @return Map of all metrics. */ Map<MetricName, ? extends Metric> metrics(); /** - * Add a latency and throughput sensor and default associated metrics. Metrics include both latency ones - * (average and max latency) and throughput ones (operations/time unit). + * Add a latency and throughput sensor for a specific operation, which will include the following sensors: + * <ol> + * <li>average latency</li> + * <li>max latency</li> + * <li>throughput (num.operations / time unit)</li> + * </ol> + * Also create a parent sensor with the same metrics that aggregates all entities with the same operation under the + * same scope if it has not been created. * - * @param scopeName Name of the scope, could be the type of the state store, etc. - * @param entityName Name of the entity, could be the name of the state store instance, etc. - * @param recordingLevel The recording level (e.g., INFO or DEBUG) for this sensor. - * @param operationName Name of the operation, could be get / put / delete / etc. - * @param tags Additional tags of the sensor. + * @param scopeName name of the scope, could be the type of the state store, etc. + * @param entityName name of the entity, could be the name of the state store instance, etc. 
+ * @param operationName name of the operation, could be get / put / delete / etc. + * @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor. + * @param tags additional tags of the sensor * @return The added sensor. */ - Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags); + Sensor addLatencyAndThroughputSensor(final String scopeName, + final String entityName, + final String operationName, + final Sensor.RecordingLevel recordingLevel, + final String... tags); /** - * Record the given latency value of the sensor. If the passed sensor includes throughput metrics, - * e.g., when created by the {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} - * method, then the throughput metrics will also be recorded from this event. + * Record the given latency value of the sensor. + * If the passed sensor includes throughput metrics, e.g., when created by the + * {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} method, then the + * throughput metrics will also be recorded from this event. * - * @param sensor sensor whose latency we are recording. + * @param sensor sensor whose latency we are recording. * @param startNs start of measurement time in nanoseconds. - * @param endNs end of measurement time in nanoseconds. + * @param endNs end of measurement time in nanoseconds. */ - void recordLatency(Sensor sensor, long startNs, long endNs); + void recordLatency(final Sensor sensor, + final long startNs, + final long endNs); /** - * Add a throughput sensor and default associated metrics. Metrics include throughput ones - * (operations/time unit). 
This sensor is a strict subset of the sensor created by + * Add a throughput sensor for a specific operation: + * <ol> + * <li>throughput (num.operations / time unit)</li> + * </ol> + * Also create a parent sensor with the same metrics that aggregates all entities with the same operation under the + * same scope if it has not been created. + * This sensor is a strict subset of the sensors created by * {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)}. * - * @param scopeName Name of the scope, could be the type of the state store, etc. - * @param entityName Name of the entity, could be the name of the state store instance, etc. - * @param recordingLevel The recording level (e.g., INFO or DEBUG) for this sensor. - * @param operationName Name of the operation, could be get / put / delete / etc. - * @param tags Additional tags of the sensor. + * @param scopeName name of the scope, could be the type of the state store, etc. + * @param entityName name of the entity, could be the name of the state store instance, etc. + * @param operationName name of the operation, could be get / put / delete / etc. + * @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor. + * @param tags additional tags of the sensor * @return The added sensor. */ - Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags); + Sensor addThroughputSensor(final String scopeName, + final String entityName, + final String operationName, + final Sensor.RecordingLevel recordingLevel, + final String... tags); /** - * Records the throughput value of a sensor. - * @param sensor addSensor whose throughput we are recording. - * @param value throughput value. + * Record the throughput value of a sensor. 
+ * + * @param sensor sensor whose throughput we are recording + * @param value throughput value */ - void recordThroughput(Sensor sensor, long value); + void recordThroughput(final Sensor sensor, + final long value); /** * Generic method to create a sensor. - * Note that for most cases it is advisable to use {@link #addThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} + * Note that for most cases it is advisable to use + * {@link #addThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} * or {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} to ensure - * metric name well-formedness and conformity with the rest of the streams code base. However, - * if the above two methods are not sufficient, this method can also be used. - * @param name Name of the sensor. - * @param recordingLevel The recording level (e.g., INFO or DEBUG) for this sensor. + * metric name well-formedness and conformity with the rest of the streams code base. + * However, if the above two methods are not sufficient, this method can also be used. + * + * @param name name of the sensor. + * @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor + * @return The added sensor. */ - Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel); + Sensor addSensor(final String name, + final Sensor.RecordingLevel recordingLevel); /** * Generic method to create a sensor with parent sensors. - * Note that for most cases it is advisable to use {@link #addThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} + * Note that for most cases it is advisable to use + * {@link #addThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} * or {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} to ensure - * metric name well-formedness and conformity with the rest of the streams code base. 
However, - * if the above two methods are not sufficient, this method can also be used. - * @param name Name of the sensor. - * @param recordingLevel The recording level (e.g., INFO or DEBUG) for this sensor. + * metric name well-formedness and conformity with the rest of the streams code base. + * However, if the above two methods are not sufficient, this method can also be used. + * + * @param name name of the sensor + * @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor + * @return The added sensor. */ - Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel, Sensor... parents); + Sensor addSensor(final String name, + final Sensor.RecordingLevel recordingLevel, + final Sensor... parents); /** * Remove a sensor. - * @param sensor Sensor to be removed. + * @param sensor sensor to be removed */ - void removeSensor(Sensor sensor); + void removeSensor(final Sensor sensor); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 6cfd202..f1f1338 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -353,6 +353,7 @@ public interface KGroupedStream<K, V> { * Use {@link StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... 
// compute sum + * String storeName = storeSupplier.name(); + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) @@ -554,9 +555,11 @@ public interface KGroupedStream<K, V> { * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. * <p> * To query the local {@link SessionStore} it must be obtained via - * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. + * Use {@link StateStoreSupplier#name()} to get the store name: + * <pre>{@code + * KafkaStreams streams = ... // compute sum + * String storeName = storeSupplier.name(); + * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore()); + * String key = "some-key"; + * KeyValueIterator<Windowed<String>, Long> sumForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances) @@ -666,6 +669,7 @@ public interface KGroupedStream<K, V> { * Use {@link StateStoreSupplier#name()} to get the store name: * <pre>{@code * KafkaStreams streams = ... 
// some aggregation on value type double + * Sting storeName = storeSupplier.name(); * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); * String key = "some-key"; * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index aecd8ab..8f67d45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -19,15 +19,20 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl; import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.kstream.internals.KTableSource; import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreType; import 
org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import java.util.Collections; @@ -35,113 +40,139 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; /** - * {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL - * for users to specify computational logic and translates the given logic to a {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}. + * {@link KStreamBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology. + * + * @see TopologyBuilder + * @see KStream + * @see KTable + * @see GlobalKTable */ public class KStreamBuilder extends TopologyBuilder { private final AtomicInteger index = new AtomicInteger(0); /** - * Create a new {@link KStreamBuilder} instance. - */ - public KStreamBuilder() { - super(); - } - - /** - * Create a {@link KStream} instance from the specified topics. - * The default deserializers specified in the config are used. + * Create a {@link KStream} from the specified topics. + * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the + * {@link StreamsConfig config} are used. + * <p> + * If multiple topics are specified there are no ordering guaranteed for records from different topics. * <p> - * If multiple topics are specified there are nor ordering guaranteed for records from different topics. + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. * - * @param topics the topic names; must contain at least one topic name + * @param topics the topic names; must contain at least one topic name * @return a {@link KStream} for the specified topics */ - public <K, V> KStream<K, V> stream(String... 
topics) { + public <K, V> KStream<K, V> stream(final String... topics) { return stream(null, null, null, topics); } - /** - * Create a {@link KStream} instance from the specified topics. - * The default deserializers specified in the config are used. + * Create a {@link KStream} from the specified topics. + * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * <p> + * If multiple topics are specified there are no ordering guaranteed for records from different topics. * <p> - * If multiple topics are specified there are nor ordering guaranteed for records from different topics. + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest - * @param topics the topic names; must contain at least one topic name + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed + * offsets are available + * @param topics the topic names; must contain at least one topic name * @return a {@link KStream} for the specified topics */ - public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, String... topics) { + public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, + final String... topics) { return stream(offsetReset, null, null, topics); } - /** - * Create a {@link KStream} instance from the specified Pattern. - * The default deserializers specified in the config are used. + * Create a {@link KStream} from the specified topic pattern. + * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the + * {@link StreamsConfig config} are used. 
+ * <p> + * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of + * them and there is no ordering guarantee between records from different topics. * <p> - * If multiple topics are matched by the specified pattern, the created stream will read data from all of them, - * and there is no ordering guarantee between records from different topics + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. * - * @param topicPattern the Pattern to match for topic names + * @param topicPattern the pattern to match for topic names * @return a {@link KStream} for topics matching the regex pattern. */ - public <K, V> KStream<K, V> stream(Pattern topicPattern) { + public <K, V> KStream<K, V> stream(final Pattern topicPattern) { return stream(null, null, null, topicPattern); } /** - * Create a {@link KStream} instance from the specified Pattern. - * The default deserializers specified in the config are used. + * Create a {@link KStream} from the specified topic pattern. + * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. * <p> - * If multiple topics are matched by the specified pattern, the created stream will read data from all of them, - * and there is no ordering guarantee between records from different topics + * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of + * them and there is no ordering guarantee between records from different topics. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. 
* - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest - * @param topicPattern the Pattern to match for topic names + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed + * offsets are available + * @param topicPattern the pattern to match for topic names * @return a {@link KStream} for topics matching the regex pattern. */ - public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Pattern topicPattern) { + public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern) { return stream(offsetReset, null, null, topicPattern); } - - /** - * Create a {@link KStream} instance from the specified topics. + * Create a {@link KStream} from the specified topics. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. * <p> - * If multiple topics are specified there are nor ordering guaranteed for records from different topics. + * If multiple topics are specified there are no ordering guaranteed for records from different topics. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. 
* - * @param keySerde key serde used to read this source {@link KStream}, - * if not specified the default serde defined in the configs will be used - * @param valSerde value serde used to read this source {@link KStream}, - * if not specified the default serde defined in the configs will be used - * @param topics the topic names; must contain at least one topic name + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topics the topic names; must contain at least one topic name * @return a {@link KStream} for the specified topics */ - public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String... topics) { + public <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics) { return stream(null, keySerde, valSerde, topics); } /** - * Create a {@link KStream} instance from the specified topics. + * Create a {@link KStream} from the specified topics. * <p> - * If multiple topics are specified there are nor ordering guaranteed for records from different topics. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest + * If multiple topics are specified there are no ordering guaranteed for records from different topics. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. 
* - * @param keySerde key serde used to read this source {@link KStream}, - * if not specified the default serde defined in the configs will be used - * @param valSerde value serde used to read this source {@link KStream}, - * if not specified the default serde defined in the configs will be used - * @param topics the topic names; must contain at least one topic name + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed + * offsets are available + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topics the topic names; must contain at least one topic name * @return a {@link KStream} for the specified topics */ - public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valSerde, String... topics) { - String name = newName(KStreamImpl.SOURCE_NAME); + public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, + final Serde<K> keySerde, + final Serde<V> valSerde, + final String... topics) { + final String name = newName(KStreamImpl.SOURCE_NAME); addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics); @@ -150,38 +181,53 @@ public class KStreamBuilder extends TopologyBuilder { /** - * Create a {@link KStream} instance from the specified Pattern. + * Create a {@link KStream} from the specified topic pattern. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * <p> + * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of + * them and there is no ordering guarantee between records from different topics. 
* <p> - * If multiple topics are matched by the specified pattern, the created stream will read data from all of them, - * and there is no ordering guarantee between records from different topics. + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. * - * @param keySerde key serde used to read this source {@link KStream}, - * if not specified the default serde defined in the configs will be used - * @param valSerde value serde used to read this source {@link KStream}, - * if not specified the default serde defined in the configs will be used - * @param topicPattern the Pattern to match for topic names - * @return a {@link KStream} for the specified topics + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topicPattern the pattern to match for topic names + * @return a {@link KStream} for topics matching the regex pattern. */ - public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, Pattern topicPattern) { + public <K, V> KStream<K, V> stream(final Serde<K> keySerde, + final Serde<V> valSerde, + final Pattern topicPattern) { return stream(null, keySerde, valSerde, topicPattern); } /** - * Create a {@link KStream} instance from the specified Pattern. + * Create a {@link KStream} from the specified topic pattern. * <p> - * If multiple topics are matched by the specified pattern, the created stream will read data from all of them, - * and there is no ordering guarantee between records from different topics. 
+ * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of + * them and there is no ordering guarantee between records from different topics. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case it is the user's responsibility to repartition the date before any key based operation + * (like aggregation or join) is applied to the returned {@link KStream}. * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest - * @param keySerde key serde used to read this source {@link KStream}, - * if not specified the default serde defined in the configs will be used - * @param valSerde value serde used to read this source {@link KStream}, - * if not specified the default serde defined in the configs will be used - * @param topicPattern the Pattern to match for topic names - * @return a {@link KStream} for the specified topics + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed + * offsets are available + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topicPattern the pattern to match for topic names + * @return a {@link KStream} for topics matching the regex pattern. 
*/ - public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valSerde, Pattern topicPattern) { - String name = newName(KStreamImpl.SOURCE_NAME); + public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, + final Serde<K> keySerde, + final Serde<V> valSerde, + final Pattern topicPattern) { + final String name = newName(KStreamImpl.SOURCE_NAME); addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern); @@ -189,71 +235,152 @@ public class KStreamBuilder extends TopologyBuilder { } /** - * Create a {@link KTable} instance for the specified topic. - * Record keys of the topic should never by null, otherwise an exception will be thrown at runtime. - * The default deserializers specified in the config are used. - * The resulting {@link KTable} will be materialized in a local state store with the given store name. - * However, no new changelog topic is created in this case since the underlying topic acts as one. + * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the + * {@link StreamsConfig config} are used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
+ * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest - * @param topic the topic name; cannot be null - * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected - * @return a {@link KTable} for the specified topics + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link KTable} for the specified topic */ - public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, String topic, final String storeName) { - return table(offsetReset, null, null, topic, storeName); + public <K, V> KTable<K, V> table(final String topic, + final String storeName) { + return table(null, null, null, topic, storeName); } /** - * Create a {@link KTable} instance for the specified topic. - * Record keys of the topic should never by null, otherwise an exception will be thrown at runtime. - * The default deserializers specified in the config are used. - * The resulting {@link KTable} will be materialized in a local state store with the given store name. 
- * However, no new changelog topic is created in this case since the underlying topic acts as one. + * Create a {@link KTable} for the specified topic. + * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. 
* - * @param topic the topic name; cannot be null - * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected - * @return a {@link KTable} for the specified topics + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed + * offsets are available + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link KTable} for the specified topic */ - public <K, V> KTable<K, V> table(String topic, final String storeName) { - return table(null, null, null, topic, storeName); + public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, + final String topic, + final String storeName) { + return table(offsetReset, null, null, topic, storeName); } - /** - * Create a {@link KTable} instance for the specified topic. - * Record keys of the topic should never by null, otherwise an exception will be thrown at runtime. - * The resulting {@link KTable} will be materialized in a local state store with the given store name. - * However, no new changelog topic is created in this case since the underlying topic acts as one. + * Create a {@link KTable} for the specified topic. + * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). 
+ * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. * - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be null - * @param storeName the state store name used for the materialized KTable - * @return a {@link KTable} for the specified topics + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link KTable} for the specified topic */ - public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) { + public <K, V> KTable<K, V> table(final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic, + final String storeName) { return table(null, keySerde, valSerde, topic, storeName); } /** - * Create a 
{@link KTable} instance for the specified topic. - * Record keys of the topic should never by null, otherwise an exception will be thrown at runtime. - * The resulting {@link KTable} will be materialized in a local state store with the given store name. - * However, no new changelog topic is created in this case since the underlying topic acts as one. + * Create a {@link KTable} for the specified topic. + * Input {@link KeyValue} pairs with {@code null} key will be dropped. + * <p> + * Note that the specified input topics must be partitioned by key. + * If this is not the case the returned {@link KTable} will be corrupted. + * <p> + * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. 
* - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be null - * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected - * @return a {@link KTable} for the specified topics + * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed + * offsets are available + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link KTable} for the specified topic */ - public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) { + public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, + final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic, + final String storeName) { final String source = newName(KStreamImpl.SOURCE_NAME); final String name = newName(KTableImpl.SOURCE_NAME); final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName); @@ -265,7 +392,7 @@ public class KStreamBuilder extends TopologyBuilder { // only materialize the KTable into a state store if the storeName is not null if (storeName != null) { - StateStoreSupplier 
storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, + final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, keySerde, valSerde, false, @@ -279,22 +406,70 @@ public class KStreamBuilder extends TopologyBuilder { return kTable; } + /** + * Create a {@link GlobalKTable} for the specified topic. + * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * Input {@link KeyValue} pairs cannot have a {@code null} key (otherwise an exception will be thrown). + * <p> + * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); + * }</pre> + * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} + * regardless of the specified value in {@link StreamsConfig}. + * + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link GlobalKTable} for the specified topic + */ + public <K, V> GlobalKTable<K, V> globalTable(final String topic, + final String storeName) { + return globalTable(null, null, topic, storeName); + } /** - * Create a new {@link GlobalKTable} instance for the specified topic. 
- * Record keys of the topic should never by null, otherwise an exception will be thrown at runtime. - * The resulting {@link GlobalKTable} will be materialized in a local state store with the given store name. - * However, no new changelog topic is created in this case since the underlying topic acts as one. - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name; cannot be null - * @param storeName the state store name used - * @return a {@link GlobalKTable} for the specified topics + * Create a {@link GlobalKTable} for the specified topic. + * The default key and value deserializers as specified in the {@link StreamsConfig config} are used. + * Input {@link KeyValue} pairs cannot have a {@code null} key (otherwise an exception will be thrown). + * <p> + * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given + * {@code storeName}. + * However, no internal changelog topic is created since the original input topic can be used for recovery (cf. + * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long valueForKey = localStore.get(key); + * }</pre> + * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} + * regardless of the specified value in {@link StreamsConfig}. 
+ * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be {@code null} + * @param storeName the state store name; cannot be {@code null} + * @return a {@link GlobalKTable} for the specified topic */ @SuppressWarnings("unchecked") - public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName) { + public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, + final Serde<V> valSerde, + final String topic, + final String storeName) { final String sourceName = newName(KStreamImpl.SOURCE_NAME); final String processorName = newName(KTableImpl.SOURCE_NAME); final KTableSource<K, V> tableSource = new KTableSource<>(storeName); @@ -315,41 +490,27 @@ public class KStreamBuilder extends TopologyBuilder { } /** - * Create a new {@link GlobalKTable} instance for the specified topic using the default key and value {@link Serde}s - * Record keys of the topic should never by null, otherwise an exception will be thrown at runtime. - * The resulting {@link GlobalKTable} will be materialized in a local state store with the given store name. - * However, no new changelog topic is created in this case since the underlying topic acts as one. - * - * @param topic the topic name; cannot be null - * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected - * @return a {@link GlobalKTable} for the specified topics - */ - public <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName) { - return globalTable(null, null, topic, storeName); - } - - /** - * Create a new instance of {@link KStream} by merging the given streams. 
+ * Create a new instance of {@link KStream} by merging the given {@link KStream}s. * <p> - * There are nor ordering guaranteed for records from different streams. + * There is no ordering guarantee for records from different {@link KStream}s. * - * @param streams the instances of {@link KStream} to be merged + * @param streams the {@link KStream}s to be merged * @return a {@link KStream} containing all records of the given streams */ - public <K, V> KStream<K, V> merge(KStream<K, V>... streams) { + public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) { return KStreamImpl.merge(this, streams); } /** + * <strong>This function is for internal usage only and should not be called.</strong> + * <p> * Create a unique processor name used for translation into the processor topology. - * This function is only for internal usage. * - * @param prefix processor name prefix + * @param prefix processor name prefix * @return a new unique name */ - public String newName(String prefix) { + public String newName(final String prefix) { return prefix + String.format("%010d", index.getAndIncrement()); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java index 6df9481..e00a102 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; * * @see FailOnInvalidTimestamp * @see LogAndSkipOnInvalidTimestamp + * @see UsePreviousTimeOnInvalidTimestamp */ public class WallclockTimestampExtractor 
implements TimestampExtractor { http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index 3ba0f34..25e4a36 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -135,7 +135,7 @@ public class StreamsKafkaClient { } /** - * Creates a set of new topics using batch request. + * Create a set of new topics using batch request. */ public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor, final long windowChangeLogAdditionalRetention) { @@ -253,7 +253,15 @@ public class StreamsKafkaClient { return metadataResponse.topicMetadata(); } - public void checkBrokerCompatibility() { + /** + * Check if the used brokers have version 0.10.1.x or higher. + * <p> + * Note, for <em>pre</em> 0.10.x brokers the broker version cannot be checked and the client will hang and retry + * until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}. 
+ * + * @throws StreamsException if brokers have version 0.10.0.x + */ + public void checkBrokerCompatibility() throws StreamsException { final ClientRequest clientRequest = kafkaClient.newClientRequest( getBrokerId(), new ApiVersionsRequest.Builder(), http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 6ac87ae..9e59113 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -90,7 +90,6 @@ public class GlobalKTableIntegrationTest { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index b443abc..ac8abd4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java 
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -121,7 +121,6 @@ public class JoinIntegrationTest { RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - STREAMS_CONFIG.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); http://git-wip-us.apache.org/repos/asf/kafka/blob/4c42654b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index eeb9177..7061507 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -133,7 +133,6 @@ public class KTableKTableJoinIntegrationTest { streamsConfig = new Properties(); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
