Repository: kafka Updated Branches: refs/heads/trunk 7a70c1a10 -> fbc518554
http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java index 20efd45..b37e5e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -27,14 +27,13 @@ import static org.junit.Assert.assertNotEquals; public class JoinWindowsTest { - private static String anyName = "window"; private static long anySize = 123L; private static long anyOtherSize = 456L; // should be larger than anySize @Test public void shouldHaveSaneEqualsAndHashCode() { - JoinWindows w1 = JoinWindows.of("w1", anySize); - JoinWindows w2 = JoinWindows.of("w2", anySize); + JoinWindows w1 = JoinWindows.of(anySize); + JoinWindows w2 = JoinWindows.of(anySize); // Reflexive assertEquals(w1, w1); @@ -45,36 +44,36 @@ public class JoinWindowsTest { assertEquals(w2, w1); assertEquals(w1.hashCode(), w2.hashCode()); - JoinWindows w3 = JoinWindows.of("w3", w2.after).before(anyOtherSize); - JoinWindows w4 = JoinWindows.of("w4", anyOtherSize).after(w2.after); + JoinWindows w3 = JoinWindows.of(w2.after).before(anyOtherSize); + JoinWindows w4 = JoinWindows.of(anyOtherSize).after(w2.after); assertEquals(w3, w4); assertEquals(w4, w3); assertEquals(w3.hashCode(), w4.hashCode()); // Inequality scenarios assertNotEquals("must be false for null", null, w1); - assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1); + assertNotEquals("must be false for different window types", UnlimitedWindows.of(), w1); assertNotEquals("must be false for different types", new Object(), w1); - JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize", w1.after + 1); + JoinWindows differentWindowSize = JoinWindows.of(w1.after + 1); assertNotEquals("must be false when window sizes are different", differentWindowSize, w1); - JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize", w1.after).after(w1.after + 1); + JoinWindows differentWindowSize2 = JoinWindows.of(w1.after).after(w1.after + 1); assertNotEquals("must be false when window sizes are different", differentWindowSize2, w1); - JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize", w1.after).before(w1.before + 1); + JoinWindows differentWindowSize3 = JoinWindows.of(w1.after).before(w1.before + 1); assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1); } @Test public void validWindows() { - JoinWindows.of(anyName, anyOtherSize) // [ -anyOtherSize ; anyOtherSize ] + JoinWindows.of(anyOtherSize) // [ -anyOtherSize ; anyOtherSize ] .before(anySize) // [ -anySize ; anyOtherSize ] .before(0) // [ 0 ; anyOtherSize ] .before(-anySize) // [ anySize ; anyOtherSize ] .before(-anyOtherSize); // [ anyOtherSize ; anyOtherSize ] - JoinWindows.of(anyName, anyOtherSize) // [ -anyOtherSize ; anyOtherSize ] + JoinWindows.of(anyOtherSize) // [ -anyOtherSize ; anyOtherSize ] .after(anySize) // [ -anyOtherSize ; anySize ] .after(0) // [ -anyOtherSize ; 0 ] .after(-anySize) // [ -anyOtherSize ; -anySize ] @@ -82,28 +81,18 @@ public class JoinWindowsTest { } @Test(expected = IllegalArgumentException.class) - public void nameMustNotBeEmpty() { - JoinWindows.of("", anySize); - } - - @Test(expected = IllegalArgumentException.class) - public void nameMustNotBeNull() { - JoinWindows.of(null, anySize); - } - - @Test(expected = IllegalArgumentException.class) public void timeDifferenceMustNotBeNegative() { - JoinWindows.of(anyName, -1); + JoinWindows.of(-1); } @Test(expected = IllegalArgumentException.class) public void afterBelowLower() { - JoinWindows.of(anyName, anySize).after(-anySize - 1); + JoinWindows.of(anySize).after(-anySize - 1); } @Test(expected = IllegalArgumentException.class) public void beforeOverUpper() { - JoinWindows.of(anyName, anySize).before(-anySize - 1); + JoinWindows.of(anySize).before(-anySize - 1); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index 5acd6e2..2bea16b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -29,13 +29,12 @@ import static org.junit.Assert.assertNotEquals; public class TimeWindowsTest { - private static String anyName = "window"; private static long anySize = 123L; @Test public void shouldHaveSaneEqualsAndHashCode() { - TimeWindows w1 = TimeWindows.of("w1", anySize); - TimeWindows w2 = TimeWindows.of("w2", w1.size); + TimeWindows w1 = TimeWindows.of(anySize); + TimeWindows w2 = TimeWindows.of(w1.size); // Reflexive assertEquals(w1, w1); @@ -47,62 +46,53 @@ public class TimeWindowsTest { assertEquals(w1.hashCode(), w2.hashCode()); // Transitive - TimeWindows w3 = TimeWindows.of("w3", w2.size); + TimeWindows w3 = TimeWindows.of(w2.size); assertEquals(w2, w3); assertEquals(w1, w3); assertEquals(w1.hashCode(), w3.hashCode()); // Inequality scenarios assertNotEquals("must be false for null", null, w1); - assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1); + assertNotEquals("must be false for different window types", UnlimitedWindows.of(), w1); assertNotEquals("must be false for different types", new Object(), w1); - TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1); + TimeWindows differentWindowSize = TimeWindows.of(w1.size + 1); assertNotEquals("must be false when window sizes are different", differentWindowSize, w1); TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1); assertNotEquals("must be false when advance intervals are different", differentAdvanceInterval, w1); } - @Test(expected = IllegalArgumentException.class) - public void nameMustNotBeEmpty() { - TimeWindows.of("", anySize); - } - - @Test(expected = IllegalArgumentException.class) - public void nameMustNotBeNull() { - TimeWindows.of(null, anySize); - } @Test(expected = IllegalArgumentException.class) public void windowSizeMustNotBeNegative() { - TimeWindows.of(anyName, -1); + TimeWindows.of(-1); } @Test(expected = IllegalArgumentException.class) public void windowSizeMustNotBeZero() { - TimeWindows.of(anyName, 0); + TimeWindows.of(0); } @Test(expected = IllegalArgumentException.class) public void advanceIntervalMustNotBeNegative() { - TimeWindows.of(anyName, anySize).advanceBy(-1); + TimeWindows.of(anySize).advanceBy(-1); } @Test(expected = IllegalArgumentException.class) public void advanceIntervalMustNotBeZero() { - TimeWindows.of(anyName, anySize).advanceBy(0); + TimeWindows.of(anySize).advanceBy(0); } @Test(expected = IllegalArgumentException.class) public void advanceIntervalMustNotBeLargerThanWindowSize() { long size = anySize; - TimeWindows.of(anyName, size).advanceBy(size + 1); + TimeWindows.of(size).advanceBy(size + 1); } @Test public void windowsForHoppingWindows() { - TimeWindows windows = TimeWindows.of(anyName, 12L).advanceBy(5L); + TimeWindows windows = TimeWindows.of(12L).advanceBy(5L); Map<Long, TimeWindow> matched = windows.windowsFor(21L); assertEquals(12L / 5L + 1, matched.size()); assertEquals(new TimeWindow(10L, 22L), matched.get(10L)); @@ -112,7 +102,7 @@ public class TimeWindowsTest { @Test public void windowsForBarelyOverlappingHoppingWindows() { - TimeWindows windows = TimeWindows.of(anyName, 6L).advanceBy(5L); + TimeWindows windows = TimeWindows.of(6L).advanceBy(5L); Map<Long, TimeWindow> matched = windows.windowsFor(7L); assertEquals(1, matched.size()); assertEquals(new TimeWindow(5L, 11L), matched.get(5L)); @@ -120,7 +110,7 @@ public class TimeWindowsTest { @Test public void windowsForTumblingWindows() { - TimeWindows windows = TimeWindows.of(anyName, 12L); + TimeWindows windows = TimeWindows.of(12L); Map<Long, TimeWindow> matched = windows.windowsFor(21L); assertEquals(1, matched.size()); assertEquals(new TimeWindow(12L, 24L), matched.get(12L)); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java index da5f159..c1f4be6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java @@ -29,32 +29,21 @@ import static org.junit.Assert.assertTrue; public class UnlimitedWindowsTest { - private static String anyName = "window"; private static long anyStartTime = 10L; @Test(expected = IllegalArgumentException.class) - public void nameMustNotBeEmpty() { - UnlimitedWindows.of(""); - } - - @Test(expected = IllegalArgumentException.class) - public void nameMustNotBeNull() { - UnlimitedWindows.of(null); - } - - @Test(expected = IllegalArgumentException.class) public void startTimeMustNotBeNegative() { - UnlimitedWindows.of(anyName).startOn(-1); + UnlimitedWindows.of().startOn(-1); } @Test public void startTimeCanBeZero() { - UnlimitedWindows.of(anyName).startOn(0); + UnlimitedWindows.of().startOn(0); } @Test public void shouldIncludeRecordsThatHappenedOnWindowStart() { - UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime); + UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime); Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.start); assertEquals(1, matchedWindows.size()); assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime)); @@ -62,7 +51,7 @@ public class UnlimitedWindowsTest { @Test public void shouldIncludeRecordsThatHappenedAfterWindowStart() { - UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime); + UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime); long timestamp = w.start + 1; Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp); assertEquals(1, matchedWindows.size()); @@ -71,7 +60,7 @@ public class UnlimitedWindowsTest { @Test public void shouldExcludeRecordsThatHappenedBeforeWindowStart() { - UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime); + UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime); long timestamp = w.start - 1; Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp); assertTrue(matchedWindows.isEmpty()); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 6242702..d5fc41b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -109,14 +109,14 @@ public class KStreamImplTest { public Integer apply(Integer value1, Integer value2) { return value1 + value2; } - }, JoinWindows.of("join-0", anyWindowSize), stringSerde, intSerde, intSerde); + }, JoinWindows.of(anyWindowSize), stringSerde, intSerde, intSerde); KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() { @Override public Integer apply(Integer value1, Integer value2) { return value1 + value2; } - }, JoinWindows.of("join-1", anyWindowSize), stringSerde, intSerde, intSerde); + }, JoinWindows.of(anyWindowSize), stringSerde, intSerde, intSerde); stream4.to("topic-5"); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index aa7d117..7175f63 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -77,8 +77,7 @@ public class KStreamKStreamJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), - intSerde, stringSerde, stringSerde); + joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); @@ -176,8 +175,7 @@ public class KStreamKStreamJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), - intSerde, stringSerde, stringSerde); + joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); @@ -277,8 +275,8 @@ public class KStreamKStreamJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), - intSerde, stringSerde, stringSerde); + + joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 8e05da9..95d9ef6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -78,8 +78,8 @@ public class KStreamKStreamLeftJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), - intSerde, stringSerde, stringSerde); + + joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); @@ -157,8 +157,8 @@ public class KStreamKStreamLeftJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), - intSerde, stringSerde, stringSerde); + + joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 8bc9a77..f2fad78 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -75,7 +75,7 @@ public class KStreamKTableLeftJoinTest { processor = new MockProcessorSupplier<>(); stream = builder.stream(intSerde, stringSerde, topic1); - table = builder.table(intSerde, stringSerde, topic2); + table = builder.table(intSerde, stringSerde, topic2, "anyStoreName"); stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor); Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index db533e4..1a5de5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -75,8 +75,8 @@ public class KStreamWindowAggregateTest { strSerde) .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - TimeWindows.of("topic1-Canonized", 10).advanceBy(5), - strSerde); + TimeWindows.of(10).advanceBy(5), + strSerde, "topic1-Canonized"); MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); @@ -154,8 +154,8 @@ public class KStreamWindowAggregateTest { stream1.groupByKey(strSerde, strSerde) .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - TimeWindows.of("topic1-Canonized", 10).advanceBy(5), - strSerde); + TimeWindows.of(10).advanceBy(5), + strSerde, "topic1-Canonized"); MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); @@ -165,8 +165,8 @@ public class KStreamWindowAggregateTest { stream2.groupByKey(strSerde, strSerde) .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - TimeWindows.of("topic2-Canonized", 10).advanceBy(5), - strSerde); + TimeWindows.of(10).advanceBy(5), + strSerde, "topic2-Canonized"); MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index e5864ee..26e6a0f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -65,7 +65,7 @@ public class KTableAggregateTest { final String topic1 = "topic1"; final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); + KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), stringSerde, stringSerde @@ -105,7 +105,7 @@ public class KTableAggregateTest { final String topic1 = "topic1"; final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); + KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() { @Override public KeyValue<String, String> apply(String key, String value) { @@ -157,7 +157,7 @@ public class KTableAggregateTest { final String input = "count-test-input"; final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); - builder.table(Serdes.String(), Serdes.String(), input) + builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde) .count("count") .toStream() http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index e328bae..d8dee30 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -64,7 +64,7 @@ public class KTableFilterTest { String topic1 = "topic1"; - KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1); + KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName"); KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() { @Override @@ -104,7 +104,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -183,7 +183,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -232,7 +232,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -284,7 +284,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter( new Predicate<String, String>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java index af131c2..e0cb190 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java @@ -36,7 +36,6 @@ import static org.junit.Assert.assertEquals; public class KTableForeachTest { final private String topicName = "topic"; - final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); @@ -78,7 +77,7 @@ public class KTableForeachTest { // When KStreamBuilder builder = new KStreamBuilder(); - KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName); + KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName, "anyStoreName"); table.foreach(action); // Then http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index ca3bbe1..6794bb4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -72,8 +72,10 @@ public class KTableImplTest { String topic1 = "topic1"; String topic2 = "topic2"; + String storeName1 = "storeName1"; + String storeName2 = "storeName2"; - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); + KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, storeName1); MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); @@ -98,7 +100,7 @@ public class KTableImplTest { MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>(); table3.toStream().process(proc3); - KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2); + KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2, storeName2); MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>(); table4.toStream().process(proc4); @@ -122,9 +124,11 @@ public class KTableImplTest { String topic1 = "topic1"; String topic2 = "topic2"; + String storeName1 = "storeName1"; + String storeName2 = "storeName2"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override @@ -140,7 +144,7 @@ public class KTableImplTest { } }); KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) - table1.through(stringSerde, stringSerde, topic2); + table1.through(stringSerde, stringSerde, topic2, storeName2); KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); @@ -241,13 +245,15 @@ public class KTableImplTest { public void testStateStoreLazyEval() throws IOException { String topic1 = "topic1"; String topic2 = "topic2"; + String storeName1 = "storeName1"; + String storeName2 = "storeName2"; final KStreamBuilder builder = new KStreamBuilder(); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); KTableImpl<String, String, String> table2 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2); KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @@ -267,7 +273,7 @@ public class KTableImplTest { driver = new KStreamTestDriver(builder, stateDir, null, null); driver.setTime(0L); - // no state store should be created + // no state stores should be created assertEquals(0, driver.allStateStores().size()); } @@ -275,13 +281,15 @@ public class KTableImplTest { public void testStateStore() throws IOException { String topic1 = "topic1"; String topic2 = "topic2"; + String storeName1 = "storeName1"; + String storeName2 = "storeName2"; final KStreamBuilder builder = new KStreamBuilder(); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); KTableImpl<String, String, String> table2 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2); KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @@ -315,11 +323,12 @@ public class KTableImplTest { @Test public void testRepartition() throws IOException { String topic1 = "topic1"; + String storeName1 = "storeName1"; final KStreamBuilder builder = new KStreamBuilder(); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1 .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index 16015fe..3615b46 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -46,6 +46,8 @@ public class KTableKTableJoinTest { final private String topic1 = "topic1"; final private String topic2 = "topic2"; + final private String storeName1 = "store-name-1"; + final private String storeName2 = "store-name-2"; final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); @@ -78,8 +80,8 @@ public class KTableKTableJoinTest { MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - table1 = builder.table(intSerde, stringSerde, topic1); - table2 = builder.table(intSerde, stringSerde, topic2); + table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.join(table2, MockValueJoiner.STRING_JOINER); joined.toStream().process(processor); @@ -170,8 +172,8 @@ public class KTableKTableJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1); - table2 = builder.table(intSerde, stringSerde, topic2); + table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.join(table2, MockValueJoiner.STRING_JOINER); proc = new MockProcessorSupplier<>(); @@ -251,8 +253,8 @@ public class KTableKTableJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1); - table2 = builder.table(intSerde, stringSerde, topic2); + table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.join(table2, MockValueJoiner.STRING_JOINER); ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 5132ce3..ec07116 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -46,6 +46,8 @@ public class KTableKTableLeftJoinTest { final private String topic1 = "topic1"; final private String topic2 = "topic2"; + final private String storeName1 = "store-name-1"; + final private String storeName2 = "store-name-2"; final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); @@ -72,8 +74,8 @@ public class KTableKTableLeftJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1); - KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2); + KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2); KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER); MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); @@ -166,8 +168,8 @@ public class KTableKTableLeftJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1); - table2 = builder.table(intSerde, stringSerde, topic2); + table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER); proc = new MockProcessorSupplier<>(); @@ -247,8 +249,8 @@ public class KTableKTableLeftJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1); - table2 = builder.table(intSerde, stringSerde, topic2); + table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER); ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index 3124556..33dfb04 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -46,6 +46,8 @@ public class KTableKTableOuterJoinTest { final private String topic1 = "topic1"; final private String topic2 = "topic2"; + final private String storeName1 = "store-name-1"; + final private String storeName2 = "store-name-2"; final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); @@ -78,8 +80,8 @@ public class KTableKTableOuterJoinTest { MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - table1 = builder.table(intSerde, stringSerde, topic1); - table2 = builder.table(intSerde, stringSerde, topic2); + table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER); joined.toStream().process(processor); @@ -179,8 +181,8 @@ public class KTableKTableOuterJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1); - table2 = builder.table(intSerde, stringSerde, topic2); + table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER); proc = new MockProcessorSupplier<>(); @@ -268,8 +270,8 @@ public class KTableKTableOuterJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1); - table2 = builder.table(intSerde, stringSerde, topic2); + table1 = builder.table(intSerde, stringSerde, topic1, storeName1); + table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER); ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java index cf74017..7666438 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; + import org.junit.After; import org.junit.Test; @@ -38,9 +39,9 @@ public class KTableMapKeysTest { final private Serde<String> stringSerde = new Serdes.StringSerde(); final private Serde<Integer> integerSerde = new Serdes.IntegerSerde(); - private KStreamTestDriver driver = null; + @After public void cleanup() { if (driver != null) { @@ -55,7 +56,7 @@ public class KTableMapKeysTest { String topic1 = "topic_map_keys"; - KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1); + KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1, "anyStoreName"); final Map<Integer, String> keyMap = new HashMap<>(); keyMap.put(1, "ONE"); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index efb17fc..5739397 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -65,7 +65,7 @@ public class KTableMapValuesTest { String topic1 = "topic1"; - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); + KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() { @Override public Integer apply(String value) { @@ -76,7 +76,7 @@ public class KTableMapValuesTest { MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); - driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder, stateDir); driver.process(topic1, "A", "01"); driver.process(topic1, "B", "02"); @@ -92,9 +92,11 @@ public class KTableMapValuesTest { String topic1 = "topic1"; String topic2 = "topic2"; + String storeName1 = "storeName1"; + String storeName2 = "storeName2"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override @@ -110,7 +112,7 @@ public class KTableMapValuesTest { } }); KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) - table1.through(stringSerde, stringSerde, topic2); + table1.through(stringSerde, stringSerde, topic2, storeName2); KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); @@ -211,7 +213,7 @@ public class KTableMapValuesTest { String topic1 = "topic1"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override @@ -256,7 +258,7 @@ public class KTableMapValuesTest { String topic1 = "topic1"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 84bfdd6..ad3f02c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -39,6 +39,7 @@ import static org.junit.Assert.assertTrue; public class KTableSourceTest { final private Serde<String> stringSerde = Serdes.String(); + final private Serde<Integer> intSerde = Serdes.Integer(); private KStreamTestDriver driver = null; private File stateDir = null; @@ -62,9 +63,9 @@ public class KTableSourceTest { String topic1 = "topic1"; - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); + KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName"); - MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>(); + MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); driver = new KStreamTestDriver(builder); @@ -85,7 +86,7 @@ public class KTableSourceTest { String topic1 = "topic1"; - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); @@ -130,7 +131,7 @@ public class KTableSourceTest { String topic1 = "topic1"; - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); @@ -165,7 +166,7 @@ public class KTableSourceTest { String topic1 = "topic1"; - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); table1.enableSendingOldValues(); http://git-wip-us.apache.org/repos/asf/kafka/blob/fbc51855/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index 1e1e3f4..ba71e05 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -123,13 +123,13 @@ public class SmokeTestClient extends SmokeTestUtil { return (value < aggregate) ? value : aggregate; } }, - UnlimitedWindows.of("uwin-min"), - intSerde + UnlimitedWindows.of(), + intSerde, "uwin-min" ).toStream().map( new Unwindow<String, Integer>() ).to(stringSerde, intSerde, "min"); - KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min"); + KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName"); minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min")); // max @@ -145,13 +145,13 @@ public class SmokeTestClient extends SmokeTestUtil { return (value > aggregate) ? value : aggregate; } }, - UnlimitedWindows.of("uwin-max"), - intSerde + UnlimitedWindows.of(), + intSerde, "uwin-max" ).toStream().map( new Unwindow<String, Integer>() ).to(stringSerde, intSerde, "max"); - KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max"); + KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName"); maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max")); // sum @@ -167,23 +167,23 @@ public class SmokeTestClient extends SmokeTestUtil { return (long) value + aggregate; } }, - UnlimitedWindows.of("win-sum"), - longSerde + UnlimitedWindows.of(), + longSerde, "win-sum" ).toStream().map( new Unwindow<String, Long>() ).to(stringSerde, longSerde, "sum"); - KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum"); + KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName"); sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum")); // cnt - groupedData.count(UnlimitedWindows.of("uwin-cnt")) + groupedData.count(UnlimitedWindows.of(), "uwin-cnt") .toStream().map( new Unwindow<String, Long>() ).to(stringSerde, longSerde, "cnt"); - KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt"); + KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName"); cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt")); // dif @@ -206,7 +206,7 @@ public class SmokeTestClient extends SmokeTestUtil { ).to(stringSerde, doubleSerde, "avg"); // windowed count - groupedData.count(TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE)) + groupedData.count(TimeWindows.of(WINDOW_SIZE), "tumbling-win-cnt") .toStream().map( new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() { @Override
