This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 83281ba  KAFKA-7933: Switch from persistent to in-memory store in KTableKTableLeftJoinTest (#6292)
83281ba is described below

commit 83281ba0e49693207b8c657dc4cd6c0f6d130928
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Feb 20 23:17:58 2019 -0800

    KAFKA-7933: Switch from persistent to in-memory store in KTableKTableLeftJoinTest (#6292)

    Reviewers: Bill Bejeck <b...@confluent.io>, John Roesler <j...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .../internals/KTableKTableLeftJoinTest.java        | 40 +++++++++++++++++-----
 1 file changed, 31 insertions(+), 9 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 609b070..be43e5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.MockProcessor;
@@ -315,16 +316,37 @@ public class KTableKTableLeftJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), Serdes.String());
         final KTable<Long, String> aggTable = builder
-            .table(agg, consumed)
+            .table(agg, consumed, Materialized.as(Stores.inMemoryKeyValueStore("agg-base-store")))
             .groupBy(KeyValue::new, Grouped.with(Serdes.Long(), Serdes.String()))
-            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, Materialized.as("agg-store"));
-
-        final KTable<Long, String> one = builder.table(tableOne, consumed);
-        final KTable<Long, String> two = builder.table(tableTwo, consumed);
-        final KTable<Long, String> three = builder.table(tableThree, consumed);
-        final KTable<Long, String> four = builder.table(tableFour, consumed);
-        final KTable<Long, String> five = builder.table(tableFive, consumed);
-        final KTable<Long, String> six = builder.table(tableSix, consumed);
+            .reduce(
+                MockReducer.STRING_ADDER,
+                MockReducer.STRING_ADDER,
+                Materialized.as(Stores.inMemoryKeyValueStore("agg-store")));
+
+        final KTable<Long, String> one = builder.table(
+            tableOne,
+            consumed,
+            Materialized.as(Stores.inMemoryKeyValueStore("tableOne-base-store")));
+        final KTable<Long, String> two = builder.table(
+            tableTwo,
+            consumed,
+            Materialized.as(Stores.inMemoryKeyValueStore("tableTwo-base-store")));
+        final KTable<Long, String> three = builder.table(
+            tableThree,
+            consumed,
+            Materialized.as(Stores.inMemoryKeyValueStore("tableThree-base-store")));
+        final KTable<Long, String> four = builder.table(
+            tableFour,
+            consumed,
+            Materialized.as(Stores.inMemoryKeyValueStore("tableFour-base-store")));
+        final KTable<Long, String> five = builder.table(
+            tableFive,
+            consumed,
+            Materialized.as(Stores.inMemoryKeyValueStore("tableFive-base-store")));
+        final KTable<Long, String> six = builder.table(
+            tableSix,
+            consumed,
+            Materialized.as(Stores.inMemoryKeyValueStore("tableSix-base-store")));
 
         final ValueMapper<String, String> mapper = value -> value.toUpperCase(Locale.ROOT);