This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1a324d7 KAFKA-6729: Reuse source topics for source KTable's
materialized store's changelog (#5017)
1a324d7 is described below
commit 1a324d784cfc53288730b7c1b5c1bde0685e4686
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu May 17 11:28:45 2018 -0700
KAFKA-6729: Reuse source topics for source KTable's materialized store's
changelog (#5017)
1. In InternalTopologyBuilder#topicGroups, which is used in
StreamsPartitionAssignor, look for book-kept storeToChangelogTopic map before
creating a new internal changelog topics. In this way if the source KTable is
created, its source topic stored in storeToChangelogTopic will be used.
2. Added unit test (confirmed that without 1) it will fail).
3. MINOR: removed TODOs that are related to removed KStreamBuilder.
4. MINOR: removed TODOs in StreamsBuilderTest util functions and replaced
with TopologyWrapper.
5. MINOR: removed StreamsBuilderTest#testFrom as it is already covered by
TopologyTest#shouldNotAllowToAddSourcesWithSameName, plus it requires
KStreamImpl.SOURCE_NAME which should be a package private field of the
KStreamImpl.
Reviewers: John Roesler <[email protected]>, Bill Bejeck
<[email protected]>, Matthias
J. Sax <[email protected]>
---
.../streams/kstream/internals/KStreamImpl.java | 3 +-
.../streams/kstream/internals/KTableImpl.java | 5 ++--
.../internals/InternalTopologyBuilder.java | 15 ++++++----
.../apache/kafka/streams/StreamsBuilderTest.java | 34 ++++++++--------------
.../internals/KStreamGlobalKTableJoinTest.java | 4 +--
.../internals/KStreamGlobalKTableLeftJoinTest.java | 4 +--
.../streams/kstream/internals/KStreamImplTest.java | 5 ++--
.../kstream/internals/KStreamKStreamJoinTest.java | 12 ++++----
.../internals/KStreamKStreamLeftJoinTest.java | 6 ++--
.../kstream/internals/KStreamKTableJoinTest.java | 4 +--
.../internals/KStreamKTableLeftJoinTest.java | 4 +--
.../internals/KTableKTableInnerJoinTest.java | 4 +--
.../internals/KTableKTableLeftJoinTest.java | 4 +--
.../internals/KTableKTableOuterJoinTest.java | 4 +--
.../internals/StreamsMetadataStateTest.java | 12 ++++----
.../internals/StreamsPartitionAssignorTest.java | 10 +++----
.../org/apache/kafka/test/KStreamTestDriver.java | 4 +--
17 files changed, 62 insertions(+), 72 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b8195a0..5331a95 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -53,8 +53,7 @@ import java.util.Set;
public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K,
V> {
- // TODO: change to package-private after removing KStreamBuilder
- public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
+ static final String SOURCE_NAME = "KSTREAM-SOURCE-";
static final String SINK_NAME = "KSTREAM-SINK-";
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 1c5ad4d..c1f0f7a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -45,10 +45,9 @@ import java.util.Set;
*/
public class KTableImpl<K, S, V> extends AbstractStream<K> implements
KTable<K, V> {
- // TODO: these two fields can be package-private after KStreamBuilder is
removed
- public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+ static final String SOURCE_NAME = "KTABLE-SOURCE-";
- public static final String STATE_STORE_NAME = "STATE-STORE-";
+ static final String STATE_STORE_NAME = "STATE-STORE-";
private static final String FILTER_NAME = "KTABLE-FILTER-";
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 70437e9..575ac01 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -96,8 +96,7 @@ public class InternalTopologyBuilder {
// are connected to these state stores
private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new
HashMap<>();
- // map from state store names to this state store's corresponding
changelog topic if possible,
- // this is used in the extended KStreamBuilder.
+ // map from state store names to this state store's corresponding
changelog topic if possible
private final Map<String, String> storeToChangelogTopic = new HashMap<>();
// all global topics
@@ -1013,12 +1012,16 @@ public class InternalTopologyBuilder {
}
}
- // if the node is connected to a state, add to the state topics
+ // if the node is connected to a state store whose changelog
topics are not predefined, add to the changelog topics
for (final StateStoreFactory stateFactory :
stateFactories.values()) {
if (stateFactory.loggingEnabled() &&
stateFactory.users().contains(node)) {
- final String name =
ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
- final InternalTopicConfig internalTopicConfig =
createChangelogTopicConfig(stateFactory, name);
- stateChangelogTopics.put(name, internalTopicConfig);
+ final String topicName =
storeToChangelogTopic.containsKey(stateFactory.name()) ?
+ storeToChangelogTopic.get(stateFactory.name())
:
+
ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
+ if (!stateChangelogTopics.containsKey(topicName)) {
+ final InternalTopicConfig internalTopicConfig =
createChangelogTopicConfig(stateFactory, topicName);
+ stateChangelogTopics.put(topicName,
internalTopicConfig);
+ }
}
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 7c2bfa6..0a1e6df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -39,13 +38,11 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -58,13 +55,6 @@ public class StreamsBuilderTest {
private final StreamsBuilder builder = new StreamsBuilder();
private final Properties props =
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
- @Test(expected = TopologyException.class)
- public void testFrom() {
- builder.stream(Arrays.asList("topic-1", "topic-2"));
-
- builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000",
"topic-3");
- }
-
@Test
public void shouldAllowJoinUnmaterializedFilteredKTable() {
final KTable<Bytes, String> filteredKTable = builder.<Bytes,
String>table("table-topic").filter(MockPredicate.<Bytes,
String>allGoodPredicate());
@@ -192,7 +182,7 @@ public class StreamsBuilderTest {
}
@Test
- public void testMerge() {
+ public void shouldMergeStreams() {
final String topic1 = "topic-1";
final String topic2 = "topic-2";
@@ -281,6 +271,16 @@ public class StreamsBuilderTest {
assertFalse(stores.hasNext());
assertFalse(subtopologies.hasNext());
}
+
+ @Test
+ public void shouldReuseSourceTopicAsChangelogs() {
+ final String topic = "topic";
+ builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("store"));
+
+ final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
+
+
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("topic")));
+ }
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() {
@@ -291,14 +291,4 @@ public class StreamsBuilderTest {
public void shouldThrowExceptionWhenTopicNamesAreNull() {
builder.stream(Arrays.<String>asList(null, null));
}
-
- // TODO: these two static functions are added because some
non-TopologyBuilder unit tests need to access the internal topology builder,
- // which is usually a bad sign of design patterns between
TopologyBuilder and StreamThread. We need to consider getting rid of them later
- public static InternalTopologyBuilder internalTopologyBuilder(final
StreamsBuilder builder) {
- return builder.internalTopologyBuilder;
- }
-
- public static Collection<Set<String>> getCopartitionedGroups(final
StreamsBuilder builder) {
- return builder.internalTopologyBuilder.copartitionGroups();
- }
-}
\ No newline at end of file
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index c37e8a9..7a65c4a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -112,7 +112,7 @@ public class KStreamGlobalKTableJoinTest {
@Test
public void shouldNotRequireCopartitioning() {
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals("KStream-GlobalKTable joins do not need to be
co-partitioned", 0, copartitionGroups.size());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index eb0775a..d6196c5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -114,7 +114,7 @@ public class KStreamGlobalKTableLeftJoinTest {
@Test
public void shouldNotRequireCopartitioning() {
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals("KStream-GlobalKTable joins do not need to be
co-partitioned", 0, copartitionGroups.size());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 463afb8..ebf3f36 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -174,7 +173,7 @@ public class KStreamImplTest {
1 + // to
2 + // through
1, // process
-
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size());
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size());
}
@Test
@@ -186,7 +185,7 @@ public class KStreamImplTest {
stream1.to("topic-5");
stream2.through("topic-6");
- ProcessorTopology processorTopology =
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null);
+ ProcessorTopology processorTopology =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null);
assertThat(processorTopology.source("topic-6").getTimestampExtractor(),
instanceOf(FailOnInvalidTimestamp.class));
assertEquals(processorTopology.source("topic-4").getTimestampExtractor(), null);
assertEquals(processorTopology.source("topic-3").getTimestampExtractor(), null);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index de3446c..59f0953 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
@@ -105,7 +105,7 @@ public class KStreamKStreamJoinTest {
Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -207,7 +207,7 @@ public class KStreamKStreamJoinTest {
JoinWindows.of(100),
Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -312,7 +312,7 @@ public class KStreamKStreamJoinTest {
Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -535,7 +535,7 @@ public class KStreamKStreamJoinTest {
Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -644,7 +644,7 @@ public class KStreamKStreamJoinTest {
Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 11c5c5b..8535a04 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
@@ -69,7 +69,7 @@ public class KStreamKStreamLeftJoinTest {
Joined.with(Serdes.Integer(),
Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
@@ -155,7 +155,7 @@ public class KStreamKStreamLeftJoinTest {
Joined.with(Serdes.Integer(),
Serdes.String(), Serdes.String()));
joined.process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 0ce27ab..55635fa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -104,7 +104,7 @@ public class KStreamKTableJoinTest {
@Test
public void shouldRequireCopartitionedStreams() {
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)),
copartitionGroups.iterator().next());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index eedda07..98fc500 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -100,7 +100,7 @@ public class KStreamKTableLeftJoinTest {
@Test
public void shouldRequireCopartitionedStreams() {
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)),
copartitionGroups.iterator().next());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 7ed8b6a..2efdd85 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -75,7 +75,7 @@ public class KTableKTableInnerJoinTest {
final int[] expectedKeys,
final MockProcessorSupplier<Integer, String>
supplier,
final KTable<Integer, String> joined) {
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 51fd839..79e5f0e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
@@ -87,7 +87,7 @@ public class KTableKTableLeftJoinTest {
final MockProcessorSupplier<Integer, String> supplier = new
MockProcessorSupplier<>();
joined.toStream().process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index cf3321f..8cee72f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
@@ -83,7 +83,7 @@ public class KTableKTableOuterJoinTest {
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
joined.toStream().process(supplier);
- final Collection<Set<String>> copartitionGroups =
StreamsBuilderTest.getCopartitionedGroups(builder);
+ final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)),
copartitionGroups.iterator().next());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e9bb2a3..39f848f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
@@ -96,7 +96,7 @@ public class StreamsMetadataStateTest {
Consumed.with(null, null),
Materialized.<Object, Object, KeyValueStore<Bytes,
byte[]>>as(globalTable));
-
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("appId");
topic1P0 = new TopicPartition("topic-one", 0);
topic1P1 = new TopicPartition("topic-one", 1);
@@ -122,7 +122,7 @@ public class StreamsMetadataStateTest {
new PartitionInfo("topic-four", 0, null, null, null));
cluster = new Cluster(null, Collections.<Node>emptyList(),
partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
- discovery = new
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder),
hostOne);
+ discovery = new
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
hostOne);
discovery.onChange(hostToPartitions, cluster);
partitioner = new StreamPartitioner<String, Object>() {
@Override
@@ -134,7 +134,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldNotThrowNPEWhenOnChangeNotCalled() {
- new
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder),
hostOne).getAllMetadataForStore("store");
+ new
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
hostOne).getAllMetadataForStore("store");
}
@Test
@@ -301,7 +301,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() {
- final StreamsMetadataState streamsMetadataState = new
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder),
StreamsMetadataState.UNKNOWN_HOST);
+ final StreamsMetadataState streamsMetadataState = new
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
StreamsMetadataState.UNKNOWN_HOST);
streamsMetadataState.onChange(hostToPartitions, cluster);
assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable,
"key", Serdes.String().serializer()));
}
@@ -314,7 +314,7 @@ public class StreamsMetadataStateTest {
@Test
public void
shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() {
- final StreamsMetadataState streamsMetadataState = new
StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder),
StreamsMetadataState.UNKNOWN_HOST);
+ final StreamsMetadataState streamsMetadataState = new
StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
StreamsMetadataState.UNKNOWN_HOST);
streamsMetadataState.onChange(hostToPartitions, cluster);
assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable,
"key", partitioner));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9812158..cc507d6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -727,7 +727,7 @@ public class StreamsPartitionAssignorTest {
public void shouldGenerateTasksForAllCreatedPartitions() {
final StreamsBuilder builder = new StreamsBuilder();
- final InternalTopologyBuilder internalTopologyBuilder =
StreamsBuilderTest.internalTopologyBuilder(builder);
+ final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
internalTopologyBuilder.setApplicationId(applicationId);
// KStream with 3 partitions
@@ -796,7 +796,7 @@ public class StreamsPartitionAssignorTest {
expectedCreatedInternalTopics.put(applicationId +
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
expectedCreatedInternalTopics.put(applicationId +
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
expectedCreatedInternalTopics.put(applicationId +
"-KSTREAM-MAP-0000000001-repartition", 4);
- expectedCreatedInternalTopics.put(applicationId +
"-topic3-STATE-STORE-0000000002-changelog", 4);
+ expectedCreatedInternalTopics.put("topic3", 4); // the source
topic is reused as changelog topics
// check if all internal topics were created as expected
assertThat(mockInternalTopicManager.readyTopics,
equalTo(expectedCreatedInternalTopics));
@@ -906,7 +906,7 @@ public class StreamsPartitionAssignorTest {
public void
shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
final StreamsBuilder builder = new StreamsBuilder();
- final InternalTopologyBuilder internalTopologyBuilder =
StreamsBuilderTest.internalTopologyBuilder(builder);
+ final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
internalTopologyBuilder.setApplicationId(applicationId);
KStream<Object, Object> stream1 = builder
@@ -1026,7 +1026,7 @@ public class StreamsPartitionAssignorTest {
public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
final StreamsBuilder builder = new StreamsBuilder();
- final InternalTopologyBuilder internalTopologyBuilder =
StreamsBuilderTest.internalTopologyBuilder(builder);
+ final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
internalTopologyBuilder.setApplicationId(applicationId);
builder.stream("topic1").groupByKey().count();
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 033b68d..2c3461a 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -82,7 +82,7 @@ public class KStreamTestDriver extends ExternalResource {
final Serde<?> keySerde,
final Serde<?> valSerde,
final long cacheSize) {
- final InternalTopologyBuilder internalTopologyBuilder =
StreamsBuilderTest.internalTopologyBuilder(builder);
+ final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
internalTopologyBuilder.setApplicationId("TestDriver");
topology = internalTopologyBuilder.build(null);
--
To stop receiving notification emails like this one, please contact
[email protected].