KAFKA-5651; Follow-up: add with method to Materialized Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where people don't care about the state store name.
Author: Damian Guy <damian....@gmail.com> Reviewers: Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Matthias J. Sax <matth...@confluent.io> Closes #4009 from dguy/materialized (cherry picked from commit 23a014052d39521a3af471b3f95809c2164820f7) Signed-off-by: Guozhang Wang <wangg...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/12c77b4b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/12c77b4b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/12c77b4b Branch: refs/heads/1.0 Commit: 12c77b4b08aa0eb39320915a2796444420a05392 Parents: aa1e4c2 Author: Damian Guy <damian....@gmail.com> Authored: Fri Oct 6 15:38:23 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Fri Oct 6 15:38:31 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/StreamsBuilder.java | 31 ++++---- .../kafka/streams/kstream/KGroupedStream.java | 6 +- .../kafka/streams/kstream/KGroupedTable.java | 2 +- .../kafka/streams/kstream/KStreamBuilder.java | 2 +- .../kafka/streams/kstream/Materialized.java | 18 +++++ .../GroupedStreamAggregateBuilder.java | 2 +- .../kstream/internals/InternalNameProvider.java | 23 ++++++ .../internals/InternalStreamsBuilder.java | 25 ++++--- .../kstream/internals/KGroupedStreamImpl.java | 14 ++-- .../kstream/internals/KGroupedTableImpl.java | 34 +++++---- .../streams/kstream/internals/KStreamImpl.java | 54 +++++++------- .../streams/kstream/internals/KTableImpl.java | 50 +++++++------ .../internals/KeyValueStoreMaterializer.java | 6 +- .../kstream/internals/MaterializedInternal.java | 21 +++--- .../internals/SessionWindowedKStreamImpl.java | 11 ++- .../internals/TimeWindowedKStreamImpl.java | 14 ++-- .../KStreamAggregationIntegrationTest.java | 6 +- .../kstream/internals/AbstractStreamTest.java | 2 +- .../internals/InternalStreamsBuilderTest.java | 40 ++++++----- .../kstream/internals/KTableForeachTest.java | 6 +- .../internals/MaterializedInternalTest.java | 76 ++++++++++++++++++++ .../SessionWindowedKStreamImplTest.java | 16 ++++- .../internals/TimeWindowedKStreamImplTest.java | 2 +- .../KeyValueStoreMaterializerTest.java | 23 ++++-- 24 files changed, 330 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 94d19ae..b5cc6d7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -224,7 +224,7 @@ public class StreamsBuilder { materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), - new MaterializedInternal<>(materialized)); + new MaterializedInternal<>(materialized, internalStreamsBuilder, topic)); } /** @@ -273,11 +273,9 @@ public class StreamsBuilder { return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), new MaterializedInternal<>( - Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as( - internalStreamsBuilder.newStoreName(topic)) - .withKeySerde(consumed.keySerde) - .withValueSerde(consumed.valueSerde), - false)); + Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde), + internalStreamsBuilder, + topic)); } /** @@ -301,11 +299,12 @@ public class StreamsBuilder { final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal + = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic); return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde())), - new MaterializedInternal<>(materialized)); + materializedInternal); } /** @@ -329,11 +328,12 @@ public class StreamsBuilder { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized = - new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as( - internalStreamsBuilder.newStoreName(topic)) - .withKeySerde(consumed.keySerde) - .withValueSerde(consumed.valueSerde), - false); + new MaterializedInternal<>( + Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde), + internalStreamsBuilder, + topic); + + return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized); } @@ -399,7 +399,7 @@ public class StreamsBuilder { materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), - new MaterializedInternal<>(materialized)); + new MaterializedInternal<>(materialized, internalStreamsBuilder, topic)); } /** @@ -431,7 +431,8 @@ public class StreamsBuilder { final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(materialized, internalStreamsBuilder, topic); return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde())), http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index b3945f7..6347960 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -1173,7 +1173,7 @@ public interface KGroupedStream<K, V> { * @param <VR> the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key - * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))} + * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.with(null, aggValueSerde))} */ @Deprecated <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, @@ -1346,7 +1346,7 @@ public interface KGroupedStream<K, V> { * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent * the latest (rolling) aggregate for each key within a window * @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by - * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))} + * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.with(null, aggValueSerde))} */ @Deprecated <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, @@ -1509,7 +1509,7 @@ public interface KGroupedStream<K, V> { * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent * the latest (rolling) aggregate for each key within a window * @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by - * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as("someStoreName").withValueSerde(aggValueSerde))} + * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.with(null, aggValueSerde))} */ @Deprecated <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index d0a38cc..4d2bb29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -855,7 +855,7 @@ public interface KGroupedTable<K, V> { * @param <VR> the value type of the aggregated {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key - * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as("someStoreName").withValueSerde(aggValueSerde))} + * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.with(null, aggValueSerde))} */ @Deprecated <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 6b51c86..77745d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -1251,7 +1251,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * @return a new unique name */ public String newName(final String prefix) { - return internalStreamsBuilder.newName(prefix); + return internalStreamsBuilder.newProcessorName(prefix); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java index dd7165c..48dd12e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -139,6 +139,24 @@ public class Materialized<K, V, S extends StateStore> { } /** + * Materialize a {@link StateStore} with the provided key and value {@link Serde}s. + * An internal name will be used for the store. + * + * @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key + * serde from configs will be used + * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value + * serde from configs will be used + * @param <K> key type + * @param <V> value type + * @param <S> store type + * @return a new {@link Materialized} instance with the given key and value serdes + */ + public static <K, V, S extends StateStore> Materialized<K, V, S> with(final Serde<K> keySerde, + final Serde<V> valueSerde) { + return new Materialized<K, V, S>((String) null).withKeySerde(keySerde).withValueSerde(valueSerde); + } + + /** * Set the valueSerde the materialized {@link StateStore} will use. * * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 6fb7a35..e4429cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -65,7 +65,7 @@ class GroupedStreamAggregateBuilder<K, V> { final String functionName, final StoreBuilder storeBuilder, final boolean isQueryable) { - final String aggFunctionName = builder.newName(functionName); + final String aggFunctionName = builder.newProcessorName(functionName); final String sourceName = repartitionIfRequired(storeBuilder.name()); builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName); builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName); http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java new file mode 100644 index 0000000..8d8ebfc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +public interface InternalNameProvider { + String newProcessorName(String prefix); + + String newStoreName(String prefix); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 0df1524..4308e5d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -32,7 +32,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; -public class InternalStreamsBuilder { +public class InternalStreamsBuilder implements InternalNameProvider { final InternalTopologyBuilder internalTopologyBuilder; @@ -44,7 +44,7 @@ public class InternalStreamsBuilder { public <K, V> KStream<K, V> stream(final Collection<String> topics, final ConsumedInternal<K, V> consumed) { - final String name = newName(KStreamImpl.SOURCE_NAME); + final String name = newProcessorName(KStreamImpl.SOURCE_NAME); internalTopologyBuilder.addSource(consumed.offsetResetPolicy(), name, @@ -57,7 +57,7 @@ public class InternalStreamsBuilder { } public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) { - final String name = newName(KStreamImpl.SOURCE_NAME); + final String name = newProcessorName(KStreamImpl.SOURCE_NAME); internalTopologyBuilder.addSource(consumed.offsetResetPolicy(), name, @@ -74,8 +74,8 @@ public class InternalStreamsBuilder { final ConsumedInternal<K, V> consumed, final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); - final String source = newName(KStreamImpl.SOURCE_NAME); - final String name = newName(KTableImpl.SOURCE_NAME); + final String source = newProcessorName(KStreamImpl.SOURCE_NAME); + final String name = newProcessorName(KTableImpl.SOURCE_NAME); final KTable<K, V> kTable = createKTable(consumed, topic, @@ -94,10 +94,11 @@ public class InternalStreamsBuilder { public <K, V> KTable<K, V> table(final String topic, final ConsumedInternal<K, V> consumed, final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) { - final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize(); + final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized) + .materialize(); - final String source = newName(KStreamImpl.SOURCE_NAME); - final String name = newName(KTableImpl.SOURCE_NAME); + final String source = newProcessorName(KStreamImpl.SOURCE_NAME); + final String name = newProcessorName(KTableImpl.SOURCE_NAME); final KTable<K, V> kTable = createKTable(consumed, topic, storeBuilder.name(), @@ -141,8 +142,8 @@ public class InternalStreamsBuilder { // explicitly disable logging for global stores materialized.withLoggingDisabled(); final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize(); - final String sourceName = newName(KStreamImpl.SOURCE_NAME); - final String processorName = newName(KTableImpl.SOURCE_NAME); + final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME); + final String processorName = newProcessorName(KTableImpl.SOURCE_NAME); final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name()); @@ -160,10 +161,12 @@ public class InternalStreamsBuilder { return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name())); } - public String newName(final String prefix) { + @Override + public String newProcessorName(final String prefix) { return prefix + String.format("%010d", index.getAndIncrement()); } + @Override public String newStoreName(final String prefix) { return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index dafaa62..45ae7da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -109,7 +109,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized); + = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); return doAggregate( new KStreamReduce<K, V>(materializedInternal.storeName(), reducer), REDUCE_NAME, @@ -171,7 +171,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre final Aggregator<? super K, ? super V, VR> aggregator, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized); + = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); return doAggregate( new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), AGGREGATE_NAME, @@ -183,10 +183,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre final Aggregator<? super K, ? super V, VR> aggregator) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); - final String storeName = builder.newStoreName(AGGREGATE_NAME); - MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = - new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName), false); + new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>with(keySerde, null), + builder, + AGGREGATE_NAME); return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), AGGREGATE_NAME, materializedInternal); @@ -277,7 +277,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized); + = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); if (materializedInternal.valueSerde() == null) { materialized.withValueSerde(Serdes.Long()); } @@ -495,7 +495,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre final String functionName, final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) { - final String aggFunctionName = builder.newName(functionName); + final String aggFunctionName = builder.newProcessorName(functionName); final String sourceName = repartitionIfRequired(storeSupplier.name()); http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 507944a..d5a4e71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Collections; @@ -46,8 +47,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup private static final String REDUCE_NAME = "KTABLE-REDUCE-"; - protected final Serde<? extends K> keySerde; - protected final Serde<? extends V> valSerde; + protected final Serde<K> keySerde; + protected final Serde<V> valSerde; private boolean isQueryable = true; private final Initializer<Long> countInitializer = new Initializer<Long>() { @Override @@ -73,8 +74,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup KGroupedTableImpl(final InternalStreamsBuilder builder, final String name, final String sourceName, - final Serde<? extends K> keySerde, - final Serde<? extends V> valSerde) { + final Serde<K> keySerde, + final Serde<V> valSerde) { super(builder, name, Collections.singleton(sourceName)); this.keySerde = keySerde; this.valSerde = valSerde; @@ -141,10 +142,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup @SuppressWarnings("deprecation") private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, final String functionName, - final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { - final String sinkName = builder.newName(KStreamImpl.SINK_NAME); - final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME); - final String funcName = builder.newName(functionName); + final StateStoreSupplier<KeyValueStore> storeSupplier) { + final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME); + final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME); + final String funcName = builder.newProcessorName(functionName); buildAggregate(aggregateSupplier, storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX, @@ -184,15 +185,16 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, final String functionName, final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) { - final String sinkName = builder.newName(KStreamImpl.SINK_NAME); - final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME); - final String funcName = builder.newName(functionName); + final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME); + final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME); + final String funcName = builder.newProcessorName(functionName); buildAggregate(aggregateSupplier, materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX, funcName, sourceName, sinkName); - builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), funcName); + builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized) + .materialize(), funcName); // return the KTable representation with the intermediate topic as the sources return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable); @@ -215,7 +217,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup Objects.requireNonNull(subtractor, "subtractor can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized); + = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(materializedInternal.storeName(), adder, subtractor); @@ -255,6 +257,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup materialized); } + @SuppressWarnings("unchecked") @Override public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, @@ -265,7 +268,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup Objects.requireNonNull(subtractor, "subtractor can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = - new MaterializedInternal<>(materialized); + new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { + materializedInternal.withKeySerde(keySerde); + } final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(), initializer, adder, http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index cbbe848..8e80315 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -123,7 +123,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate) { Objects.requireNonNull(predicate, "predicate can't be null"); - String name = builder.newName(FILTER_NAME); + String name = builder.newProcessorName(FILTER_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, false), this.name); @@ -133,7 +133,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) { Objects.requireNonNull(predicate, "predicate can't be null"); - String name = builder.newName(FILTER_NAME); + String name = builder.newProcessorName(FILTER_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, true), this.name); @@ -147,7 +147,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } private <K1> String internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) { - String name = builder.newName(KEY_SELECT_NAME); + String name = builder.newProcessorName(KEY_SELECT_NAME); builder.internalTopologyBuilder.addProcessor( name, new KStreamMap<>( @@ -166,7 +166,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); - String name = builder.newName(MAP_NAME); + String name = builder.newProcessorName(MAP_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamMap<>(mapper), this.name); @@ -177,7 +177,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); - String name = builder.newName(MAPVALUES_NAME); + String name = builder.newProcessorName(MAPVALUES_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(mapper), this.name); @@ -247,7 +247,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public void print(final Printed<K, V> printed) { Objects.requireNonNull(printed, "printed can't be null"); final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed); - final String name = builder.newName(PRINTING_NAME); + final String name = builder.newProcessorName(PRINTING_NAME); builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name); } @@ -320,7 +320,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); - String name = builder.newName(FLATMAP_NAME); + String name = builder.newProcessorName(FLATMAP_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMap<>(mapper), this.name); @@ -330,7 +330,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); - String name = builder.newName(FLATMAPVALUES_NAME); + String name = builder.newProcessorName(FLATMAPVALUES_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name); @@ -346,13 +346,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V for (final Predicate<? super K, ? super V> predicate : predicates) { Objects.requireNonNull(predicate, "predicates can't have null values"); } - String branchName = builder.newName(BRANCH_NAME); + String branchName = builder.newProcessorName(BRANCH_NAME); builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name); KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length); for (int i = 0; i < predicates.length; i++) { - String childName = builder.newName(BRANCHCHILD_NAME); + String childName = builder.newProcessorName(BRANCHCHILD_NAME); builder.internalTopologyBuilder.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName); @@ -371,7 +371,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V private KStream<K, V> merge(final InternalStreamsBuilder builder, final KStream<K, V> stream) { KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream; - String name = builder.newName(MERGE_NAME); + String name = builder.newProcessorName(MERGE_NAME); String[] parentNames = {this.name, streamImpl.name}; Set<String> allSourceNodes = new HashSet<>(); @@ -406,7 +406,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public void foreach(final ForeachAction<? super K, ? super V> action) { Objects.requireNonNull(action, "action can't be null"); - String name = builder.newName(FOREACH_NAME); + String name = builder.newProcessorName(FOREACH_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, false), this.name); } @@ -414,7 +414,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) { Objects.requireNonNull(action, "action can't be null"); - final String name = builder.newName(PEEK_NAME); + final String name = builder.newProcessorName(PEEK_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, true), this.name); @@ -482,7 +482,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @SuppressWarnings("unchecked") private void to(final String topic, final ProducedInternal<K, V> produced) { - final String name = builder.newName(SINK_NAME); + final String name = builder.newProcessorName(SINK_NAME); final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer(); final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer(); final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner(); @@ -500,7 +500,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, final String... stateStoreNames) { Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); - String name = builder.newName(TRANSFORM_NAME); + String name = builder.newProcessorName(TRANSFORM_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name); if (stateStoreNames != null && stateStoreNames.length > 0) { @@ -514,7 +514,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier, final String... stateStoreNames) { Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null"); - String name = builder.newName(TRANSFORMVALUES_NAME); + String name = builder.newProcessorName(TRANSFORMVALUES_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name); if (stateStoreNames != null && stateStoreNames.length > 0) { @@ -527,7 +527,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier, final String... stateStoreNames) { - final String name = builder.newName(PROCESSOR_NAME); + final String name = builder.newProcessorName(PROCESSOR_NAME); builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); if (stateStoreNames != null && stateStoreNames.length > 0) { @@ -645,9 +645,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V String baseName = topicNamePrefix != null ? topicNamePrefix : name; String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX; - String sinkName = builder.newName(SINK_NAME); - String filterName = builder.newName(FILTER_NAME); - String sourceName = builder.newName(SOURCE_NAME); + String sinkName = builder.newProcessorName(SINK_NAME); + String filterName = builder.newProcessorName(FILTER_NAME); + String sourceName = builder.newProcessorName(SOURCE_NAME); builder.internalTopologyBuilder.addInternalTopic(repartitionTopic); builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() { @@ -753,7 +753,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V Objects.requireNonNull(joiner, "joiner can't be null"); final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier(); - final String name = builder.newName(LEFTJOIN_NAME); + final String name = builder.newProcessorName(LEFTJOIN_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name); return new KStreamImpl<>(builder, name, sourceNodes, false); } @@ -767,7 +767,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); - final String name = builder.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME); + final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name); builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).internalStoreName()); builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name); @@ -886,11 +886,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final ValueJoiner<? super V1, ? super V2, ? extends R> joiner, final JoinWindows windows, final Joined<K1, V1, V2> joined) { - String thisWindowStreamName = builder.newName(WINDOWED_NAME); - String otherWindowStreamName = builder.newName(WINDOWED_NAME); - String joinThisName = rightOuter ? builder.newName(OUTERTHIS_NAME) : builder.newName(JOINTHIS_NAME); - String joinOtherName = leftOuter ? builder.newName(OUTEROTHER_NAME) : builder.newName(JOINOTHER_NAME); - String joinMergeName = builder.newName(MERGE_NAME); + String thisWindowStreamName = builder.newProcessorName(WINDOWED_NAME); + String otherWindowStreamName = builder.newProcessorName(WINDOWED_NAME); + String joinThisName = rightOuter ? builder.newProcessorName(OUTERTHIS_NAME) : builder.newProcessorName(JOINTHIS_NAME); + String joinOtherName = leftOuter ? builder.newProcessorName(OUTEROTHER_NAME) : builder.newProcessorName(JOINOTHER_NAME); + String joinMergeName = builder.newProcessorName(MERGE_NAME); final StoreBuilder<WindowStore<K1, V1>> thisWindow = createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), joinThisName + "-store"); http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index db8de1a..8c79dec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -146,7 +146,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier, final boolean isFilterNot) { Objects.requireNonNull(predicate, "predicate can't be null"); - String name = builder.newName(FILTER_NAME); + String name = builder.newProcessorName(FILTER_NAME); String internalStoreName = null; if (storeSupplier != null) { internalStoreName = storeSupplier.name(); @@ -162,7 +162,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate, final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized, final boolean filterNot) { - String name = builder.newName(FILTER_NAME); + String name = builder.newProcessorName(FILTER_NAME); KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, @@ -193,7 +193,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(predicate, "predicate can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doFilter(predicate, new MaterializedInternal<>(materialized), false); + return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), false); } @SuppressWarnings("deprecation") @@ -225,7 +225,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(predicate, "predicate can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doFilter(predicate, new MaterializedInternal<>(materialized), true); + return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), true); } @SuppressWarnings("deprecation") @@ -252,7 +252,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final Serde<V1> valueSerde, final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(mapper); - String name = builder.newName(MAPVALUES_NAME); + String name = builder.newProcessorName(MAPVALUES_NAME); String internalStoreName = null; if (storeSupplier != null) { internalStoreName = storeSupplier.name(); @@ -278,13 +278,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, Objects.requireNonNull(mapper, "mapper can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized); - final String name = builder.newName(MAPVALUES_NAME); + = new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME); + final String name = builder.newProcessorName(MAPVALUES_NAME); final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this, mapper, materializedInternal.storeName()); builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); - builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(), + builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal) + .materialize(), name); return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true); } @@ -335,7 +336,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final Serde<V> valSerde, final String label) { Objects.requireNonNull(label, "label can't be null"); - final String name = builder.newName(PRINTING_NAME); + final String name = builder.newProcessorName(PRINTING_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label)), this.name); } @@ -374,7 +375,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, if (filePath.trim().isEmpty()) { throw new TopologyException("filePath can't be an empty string"); } - String name = builder.newName(PRINTING_NAME); + String name = builder.newProcessorName(PRINTING_NAME); try { PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name()); builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name); @@ -387,7 +388,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public void foreach(final ForeachAction<? super K, ? super V> action) { Objects.requireNonNull(action, "action can't be null"); - String name = builder.newName(FOREACH_NAME); + String name = builder.newProcessorName(FOREACH_NAME); KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() { @Override public void apply(K key, Change<V> value) { @@ -404,16 +405,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final StreamPartitioner<? super K, ? super V> partitioner, final String topic, final String queryableStoreName) { - final String internalStoreName = queryableStoreName != null ? queryableStoreName : builder.newStoreName(KTableImpl.TOSTREAM_NAME); - to(keySerde, valSerde, partitioner, topic); return builder.table(topic, new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null), - new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(internalStoreName) - .withKeySerde(keySerde) - .withValueSerde(valSerde), - queryableStoreName != null)); + new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde), + builder, + KTableImpl.TOSTREAM_NAME)); } @SuppressWarnings("deprecation") @@ -543,7 +541,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public KStream<K, V> toStream() { - String name = builder.newName(TOSTREAM_NAME); + String name = builder.newProcessorName(TOSTREAM_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<K, Change<V>, V>(new ValueMapper<Change<V>, V>() { @Override @@ -573,7 +571,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false); + return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), false, false); } @SuppressWarnings("deprecation") @@ -604,7 +602,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { - return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true); + return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), true, true); } @SuppressWarnings("deprecation") @@ -637,7 +635,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { return doJoin(other, joiner, - new MaterializedInternal<>(materialized), + new MaterializedInternal<>(materialized, builder, MERGE_NAME), true, false); } @@ -684,7 +682,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - final String joinMergeName = builder.newName(MERGE_NAME); + final String joinMergeName = builder.newProcessorName(MERGE_NAME); final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name(); final KTable<K, R> result = buildJoin((AbstractStream<K>) other, joiner, @@ -709,7 +707,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); final String internalQueryableName = materialized == null ? null : materialized.storeName(); - final String joinMergeName = builder.newName(MERGE_NAME); + final String joinMergeName = builder.newProcessorName(MERGE_NAME); final KTable<K, VR> result = buildJoin((AbstractStream<K>) other, joiner, leftOuter, @@ -741,8 +739,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, ((KTableImpl) other).enableSendingOldValues(); } - final String joinThisName = builder.newName(JOINTHIS_NAME); - final String joinOtherName = builder.newName(JOINOTHER_NAME); + final String joinThisName = builder.newProcessorName(JOINTHIS_NAME); + final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME); final KTableKTableAbstractJoin<K, R, V, V1> joinThis; @@ -792,7 +790,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final Serialized<K1, V1> serialized) { Objects.requireNonNull(selector, "selector can't be null"); Objects.requireNonNull(serialized, "serialized can't be null"); - String selectName = builder.newName(SELECT_NAME); + String selectName = builder.newProcessorName(SELECT_NAME); KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java index 1d702f2..c8cd35d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java @@ -29,10 +29,14 @@ public class KeyValueStoreMaterializer<K, V> { this.materialized = materialized; } + /** + * @return StoreBuilder + */ public StoreBuilder<KeyValueStore<K, V>> materialize() { KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier(); if (supplier == null) { - supplier = Stores.persistentKeyValueStore(materialized.storeName()); + final String name = materialized.storeName(); + supplier = Stores.persistentKeyValueStore(name); } final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(supplier, materialized.keySerde(), http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index 9f186fd..c933b86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -27,21 +27,24 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ private final boolean queryable; - public MaterializedInternal(final Materialized<K, V, S> materialized) { - this(materialized, true); - } - + public MaterializedInternal(final Materialized<K, V, S> materialized, - final boolean queryable) { + final InternalNameProvider nameProvider, + final String generatedStorePrefix) { super(materialized); - this.queryable = queryable; + if (storeName() == null) { + queryable = false; + storeName = nameProvider.newStoreName(generatedStorePrefix); + } else { + queryable = true; + } } public String storeName() { - if (storeName != null) { - return storeName; + if (storeSupplier != null) { + return storeSupplier.name(); } - return storeSupplier.name(); + return storeName; } public StoreSupplier<S> storeSupplier() { http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index 6644c92..34e5bd7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -83,7 +83,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized); + = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); if (materializedInternal.valueSerde() == null) { materialized.withValueSerde(Serdes.Long()); } @@ -93,6 +93,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen materialized); } + @SuppressWarnings("unchecked") @Override public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -100,7 +101,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(sessionMerger, "sessionMerger can't be null"); - return doAggregate(initializer, aggregator, sessionMerger, null); + return doAggregate(initializer, aggregator, sessionMerger, (Serde<T>) valSerde); } @SuppressWarnings("unchecked") @@ -113,7 +114,11 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(sessionMerger, "sessionMerger can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal + = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { + materializedInternal.withKeySerde(keySerde); + } return (KTable<Windowed<K>, VR>) aggregateBuilder.build( new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator, sessionMerger), AGGREGATE_NAME, http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index daba4c3..5e54770 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -73,7 +73,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal - = new MaterializedInternal<>(materialized); + = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); if (materializedInternal.valueSerde() == null) { materialized.withValueSerde(Serdes.Long()); } @@ -81,12 +81,13 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr } + @SuppressWarnings("unchecked") @Override public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); - return doAggregate(initializer, aggregator, null); + return doAggregate(initializer, aggregator, (Serde<VR>) valSerde); } @SuppressWarnings("unchecked") @@ -108,7 +109,11 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal + = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + if (materializedInternal.keySerde() == null) { + materializedInternal.withKeySerde(keySerde); + } return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, @@ -135,7 +140,8 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) { Objects.requireNonNull(reducer, "reducer can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); + final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal + = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer), REDUCE_NAME, http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 350facf..4169eb2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; @@ -38,6 +39,7 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Serialized; @@ -48,6 +50,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlySessionStore; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.TestUtils; @@ -308,7 +311,8 @@ public class KStreamAggregationIntegrationTest { groupedStream.windowedBy(TimeWindows.of(500L)) .aggregate( initializer, - aggregator + aggregator, + Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer()) ) .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java index 8e1a8a1..e16d8e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java @@ -64,7 +64,7 @@ public class AbstractStreamTest { } KStream<K, V> randomFilter() { - String name = builder.newName("RANDOM-FILTER-"); + String name = builder.newProcessorName("RANDOM-FILTER-"); builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), this.name); return new KStreamImpl<>(builder, name, sourceNodes, false); } http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index 68d0e24..05a0214 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -61,8 +61,9 @@ public class InternalStreamsBuilderTest { private KStreamTestDriver driver = null; private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(); + private final String storePrefix = "prefix-"; private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), false); + = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), builder, storePrefix); @Before public void setUp() { @@ -79,15 +80,15 @@ public class InternalStreamsBuilderTest { @Test public void testNewName() { - assertEquals("X-0000000000", builder.newName("X-")); - assertEquals("Y-0000000001", builder.newName("Y-")); - assertEquals("Z-0000000002", builder.newName("Z-")); + assertEquals("X-0000000000", builder.newProcessorName("X-")); + assertEquals("Y-0000000001", builder.newProcessorName("Y-")); + assertEquals("Z-0000000002", builder.newProcessorName("Z-")); final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder()); - assertEquals("X-0000000000", newBuilder.newName("X-")); - assertEquals("Y-0000000001", newBuilder.newName("Y-")); - assertEquals("Z-0000000002", newBuilder.newName("Z-")); + assertEquals("X-0000000000", newBuilder.newProcessorName("X-")); + assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-")); + assertEquals("Z-0000000002", newBuilder.newProcessorName("Z-")); } @Test @@ -141,16 +142,18 @@ public class InternalStreamsBuilderTest { KTable table1 = builder.table("topic2", consumed, new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic2"), - false)); + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null), + builder, + storePrefix)); final ProcessorTopology topology = builder.internalTopologyBuilder.build(null); assertEquals(1, topology.stateStores().size()); - assertEquals("topic2", topology.stateStores().get(0).name()); + final String storeName = "prefix-STATE-STORE-0000000000"; + assertEquals(storeName, topology.stateStores().get(0).name()); assertEquals(1, topology.storeToChangelogTopic().size()); - assertEquals("topic2", topology.storeToChangelogTopic().get("topic2")); + assertEquals("topic2", topology.storeToChangelogTopic().get(storeName)); assertNull(table1.queryableStoreName()); } @@ -160,7 +163,8 @@ public class InternalStreamsBuilderTest { consumed, new MaterializedInternal<>( Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"), - false)); + builder, + storePrefix)); final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology(); final List<StateStore> stateStores = topology.globalStateStores(); @@ -184,11 +188,11 @@ public class InternalStreamsBuilderTest { builder.globalTable("table", consumed, new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1"))); + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1"), builder, storePrefix)); builder.globalTable("table2", consumed, new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2"))); + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2"), builder, storePrefix)); doBuildGlobalTopologyWithAllGlobalTables(); } @@ -201,14 +205,14 @@ public class InternalStreamsBuilderTest { final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one))); + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one), builder, storePrefix)); final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, new MaterializedInternal<>( - Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two))); + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two), builder, storePrefix)); final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), false); + = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), builder, storePrefix); builder.table("not-global", consumed, materialized); final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() { @@ -243,7 +247,7 @@ public class InternalStreamsBuilderTest { final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed); final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized - = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), false); + = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), builder, storePrefix); final KTable<String, String> table = builder.table("table-topic", consumed, materialized); assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store")); http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java index 23e0b59..d8b3a5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java @@ -85,9 +85,9 @@ public class KTableForeachTest { StreamsBuilder builder = new StreamsBuilder(); KTable<Integer, String> table = builder.table(topicName, Consumed.with(intSerde, stringSerde), - new MaterializedInternal<>(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName) - .withKeySerde(intSerde) - .withValueSerde(stringSerde))); + Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName) + .withKeySerde(intSerde) + .withValueSerde(stringSerde)); table.foreach(action); // Then http://git-wip-us.apache.org/repos/asf/kafka/blob/12c77b4b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java new file mode 100644 index 0000000..5fd76f3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Test; +import org.junit.runner.RunWith; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +@RunWith(EasyMockRunner.class) +public class MaterializedInternalTest { + + @Mock(type = MockType.NICE) + private InternalNameProvider nameProvider; + + @Mock(type = MockType.NICE) + private KeyValueBytesStoreSupplier supplier; + private final String prefix = "prefix"; + + @Test + public void shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() { + final String generatedName = prefix + "-store"; + EasyMock.expect(nameProvider.newStoreName(prefix)).andReturn(generatedName); + + EasyMock.replay(nameProvider); + + final MaterializedInternal<Object, Object, StateStore> materialized + = new MaterializedInternal<>(Materialized.with(null, null), nameProvider, prefix); + + assertThat(materialized.storeName(), equalTo(generatedName)); + EasyMock.verify(nameProvider); + } + + @Test + public void shouldUseProvidedStoreNameWhenSet() { + final String storeName = "store-name"; + final MaterializedInternal<Object, Object, StateStore> materialized + = new MaterializedInternal<>(Materialized.as(storeName), nameProvider, prefix); + assertThat(materialized.storeName(), equalTo(storeName)); + } + + @Test + public void shouldUseStoreNameOfSupplierWhenProvided() { + final String storeName = "other-store-name"; + EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes(); + EasyMock.replay(supplier); + final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized + = new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix); + assertThat(materialized.storeName(), equalTo(storeName)); + } +} \ No newline at end of file