KAFKA-3477: extended KStream/KTable API to specify custom partitioner for sinks
Author: mjsax <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1180 from mjsax/kafka-3477-streamPartitioner-DSL Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c5fe7bd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c5fe7bd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c5fe7bd Branch: refs/heads/0.10.0 Commit: 5c5fe7bd795f5aab5248fb718c61c8ca3f2f571a Parents: 35fadbf Author: Matthias J. Sax <[email protected]> Authored: Tue Apr 5 15:56:09 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Tue Apr 5 17:08:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 2 +- .../apache/kafka/streams/kstream/KStream.java | 88 +++++++++++++++++--- .../apache/kafka/streams/kstream/KTable.java | 88 +++++++++++++++++--- .../streams/kstream/internals/KStreamImpl.java | 39 ++++++--- .../streams/kstream/internals/KTableImpl.java | 30 ++++++- 5 files changed, 206 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 20958e4..e8fda10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes * on this machine, or on remote machines) as a single (possibly distributed) stream processing 
client. These instances will divide up the work * based on the assignment of the input topic partitions so that all partitions are being - * consumed. If instances are added or failed, all instances will rebelance the partition assignment among themselves + * consumed. If instances are added or failed, all instances will rebalance the partition assignment among themselves * to balance processing load. * <p> * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer} http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 2313b8b..e4933cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StreamPartitioner; /** * KStream is an abstraction of a <i>record stream</i> of key-value pairs. @@ -92,7 +93,7 @@ public interface KStream<K, V> { /** * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic - * using default serializers and deserializers. + * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}. 
* * @param topic the topic name @@ -100,37 +101,98 @@ public interface KStream<K, V> { KStream<K, V> through(String topic); /** + * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic + * using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions. + * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}. + * + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used + * @param topic the topic name + */ + KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic); + + /** * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic. + * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} + * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used + * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used. * This is equivalent to calling {@link #to(Serde, Serde, String)} and * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}. 
* - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name */ KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic); /** - * Materialize this stream to a topic using default serializers specified in the config. + * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic + * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions. + * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and + * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}. 
+ * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key + * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used + * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used + * @param topic the topic name + */ + KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic); + + /** + * Materialize this stream to a topic using default serializers specified in the config + * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. * * @param topic the topic name */ void to(String topic); /** - * Materialize this stream to a topic. + * Materialize this stream to a topic using default serializers specified in the config and a customizable + * {@link StreamPartitioner} to determine the distribution of records to partitions. 
* - * @param keySerde key serde used to send key-value pairs, - * if not specified the default serde defined in the configs will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default serde defined in the configs will be used - * @param topic the topic name + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used + * @param topic the topic name + */ + void to(StreamPartitioner<K, V> partitioner, String topic); + + /** + * Materialize this stream to a topic. If {@code keySerde} provides a + * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key + * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used + * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used. + * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param topic the topic name */ void to(Serde<K> keySerde, Serde<V> valSerde, String topic); /** + * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions. 
+ * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key + * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used + * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used + * @param topic the topic name + */ + void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic); + + /** * Create a new {@link KStream} instance by applying a {@link org.apache.kafka.streams.kstream.Transformer} to all elements in this stream, one element at a time. 
* * @param transformerSupplier the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer} http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 30ea882..581ee28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.StreamPartitioner; /** * KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table. @@ -54,7 +55,7 @@ public interface KTable<K, V> { /** * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic - * using default serializers and deserializers. + * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}. * * @param topic the topic name @@ -62,37 +63,98 @@ public interface KTable<K, V> { KTable<K, V> through(String topic); /** + * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic using default serializers + * and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions. + * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
+ * + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used + * @param topic the topic name + */ + KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic); + + /** * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic. + * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} + * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used + * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used. * This is equivalent to calling {@link #to(Serde, Serde, String)} and * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}. * - * @param keySerde key serde used to send key-value pairs, - * if not specified the default key serde defined in the configuration will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default value serde defined in the configuration will be used - * @param topic the topic name + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name */ KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic); /** - * Materialize this stream to a topic using default serializers specified in the config. + * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic + * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions. 
+ * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and + * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}. + * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key + * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used + * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used + * @param topic the topic name + */ + KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic); + + /** + * Materialize this stream to a topic using default serializers specified in the config + * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. * * @param topic the topic name */ void to(String topic); /** - * Materialize this stream to a topic. + * Materialize this stream to a topic using default serializers specified in the config + * and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions. + * + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used + * @param topic the topic name + */ + void to(StreamPartitioner<K, V> partitioner, String topic); + + /** + * Materialize this stream to a topic. 
If {@code keySerde} provides a + * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key + * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used + * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used. * - * @param keySerde key serde used to send key-value pairs, - * if not specified the default serde defined in the configs will be used - * @param valSerde value serde used to send key-value pairs, - * if not specified the default serde defined in the configs will be used - * @param topic the topic name + * @param keySerde key serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param topic the topic name */ void to(Serde<K> keySerde, Serde<V> valSerde, String topic); /** + * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions. 
+ * + * @param keySerde key serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param partitioner the function used to determine how records are distributed among partitions of the topic, + * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key + * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used + * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used + * @param topic the topic name + */ + void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic); + + /** * Convert this stream to a new instance of {@link KStream}. */ KStream<K, V> toStream(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 5889e07..0fb3984 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -194,37 +194,56 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override - public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) { - to(keySerde, valSerde, topic); + public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) { + to(keySerde, valSerde, partitioner, topic); return topology.stream(keySerde, valSerde, 
topic); } @Override + public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) { + return through(keySerde, valSerde, null, topic); + } + + @Override + public KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic) { + return through(null, null, partitioner, topic); + } + + @Override public KStream<K, V> through(String topic) { - return through(null, null, topic); + return through(null, null, null, topic); } @Override public void to(String topic) { - to(null, null, topic); + to(null, null, null, topic); + } + + @Override + public void to(StreamPartitioner<K, V> partitioner, String topic) { + to(null, null, partitioner, topic); } - @SuppressWarnings("unchecked") @Override public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) { + to(keySerde, valSerde, null, topic); + } + + @SuppressWarnings("unchecked") + @Override + public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) { String name = topology.newName(SINK_NAME); - StreamPartitioner<K, V> streamPartitioner = null; Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); Serializer<V> valSerializer = keySerde == null ? 
null : valSerde.serializer(); - - if (keySerializer != null && keySerializer instanceof WindowedSerializer) { + + if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; - streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer); + partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer); } - topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name); + topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index fd464a0..156f2db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.Stores; import java.util.Collections; @@ -133,25 +134,46 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, + StreamPartitioner<K, V> partitioner, String topic) { - to(keySerde, valSerde, topic); + to(keySerde, valSerde, partitioner, 
topic); return topology.table(keySerde, valSerde, topic); } @Override + public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) { + return through(keySerde, valSerde, null, topic); + } + + @Override + public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic) { + return through(null, null, partitioner, topic); + } + + @Override public KTable<K, V> through(String topic) { - return through(null, null, topic); + return through(null, null, null, topic); } @Override public void to(String topic) { - to(null, null, topic); + to(null, null, null, topic); + } + + @Override + public void to(StreamPartitioner<K, V> partitioner, String topic) { + to(null, null, partitioner, topic); } @Override public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) { - this.toStream().to(keySerde, valSerde, topic); + this.toStream().to(keySerde, valSerde, null, topic); + } + + @Override + public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) { + this.toStream().to(keySerde, valSerde, partitioner, topic); } @Override
