This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 83281ba  KAFKA-7933: Switch from persistent to in-memory store in KTableKTableLeftJoinTest (#6292)
83281ba is described below

commit 83281ba0e49693207b8c657dc4cd6c0f6d130928
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Feb 20 23:17:58 2019 -0800

    KAFKA-7933: Switch from persistent to in-memory store in KTableKTableLeftJoinTest (#6292)
    
    Reviewers: Bill Bejeck <b...@confluent.io>, John Roesler <j...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .../internals/KTableKTableLeftJoinTest.java        | 40 +++++++++++++++++-----
 1 file changed, 31 insertions(+), 9 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 609b070..be43e5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.MockProcessor;
@@ -315,16 +316,37 @@ public class KTableKTableLeftJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), 
Serdes.String());
         final KTable<Long, String> aggTable = builder
-            .table(agg, consumed)
+            .table(agg, consumed, 
Materialized.as(Stores.inMemoryKeyValueStore("agg-base-store")))
             .groupBy(KeyValue::new, Grouped.with(Serdes.Long(), 
Serdes.String()))
-            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, 
Materialized.as("agg-store"));
-
-        final KTable<Long, String> one = builder.table(tableOne, consumed);
-        final KTable<Long, String> two = builder.table(tableTwo, consumed);
-        final KTable<Long, String> three = builder.table(tableThree, consumed);
-        final KTable<Long, String> four = builder.table(tableFour, consumed);
-        final KTable<Long, String> five = builder.table(tableFive, consumed);
-        final KTable<Long, String> six = builder.table(tableSix, consumed);
+            .reduce(
+                MockReducer.STRING_ADDER,
+                MockReducer.STRING_ADDER,
+                Materialized.as(Stores.inMemoryKeyValueStore("agg-store")));
+
+        final KTable<Long, String> one = builder.table(
+            tableOne,
+            consumed,
+            
Materialized.as(Stores.inMemoryKeyValueStore("tableOne-base-store")));
+        final KTable<Long, String> two = builder.table(
+            tableTwo,
+            consumed,
+            
Materialized.as(Stores.inMemoryKeyValueStore("tableTwo-base-store")));
+        final KTable<Long, String> three = builder.table(
+            tableThree,
+            consumed,
+            
Materialized.as(Stores.inMemoryKeyValueStore("tableThree-base-store")));
+        final KTable<Long, String> four = builder.table(
+            tableFour,
+            consumed,
+            
Materialized.as(Stores.inMemoryKeyValueStore("tableFour-base-store")));
+        final KTable<Long, String> five = builder.table(
+            tableFive,
+            consumed,
+            
Materialized.as(Stores.inMemoryKeyValueStore("tableFive-base-store")));
+        final KTable<Long, String> six = builder.table(
+            tableSix,
+            consumed,
+            
Materialized.as(Stores.inMemoryKeyValueStore("tableSix-base-store")));
 
         final ValueMapper<String, String> mapper = value -> 
value.toUpperCase(Locale.ROOT);
 

Reply via email to