Repository: kafka Updated Branches: refs/heads/trunk 52d7b6763 -> f2b74aa1c
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index cbf2b56..0bdd3a3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; @@ -33,7 +34,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.test.IntegrationTest; @@ -101,9 +104,13 @@ public class GlobalKTableIntegrationTest { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore); - stream = builder.stream(inputStream, Consumed.with(Serdes.String(), Serdes.Long())); - table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table"); + globalTable = builder.globalTable(globalOne, Consumed.with(Serdes.Long(), Serdes.String()), + Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore) + .withKeySerde(Serdes.Long()) + .withValueSerde(Serdes.String())); + final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); + stream = builder.stream(inputStream, stringLongConsumed); + table = builder.table(inputTable, stringLongConsumed); foreachAction = new ForeachAction<String, String>() { @Override public void apply(final String key, final String value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index 3a771c4..faa581b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -128,8 +128,8 @@ public class JoinIntegrationTest { CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC); builder = new StreamsBuilder(); - leftTable = builder.table(INPUT_TOPIC_1, "leftTable"); - rightTable = builder.table(INPUT_TOPIC_2, "rightTable"); + leftTable = builder.table(INPUT_TOPIC_1); + rightTable = builder.table(INPUT_TOPIC_2); leftStream = leftTable.toStream(); rightStream = rightTable.toStream(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index a433667..8d4299b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -211,7 +211,8 @@ public class KStreamKTableJoinIntegrationTest { // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe" // (which overrides her previous region value of "asia"). final KTable<String, String> userRegionsTable = - builder.table(stringSerde, stringSerde, userRegionsTopic, userRegionsStoreName); + builder.table(userRegionsTopic, + Consumed.with(Serdes.String(), Serdes.String())); // Compute the number of clicks per region, e.g. "europe" -> 13L. http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index 1b45711..a12ffac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -343,9 +343,9 @@ public class KTableKTableJoinIntegrationTest { private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) { final StreamsBuilder builder = new StreamsBuilder(); - final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1); - final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2); - final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3); + final KTable<String, String> table1 = builder.table(TABLE_1); + final KTable<String, String> table2 = builder.table(TABLE_2); + final KTable<String, String> table3 = builder.table(TABLE_3); Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = null; if (queryableName != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 69c42fe..31b7222 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -106,7 +107,7 @@ public class RestoreIntegrationTest { createStateForRestoration(); - builder.table(Serdes.Integer(), Serdes.Integer(), inputStream, "store") + builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer())) .toStream() .foreach(new ForeachAction<Integer, Integer>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java index 0ee74b8..0f8109d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java @@ -54,8 +54,9 @@ public class GlobalKTableJoinsTest { @Before public void setUp() { stateDir = TestUtils.tempDirectory(); - global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store"); - stream = builder.stream(streamTopic, Consumed.with(Serdes.String(), Serdes.String())); + final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); + global = builder.globalTable(globalTopic, consumed); + stream = builder.stream(streamTopic, consumed); keyValueMapper = new KeyValueMapper<String, String, String>() { @Override public String apply(final String key, final String value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index 1a2dc13..494e197 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -16,17 +16,20 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockTimestampExtractor; @@ -58,6 +61,8 @@ public class InternalStreamsBuilderTest { private KStreamTestDriver driver = null; private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(); + private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized + = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), false); @Before public void setUp() { @@ -132,27 +137,30 @@ public class InternalStreamsBuilderTest { } @Test - public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception { - KTable table1 = builder.table("topic1", consumed, "table1"); - KTable table2 = builder.table("topic2", consumed, null); + public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() throws Exception { + KTable table1 = builder.table("topic2", + consumed, + new MaterializedInternal<>( + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic2"), + false)); final ProcessorTopology topology = builder.internalTopologyBuilder.build(null); - assertEquals(2, topology.stateStores().size()); - assertEquals("table1", topology.stateStores().get(0).name()); + assertEquals(1, topology.stateStores().size()); + assertEquals("topic2", topology.stateStores().get(0).name()); - final String internalStoreName = topology.stateStores().get(1).name(); - assertTrue(internalStoreName.contains(KTableImpl.STATE_STORE_NAME)); - assertEquals(2, topology.storeToChangelogTopic().size()); - assertEquals("topic1", topology.storeToChangelogTopic().get("table1")); - assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName)); - assertEquals(table1.queryableStoreName(), "table1"); - assertNull(table2.queryableStoreName()); + assertEquals(1, topology.storeToChangelogTopic().size()); + assertEquals("topic2", topology.storeToChangelogTopic().get("topic2")); + assertNull(table1.queryableStoreName()); } @Test public void shouldBuildSimpleGlobalTableTopology() throws Exception { - builder.globalTable("table", consumed, "globalTable"); + builder.globalTable("table", + consumed, + new MaterializedInternal<>( + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"), + false)); final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology(); final List<StateStore> stateStores = topology.globalStateStores(); @@ -173,16 +181,14 @@ public class InternalStreamsBuilderTest { @Test public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception { - builder.globalTable("table", consumed, "globalTable"); - builder.globalTable("table2", consumed, "globalTable2"); - - doBuildGlobalTopologyWithAllGlobalTables(); - } - - @Test - public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception { - builder.globalTable("table", consumed, null); - builder.globalTable("table2", consumed, null); + builder.globalTable("table", + consumed, + new MaterializedInternal<>( + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1"))); + builder.globalTable("table2", + consumed, + new MaterializedInternal<>( + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2"))); doBuildGlobalTopologyWithAllGlobalTables(); } @@ -191,10 +197,19 @@ public class InternalStreamsBuilderTest { public void shouldAddGlobalTablesToEachGroup() throws Exception { final String one = "globalTable"; final String two = "globalTable2"; - final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, one); - final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, two); - builder.table("not-global", consumed, "not-global"); + final GlobalKTable<String, String> globalTable = builder.globalTable("table", + consumed, + new MaterializedInternal<>( + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one))); + final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", + consumed, + new MaterializedInternal<>( + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two))); + + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized + = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), false); + builder.table("not-global", consumed, materialized); final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() { @Override @@ -227,7 +242,9 @@ public class InternalStreamsBuilderTest { public void shouldMapStateStoresToCorrectSourceTopics() throws Exception { final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed); - final KTable<String, String> table = builder.table("table-topic", consumed, "table-store"); + final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized + = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), false); + final KTable<String, String> table = builder.table("table-topic", consumed, materialized); assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store")); final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper()); @@ -260,8 +277,7 @@ public class InternalStreamsBuilderTest { @Test public void shouldAddTableToEarliestAutoOffsetResetList() { final String topicName = "topic-1"; - final String storeName = "test-store"; - builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), storeName); + builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.EARLIEST)), materialized); assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches()); assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches()); @@ -270,9 +286,7 @@ public class InternalStreamsBuilderTest { @Test public void shouldAddTableToLatestAutoOffsetResetList() { final String topicName = "topic-1"; - final String storeName = "test-store"; - - builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), storeName); + builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.LATEST)), materialized); assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches()); assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches()); @@ -281,9 +295,8 @@ public class InternalStreamsBuilderTest { @Test public void shouldNotAddTableToOffsetResetLists() { final String topicName = "topic-1"; - final String storeName = "test-store"; - builder.table(topicName, consumed, storeName); + builder.table(topicName, consumed, materialized); assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches()); assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches()); @@ -340,15 +353,15 @@ public class InternalStreamsBuilderTest { @Test public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception { - builder.table("topic", consumed, "store"); + builder.table("topic", consumed, materialized); final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null); assertNull(processorTopology.source("topic").getTimestampExtractor()); } @Test public void ktableShouldUseProvidedTimestampExtractor() throws Exception { - final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor())); - builder.table("topic", consumed, "store"); + final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor())); + builder.table("topic", consumed, materialized); final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null); assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java index ff9726e..705cf62 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.ForeachAction; @@ -60,7 +61,7 @@ public class KGroupedTableImplTest { @Before public void before() { - groupedTable = builder.table(Serdes.String(), Serdes.String(), "blah", "blah") + groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String())) .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper()); } @@ -157,7 +158,11 @@ public class KGroupedTableImplTest { } }; - final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store") + final KTable<String, Integer> reduced = builder.table(topic, + Consumed.with(Serdes.String(), Serdes.Double()), + Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Double())) .groupBy(intProjection) .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced"); @@ -175,7 +180,11 @@ public class KGroupedTableImplTest { } }; - final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store") + final KTable<String, Integer> reduced = builder.table(topic, + Consumed.with(Serdes.String(), Serdes.Double()), + Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Double())) .groupBy(intProjection) .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR); @@ -194,7 +203,7 @@ public class KGroupedTableImplTest { } }; - final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store") + final KTable<String, Integer> reduced = builder.table(topic, Consumed.with(Serdes.String(), Serdes.Double())) .groupBy(intProjection) .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, @@ -211,10 +220,10 @@ public class KGroupedTableImplTest { @SuppressWarnings("unchecked") @Test public void shouldCountAndMaterializeResults() { - final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store"); + final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String())); table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), - Serialized.with(Serdes.String(), - Serdes.String())) + Serialized.with(Serdes.String(), + Serdes.String())) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count") .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())); @@ -228,7 +237,7 @@ public class KGroupedTableImplTest { @SuppressWarnings("unchecked") @Test public void shouldAggregateAndMaterializeResults() { - final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store"); + final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String())); table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), Serialized.with(Serdes.String(), Serdes.String())) http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 5e8687f..be1d865 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -61,6 +61,7 @@ public class KStreamImplTest { final private Serde<String> stringSerde = Serdes.String(); final private Serde<Integer> intSerde = Serdes.Integer(); + private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String()); private KStream<String, String> testStream; private StreamsBuilder builder; private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); @@ -361,7 +362,7 @@ public class KStreamImplTest { @Test(expected = NullPointerException.class) public void shouldNotAllowNullValueMapperOnTableJoin() { - testStream.leftJoin(builder.table(Serdes.String(), Serdes.String(), "topic", "store"), null); + testStream.leftJoin(builder.table("topic", stringConsumed), null); } @Test(expected = NullPointerException.class) @@ -383,14 +384,14 @@ public class KStreamImplTest { @Test(expected = NullPointerException.class) public void shouldNotAllowNullMapperOnJoinWithGlobalTable() { - testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"), + testStream.join(builder.globalTable("global", stringConsumed), null, MockValueJoiner.TOSTRING_JOINER); } @Test(expected = NullPointerException.class) public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() { - testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"), + testStream.join(builder.globalTable("global", stringConsumed), MockKeyValueMapper.<String, String>SelectValueMapper(), null); } @@ -404,14 +405,14 @@ public class KStreamImplTest { @Test(expected = NullPointerException.class) public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() { - testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"), + testStream.leftJoin(builder.globalTable("global", stringConsumed), null, MockValueJoiner.TOSTRING_JOINER); } @Test(expected = NullPointerException.class) public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() { - testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"), + testStream.leftJoin(builder.globalTable("global", stringConsumed), MockKeyValueMapper.<String, String>SelectValueMapper(), null); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 745ab4e..39b318f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -87,7 +87,7 @@ public class KStreamKStreamLeftJoinTest { assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - driver.setUp(builder, stateDir); + driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String()); driver.setTime(0L); // push two items to the primary stream. the other window is empty http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java index d206b02..d1226c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -67,8 +67,9 @@ public class KStreamKTableJoinTest { final MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde)); - table = builder.table(intSerde, stringSerde, topic2, "anyStoreName"); + final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde); + stream = builder.stream(topic1, consumed); + table = builder.table(topic2, consumed); stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor); final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -76,7 +77,7 @@ public class KStreamKTableJoinTest { assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - driver.setUp(builder, stateDir); + driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String()); driver.setTime(0L); // push two items to the primary stream. the other table is empty http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index a79184e..ed835a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -68,8 +68,9 @@ public class KStreamKTableLeftJoinTest { MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde)); - table = builder.table(intSerde, stringSerde, topic2, "anyStoreName"); + Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde); + stream = builder.stream(topic1, consumed); + table = builder.table(topic2, consumed); stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor); Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index accbb9c..0ae95dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -50,6 +51,7 @@ import static org.junit.Assert.assertEquals; public class KTableAggregateTest { final private Serde<String> stringSerde = Serdes.String(); + private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); private final Serialized<String, String> stringSerialzied = Serialized.with(stringSerde, stringSerde); private File stateDir = null; @@ -70,7 +72,7 @@ public class KTableAggregateTest { final String topic1 = "topic1"; final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTable<String, String> table1 = builder.table(topic1, consumed); KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), stringSerialzied ).aggregate(MockInitializer.STRING_INIT, @@ -81,7 +83,7 @@ public class KTableAggregateTest { table2.toStream().process(proc); - driver.setUp(builder, stateDir); + driver.setUp(builder, stateDir, Serdes.String(), Serdes.String()); driver.process(topic1, "A", "1"); driver.flushState(); @@ -118,7 +120,7 @@ public class KTableAggregateTest { final String topic1 = "topic1"; final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTable<String, String> table1 = builder.table(topic1, consumed); KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), stringSerialzied ).aggregate(MockInitializer.STRING_INIT, @@ -146,7 +148,7 @@ public class KTableAggregateTest { final String topic1 = "topic1"; final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTable<String, String> table1 = builder.table(topic1, consumed); KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() { @Override public KeyValue<String, String> apply(String key, String value) { @@ -232,7 +234,7 @@ public class KTableAggregateTest { final String input = "count-test-input"; final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); - builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") + builder.table(input, consumed) .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) .count("count") .toStream() @@ -247,7 +249,7 @@ public class KTableAggregateTest { final String input = "count-test-input"; final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); - builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") + builder.table(input, consumed) .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) .count() .toStream() @@ -262,7 +264,7 @@ public class KTableAggregateTest { final String input = "count-test-input"; final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); - builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") + builder.table(input, consumed) .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) .count("count") .toStream() @@ -291,7 +293,7 @@ public class KTableAggregateTest { final String input = "count-test-input"; final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); - builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") + builder.table(input, consumed) .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() { @Override @@ -348,8 +350,8 @@ public class KTableAggregateTest { final String reduceTopic = "TestDriver-reducer-store-repartition"; final Map<String, Long> reduceResults = new HashMap<>(); - final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne); - final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo); + final KTable<String, String> one = builder.table(tableOne, consumed); + final KTable<Long, String> two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String())); final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index a885edd..7986277 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -42,6 +43,7 @@ public class KTableFilterTest { final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); + private final Consumed<String, Integer> consumed = Consumed.with(stringSerde, intSerde); @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); private File stateDir = null; @@ -79,7 +81,7 @@ public class KTableFilterTest { final String topic1 = "topic1"; - KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + KTable<String, Integer> table1 = builder.table(topic1, consumed); KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() { @Override @@ -104,7 +106,7 @@ public class KTableFilterTest { final String topic1 = "topic1"; - KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + KTable<String, Integer> table1 = builder.table(topic1, consumed); KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() { @Override @@ -128,7 +130,7 @@ public class KTableFilterTest { final String topic1 = "topic1"; - KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + KTable<String, Integer> table1 = builder.table(topic1, consumed); KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() { @Override @@ -213,7 +215,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -239,7 +241,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -304,7 +306,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -323,7 +325,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -382,7 +384,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -401,7 +403,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -440,8 +442,9 @@ public class KTableFilterTest { String topic1 = "topic1"; + final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter( new Predicate<String, String>() { @Override @@ -461,8 +464,9 @@ public class KTableFilterTest { String topic1 = "topic1"; + final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter( new Predicate<String, String>() { @Override @@ -485,7 +489,7 @@ public class KTableFilterTest { }; new StreamsBuilder() - .<Integer, String>table("empty", "emptyStore") + .<Integer, String>table("empty") .filter(numberKeyPredicate) .filterNot(numberKeyPredicate) .to("nirvana"); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java index 693aac8..23e0b59 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java @@ -18,10 +18,14 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -79,7 +83,11 @@ public class KTableForeachTest { // When StreamsBuilder builder = new StreamsBuilder(); - KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName, "anyStoreName"); + KTable<Integer, String> table = builder.table(topicName, + Consumed.with(intSerde, stringSerde), + new MaterializedInternal<>(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName) + .withKeySerde(intSerde) + .withValueSerde(stringSerde))); table.foreach(action); // Then @@ -105,7 +113,7 @@ public class KTableForeachTest { }; new StreamsBuilder() - .<Integer, String>table("emptyTopic", "emptyStore") + .<Integer, String>table("emptyTopic") .foreach(consume); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 6ca38b8..9d918e2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.KTable; @@ -54,6 +55,7 @@ import static org.junit.Assert.assertTrue; public class KTableImplTest { final private Serde<String> stringSerde = Serdes.String(); + private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); private File stateDir = null; @@ -64,7 +66,7 @@ public class KTableImplTest { public void setUp() { stateDir = TestUtils.tempDirectory("kafka-test"); builder = new StreamsBuilder(); - table = builder.table("test", "test"); + table = builder.table("test"); } @Test @@ -73,10 +75,9 @@ public class KTableImplTest { String topic1 = "topic1"; String topic2 = "topic2"; - String storeName1 = "storeName1"; String storeName2 = "storeName2"; - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, storeName1); + KTable<String, String> table1 = builder.table(topic1, consumed); MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); @@ -130,11 +131,10 @@ public class KTableImplTest { String topic1 = "topic1"; String topic2 = "topic2"; - String storeName1 = "storeName1"; String storeName2 = "storeName2"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override @@ -256,14 +256,12 @@ public class KTableImplTest { public void testStateStoreLazyEval() { String topic1 = "topic1"; String topic2 = "topic2"; - String storeName1 = "storeName1"; - String storeName2 = "storeName2"; final StreamsBuilder builder = new StreamsBuilder(); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); - builder.table(stringSerde, stringSerde, topic2, storeName2); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); + builder.table(topic2, consumed); KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @@ -291,15 +289,13 @@ public class KTableImplTest { public void testStateStore() { String topic1 = "topic1"; String topic2 = "topic2"; - String storeName1 = "storeName1"; - String storeName2 = "storeName2"; final StreamsBuilder builder = new StreamsBuilder(); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, String> table2 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2); + (KTableImpl<String, String, String>) builder.table(topic2, consumed); KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @@ -338,7 +334,12 @@ public class KTableImplTest { final StreamsBuilder builder = new StreamsBuilder(); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); + (KTableImpl<String, String, String>) builder.table(topic1, + consumed, + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1) + .withKeySerde(stringSerde) + .withValueSerde(stringSerde) + ); table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1"); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index 124114b..aeb2418 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; @@ -50,6 +51,7 @@ public class KTableKTableJoinTest { final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); + private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde); private File stateDir = null; @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); @@ -165,8 +167,8 @@ public class KTableKTableJoinTest { final MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER); joined.toStream().process(processor); @@ -186,8 +188,8 @@ public class KTableKTableJoinTest { final MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName"); joined.toStream().process(processor); @@ -285,8 +287,8 @@ public class KTableKTableJoinTest { final KTable<Integer, String> joined; final MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER); proc = new MockProcessorSupplier<>(); builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name); @@ -306,8 +308,8 @@ public class KTableKTableJoinTest { final KTable<Integer, String> joined; final MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName"); proc = new MockProcessorSupplier<>(); builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name); @@ -327,8 +329,8 @@ public class KTableKTableJoinTest { final KTable<Integer, String> joined; final MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER); proc = new MockProcessorSupplier<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 9cdc782..ca0c81c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; @@ -60,6 +61,7 @@ public class KTableKTableLeftJoinTest { private File stateDir = null; @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); + private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde); @Before public void setUp() throws IOException { @@ -72,8 +74,8 @@ public class KTableKTableLeftJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + KTable<Integer, String> table1 = builder.table(topic1, consumed); + KTable<Integer, String> table2 = builder.table(topic2, consumed); KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER); MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); @@ -171,8 +173,8 @@ public class KTableKTableLeftJoinTest { final KTable<Integer, String> joined; final MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER); proc = new MockProcessorSupplier<>(); @@ -252,8 +254,8 @@ public class KTableKTableLeftJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER); ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(); @@ -341,7 +343,8 @@ public class KTableKTableLeftJoinTest { final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive, tableSix}; final StreamsBuilder builder = new StreamsBuilder(); - final KTable<Long, String> aggTable = builder.table(Serdes.Long(), Serdes.String(), agg, agg) + final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), Serdes.String()); + final KTable<Long, String> aggTable = builder.table(agg, consumed) .groupBy(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { @Override public KeyValue<Long, String> apply(final Long key, final String value) { @@ -349,12 +352,12 @@ public class KTableKTableLeftJoinTest { } }, Serialized.with(Serdes.Long(), Serdes.String())).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store"); - final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(), tableOne, tableOne); - final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo); - final KTable<Long, String> three = builder.table(Serdes.Long(), Serdes.String(), tableThree, tableThree); - final KTable<Long, String> four = builder.table(Serdes.Long(), Serdes.String(), tableFour, tableFour); - final KTable<Long, String> five = builder.table(Serdes.Long(), Serdes.String(), tableFive, tableFive); - final KTable<Long, String> six = builder.table(Serdes.Long(), Serdes.String(), tableSix, tableSix); + final KTable<Long, String> one = builder.table(tableOne, consumed); + final KTable<Long, String> two = builder.table(tableTwo, consumed); + final KTable<Long, String> three = builder.table(tableThree, consumed); + final KTable<Long, String> four = builder.table(tableFour, consumed); + final KTable<Long, String> five = builder.table(tableFive, consumed); + final KTable<Long, String> six = builder.table(tableSix, consumed); final ValueMapper<String, String> mapper = new ValueMapper<String, String>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index 368a3ea..d6ab613 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; @@ -54,6 +55,7 @@ public class KTableKTableOuterJoinTest { private File stateDir = null; @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); + private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde); @Before public void setUp() throws IOException { @@ -72,8 +74,8 @@ public class KTableKTableOuterJoinTest { MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER); joined.toStream().process(processor); @@ -176,8 +178,8 @@ public class KTableKTableOuterJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER); proc = new MockProcessorSupplier<>(); @@ -264,8 +266,8 @@ public class KTableKTableOuterJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(intSerde, stringSerde, topic1, storeName1); - table2 = builder.table(intSerde, stringSerde, topic2, storeName2); + table1 = builder.table(topic1, consumed); + table2 = builder.table(topic2, consumed); joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER); ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java index 756404f..81797cb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -57,7 +58,7 @@ public class KTableMapKeysTest { String topic1 = "topic_map_keys"; - KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1, "anyStoreName"); + KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(integerSerde, stringSerde)); final Map<Integer, String> keyMap = new HashMap<>(); keyMap.put(1, "ONE"); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index 4bfaea6..5d92846 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -43,6 +44,7 @@ import static org.junit.Assert.assertTrue; public class KTableMapValuesTest { final private Serde<String> stringSerde = Serdes.String(); + private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); private File stateDir = null; @@ -69,7 +71,7 @@ public class KTableMapValuesTest { String topic1 = "topic1"; - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTable<String, String> table1 = builder.table(topic1, consumed); KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() { @Override public Integer apply(CharSequence value) { @@ -89,7 +91,7 @@ public class KTableMapValuesTest { String topic1 = "topic1"; - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTable<String, String> table1 = builder.table(topic1, consumed); KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() { @Override public Integer apply(CharSequence value) { @@ -210,11 +212,10 @@ public class KTableMapValuesTest { String topic1 = "topic1"; String topic2 = "topic2"; - String storeName1 = "storeName1"; String storeName2 = "storeName2"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override @@ -241,11 +242,10 @@ public class KTableMapValuesTest { String topic1 = "topic1"; String topic2 = "topic2"; - String storeName1 = "storeName1"; String storeName2 = "storeName2"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override @@ -273,7 +273,7 @@ public class KTableMapValuesTest { String topic1 = "topic1"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override @@ -321,7 +321,7 @@ public class KTableMapValuesTest { String topic1 = "topic1"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 3f8a6b2..35a3dbd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.KStreamTestDriver; @@ -37,6 +38,7 @@ import static org.junit.Assert.assertTrue; public class KTableSourceTest { final private Serde<String> stringSerde = Serdes.String(); + private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde); final private Serde<Integer> intSerde = Serdes.Integer(); @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); @@ -53,7 +55,7 @@ public class KTableSourceTest { String topic1 = "topic1"; - KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde)); MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); @@ -77,7 +79,7 @@ public class KTableSourceTest { String topic1 = "topic1"; - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed); KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); @@ -121,7 +123,7 @@ public class KTableSourceTest { String topic1 = "topic1"; - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed); MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); @@ -159,7 +161,7 @@ public class KTableSourceTest { String topic1 = "topic1"; - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed); table1.enableSendingOldValues(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index 3cbceeb..1e4afcd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.TestUtils; @@ -686,7 +687,7 @@ public class SimpleBenchmark { final StreamsBuilder builder = new StreamsBuilder(); final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic); - final KTable<Long, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store"); + final KTable<Long, byte[]> input2 = builder.table(kTableTopic); input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch)); @@ -697,8 +698,8 @@ public class SimpleBenchmark { String kTableTopic2, final CountDownLatch latch) { final StreamsBuilder builder = new StreamsBuilder(); - final KTable<Long, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store"); - final KTable<Long, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store"); + final KTable<Long, byte[]> input1 = builder.table(kTableTopic1); + final KTable<Long, byte[]> input2 = builder.table(kTableTopic2); input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch)); @@ -725,10 +726,12 @@ public class SimpleBenchmark { StreamsBuilder builder = new StreamsBuilder(); + final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder + = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Integer(), Serdes.ByteArray()); if (enableCaching) { - builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().enableCaching().build()); + builder.addStateStore(storeBuilder.withCachingEnabled()); } else { - builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build()); + builder.addStateStore(storeBuilder); } KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE)); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java index 0a10f9f..d98fd7f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java @@ -286,8 +286,7 @@ public class YahooBenchmark { final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic, Consumed.with(Serdes.String(), Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer))); - final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(), - campaignsTopic, "campaign-state"); + final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String())); KStream<String, ProjectedEvent> filteredEvents = kEvents http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index b22c488..f3dbb32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -22,14 +22,18 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StreamsMetadata; import org.junit.Before; import org.junit.Test; @@ -88,7 +92,9 @@ public class StreamsMetadataStateTest { } }); - builder.globalTable("global-topic", "global-table"); + builder.globalTable("global-topic", + Consumed.with(null, null), + Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as(globalTable)); StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId"); http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index c4e108d..4b75702 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -105,9 +105,9 @@ public class SmokeTestClient extends SmokeTestUtil { props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); props.put(ProducerConfig.ACKS_CONFIG, "all"); - StreamsBuilder builder = new StreamsBuilder(); - KStream<String, Integer> source = builder.stream("data", Consumed.with(stringSerde, intSerde)); + Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde); + KStream<String, Integer> source = builder.stream("data", stringIntConsumed); source.to(stringSerde, intSerde, "echo"); KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() { @Override @@ -141,7 +141,7 @@ public class SmokeTestClient extends SmokeTestUtil { new Unwindow<String, Integer>() ).to(stringSerde, intSerde, "min"); - KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName"); + KTable<String, Integer> minTable = builder.table("min", stringIntConsumed); minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min")); // max @@ -163,7 +163,7 @@ public class SmokeTestClient extends SmokeTestUtil { new Unwindow<String, Integer>() ).to(stringSerde, intSerde, "max"); - KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName"); + KTable<String, Integer> maxTable = builder.table("max", stringIntConsumed); maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max")); // sum @@ -186,7 +186,8 @@ public class SmokeTestClient extends SmokeTestUtil { ).to(stringSerde, longSerde, "sum"); - KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName"); + Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde); + KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed); sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum")); // cnt @@ -195,7 +196,7 @@ public class SmokeTestClient extends SmokeTestUtil { new Unwindow<String, Long>() ).to(stringSerde, longSerde, "cnt"); - KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName"); + KTable<String, Long> cntTable = builder.table("cnt", stringLongConsumed); cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt")); // dif
