Repository: kafka Updated Branches: refs/heads/trunk 9cbb9f093 -> 329d5fa64
KAFKA-5844; add groupBy(selector, serialized) to Ktable add `KTable#groupBy(KeyValueMapper, Serialized)` and deprecate the overload with `Serde` params Author: Damian Guy <[email protected]> Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>, Bill Bejeck <[email protected]> Closes #3802 from dguy/kip-182-ktable-groupby Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/329d5fa6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/329d5fa6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/329d5fa6 Branch: refs/heads/trunk Commit: 329d5fa64a2a3ac1d39ac37fdacbf6e43d500d11 Parents: 9cbb9f0 Author: Damian Guy <[email protected]> Authored: Thu Sep 7 12:35:31 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Thu Sep 7 12:35:31 2017 +0100 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KTable.java | 33 +++++++++++++++++++- .../kafka/streams/kstream/KeyValueMapper.java | 4 +-- .../streams/kstream/internals/KTableImpl.java | 22 ++++++++----- .../kstream/internals/KTableAggregateTest.java | 21 ++++++------- .../internals/KTableKTableLeftJoinTest.java | 3 +- .../kafka/streams/tests/SmokeTestClient.java | 3 +- 6 files changed, 62 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 06a0eee..4bc9572 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -1001,7 +1001,7 @@ public interface KTable<K, V> { * 
records to and rereading all update records from it, such that the resulting {@link KGroupedTable} is partitioned * on the new key. * <p> - * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serde, Serde)} + * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)} * instead. * * @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated @@ -1012,6 +1012,35 @@ public interface KTable<K, V> { <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); /** + * Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper} + * and {@link Serde}s as specified by {@link Serialized}. + * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the + * provided {@link KeyValueMapper}. + * Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data + * (cf. {@link KGroupedTable}). + * The {@link KeyValueMapper} selects a new key and value (which should both have unmodified type). + * If the new record key is {@code null}, the record will not be included in the resulting {@link KGroupedTable}. + * <p> + * Because a new key is selected, an internal repartitioning topic will be created in Kafka. + * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in + * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is + * an internally generated name, and "-repartition" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. 
+ * <p> + * All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update + * records to and rereading all update records from it, such that the resulting {@link KGroupedTable} is partitioned + * on the new key. + * + * @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated + * @param serialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} + * @param <KR> the key type of the result {@link KGroupedTable} + * @param <VR> the value type of the result {@link KGroupedTable} + * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable} + */ + <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, + final Serialized<KR, VR> serialized); + + /** * Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}. * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the * provided {@link KeyValueMapper}. @@ -1038,7 +1067,9 @@ public interface KTable<K, V> { * @param <KR> the key type of the result {@link KGroupedTable} * @param <VR> the value type of the result {@link KGroupedTable} * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable} + * @deprecated use {@link #groupBy(KeyValueMapper, Serialized)} */ + @Deprecated <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? 
super V, KeyValue<KR, VR>> selector, final Serde<KR> keySerde, final Serde<VR> valueSerde); http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java index e707fbb..2a56a05 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java @@ -39,9 +39,9 @@ import org.apache.kafka.streams.KeyValue; * @see KStream#flatMap(KeyValueMapper) * @see KStream#selectKey(KeyValueMapper) * @see KStream#groupBy(KeyValueMapper) - * @see KStream#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KStream#groupBy(KeyValueMapper, Serialized) * @see KTable#groupBy(KeyValueMapper) - * @see KTable#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KTable#groupBy(KeyValueMapper, Serialized) * @see KTable#toStream(KeyValueMapper) */ public interface KeyValueMapper<K, V, VR> { http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 46f2636..aed7fde 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable; import 
org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.PrintForeachAction; +import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; @@ -609,21 +610,28 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, final Serde<K1> keySerde, final Serde<V1> valueSerde) { + return groupBy(selector, Serialized.with(keySerde, valueSerde)); + } + + @Override + public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) { + return this.groupBy(selector, Serialized.<K1, V1>with(null, null)); + } + + @Override + public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, + final Serialized<K1, V1> serialized) { Objects.requireNonNull(selector, "selector can't be null"); + Objects.requireNonNull(serialized, "serialized can't be null"); String selectName = builder.newName(SELECT_NAME); - KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<K, V, K1, V1>(this, selector); + KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); // select the aggregate key and values (old and new), it would require parent to send old values builder.internalTopologyBuilder.addProcessor(selectName, selectSupplier, this.name); this.enableSendingOldValues(); - return new KGroupedTableImpl<>(builder, selectName, this.name, keySerde, valueSerde); - } - - @Override - public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? 
super V, KeyValue<K1, V1>> selector) { - return this.groupBy(selector, null, null); + return new KGroupedTableImpl<>(builder, selectName, this.name, serialized.keySerde(), serialized.valueSerde()); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index f31e232..9347cc8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; @@ -49,6 +50,7 @@ import static org.junit.Assert.assertEquals; public class KTableAggregateTest { final private Serde<String> stringSerde = Serdes.String(); + private final Serialized<String, String> stringSerialzied = Serialized.with(stringSerde, stringSerde); private File stateDir = null; @@ -70,8 +72,7 @@ public class KTableAggregateTest { KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), - stringSerde, - stringSerde + stringSerialzied ).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, 
@@ -119,8 +120,7 @@ public class KTableAggregateTest { KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), - stringSerde, - stringSerde + stringSerialzied ).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, @@ -160,8 +160,7 @@ public class KTableAggregateTest { } } }, - stringSerde, - stringSerde + stringSerialzied ) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -234,7 +233,7 @@ public class KTableAggregateTest { final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") - .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde) + .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) .count("count") .toStream() .process(proc); @@ -249,7 +248,7 @@ public class KTableAggregateTest { final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") - .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde) + .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) .count() .toStream() .process(proc); @@ -264,7 +263,7 @@ public class KTableAggregateTest { final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") - .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde) + .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) .count("count") .toStream() .process(proc); @@ -299,7 +298,7 @@ public class KTableAggregateTest { public KeyValue<String, 
String> apply(String key, String value) { return KeyValue.pair(String.valueOf(key.charAt(0)), String.valueOf(key.charAt(1))); } - }, stringSerde, stringSerde) + }, stringSerialzied) .aggregate(new Initializer<String>() { @Override @@ -358,7 +357,7 @@ public class KTableAggregateTest { public KeyValue<String, Long> apply(final Long key, final String value) { return new KeyValue<>(value, key); } - }, Serdes.String(), Serdes.Long()) + }, Serialized.with(Serdes.String(), Serdes.Long())) .reduce(new Reducer<Long>() { @Override public Long apply(final Long value1, final Long value2) { http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index fe92f2b..781eb61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; @@ -346,7 +347,7 @@ public class KTableKTableLeftJoinTest { public KeyValue<Long, String> apply(final Long key, final String value) { return new KeyValue<>(key, value); } - }, Serdes.Long(), Serdes.String()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store"); + }, Serialized.with(Serdes.Long(), 
Serdes.String())).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store"); final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(), tableOne, tableOne); final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo); http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index fc7a915..50c33e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -219,8 +219,7 @@ public class SmokeTestClient extends SmokeTestUtil { // test repartition Agg agg = new Agg(); cntTable.groupBy(agg.selector(), - stringSerde, - longSerde + Serialized.with(stringSerde, longSerde) ).aggregate(agg.init(), agg.adder(), agg.remover(),
