This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9211ff6 MINOR: Increase unit test coverage of method
ProcessorTopology#updateSourceTopics() (#9654)
9211ff6 is described below
commit 9211ff6ffd06485a4c3bc41185be22bac0d961d8
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Dec 1 19:58:57 2020 +0100
MINOR: Increase unit test coverage of method
ProcessorTopology#updateSourceTopics() (#9654)
The unit tests for method ProcessorTopology#updateSourceTopics() did not
cover all
code paths.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../processor/internals/ProcessorTopology.java | 1 -
.../processor/internals/ProcessorTopologyTest.java | 93 +++++++++++++++++++---
2 files changed, 81 insertions(+), 13 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index c4821c2..0a0118a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -172,7 +172,6 @@ public class ProcessorTopology {
}
sourceNodesByTopic.put(topic, sourceNode);
}
-
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 8010dcf..d76ebaa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -53,6 +53,7 @@ import org.junit.Test;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
@@ -64,11 +65,15 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class ProcessorTopologyTest {
@@ -151,42 +156,106 @@ public class ProcessorTopologyTest {
@Test
public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
- topology.addSource("source-1", "topic-1");
+ final String sourceNode = "source-1";
+ final String topic = "topic-1";
+ final String newTopic = "topic-2";
+ topology.addSource(sourceNode, topic);
final ProcessorTopology processorTopology =
topology.getInternalBuilder("X").buildTopology();
+ assertThat(processorTopology.source(newTopic), is(nullValue()));
- assertNull(processorTopology.source("topic-2"));
-
processorTopology.updateSourceTopics(Collections.singletonMap("source-1",
asList("topic-1", "topic-2")));
+
processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode,
asList(topic, newTopic)));
- assertThat(processorTopology.source("topic-2").name(),
equalTo("source-1"));
+ assertThat(processorTopology.source(newTopic).name(),
equalTo(sourceNode));
}
@Test
public void shouldUpdateSourceTopicsWithRemovedTopic() {
- topology.addSource("source-1", "topic-1", "topic-2");
+ final String sourceNode = "source-1";
+ final String topic = "topic-1";
+ final String topicToRemove = "topic-2";
+ topology.addSource(sourceNode, topic, topicToRemove);
final ProcessorTopology processorTopology =
topology.getInternalBuilder("X").buildTopology();
+ assertThat(processorTopology.source(topicToRemove).name(),
equalTo(sourceNode));
+
+
processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode,
Collections.singletonList(topic)));
- assertThat(processorTopology.source("topic-2").name(),
equalTo("source-1"));
+ assertThat(processorTopology.source(topicToRemove), is(nullValue()));
+ }
+
+ @Test
+ public void shouldUpdateSourceTopicsWithAllTopicsRemoved() {
+ final String sourceNode = "source-1";
+ final String topic = "topic-1";
+ topology.addSource(sourceNode, topic);
+ final ProcessorTopology processorTopology =
topology.getInternalBuilder("X").buildTopology();
+ assertThat(processorTopology.source(topic).name(),
equalTo(sourceNode));
-
processorTopology.updateSourceTopics(Collections.singletonMap("source-1",
Collections.singletonList("topic-1")));
+
processorTopology.updateSourceTopics(Collections.singletonMap(sourceNode,
Collections.emptyList()));
- assertNull(processorTopology.source("topic-2"));
+ assertThat(processorTopology.source(topic), is(nullValue()));
}
@Test
public void
shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
- topology.addSource("source-1", "topic-1");
+ final String sourceNodeWithinSubtopology = "source-1";
+ final String sourceNodeOutsideSubtopology = "source-2";
+ final String topicWithinSubtopology = "topic-1";
+ final String topicOutsideSubtopology = "topic-2";
+ topology.addSource(sourceNodeWithinSubtopology,
topicWithinSubtopology);
final ProcessorTopology processorTopology =
topology.getInternalBuilder("X").buildTopology();
processorTopology.updateSourceTopics(mkMap(
- mkEntry("source-1", Collections.singletonList("topic-1")),
- mkEntry("source-2", Collections.singletonList("topic-2")))
+ mkEntry(sourceNodeWithinSubtopology,
Collections.singletonList(topicWithinSubtopology)),
+ mkEntry(sourceNodeOutsideSubtopology,
Collections.singletonList(topicOutsideSubtopology))
+ )
);
- assertNull(processorTopology.source("topic-2"));
+ assertThat(processorTopology.source(topicOutsideSubtopology),
is(nullValue()));
assertThat(processorTopology.sources().size(), equalTo(1));
}
@Test
+ public void shouldThrowIfSourceNodeToUpdateDoesNotExist() {
+ final String existingSourceNode = "source-1";
+ final String nonExistingSourceNode = "source-2";
+ final String topicOfExistingSourceNode = "topic-1";
+ final String topicOfNonExistingSourceNode = "topic-2";
+ topology.addSource(nonExistingSourceNode,
topicOfNonExistingSourceNode);
+ final ProcessorTopology processorTopology =
topology.getInternalBuilder("X").buildTopology();
+
+ final Throwable exception = assertThrows(
+ IllegalStateException.class,
+ () ->
processorTopology.updateSourceTopics(Collections.singletonMap(
+ existingSourceNode,
Collections.singletonList(topicOfExistingSourceNode)
+ ))
+ );
+ assertThat(exception.getMessage(), is("Node " + nonExistingSourceNode
+ " not found in full topology"));
+ }
+
+ @Test
+ public void
shouldThrowIfMultipleSourceNodeOfSameSubtopologySubscribedToSameTopic() {
+ final String sourceNode = "source-1";
+ final String updatedSourceNode = "source-2";
+ final String doublySubscribedTopic = "topic-1";
+ final String topic = "topic-2";
+ topology.addSource(sourceNode, doublySubscribedTopic);
+ topology.addSource(updatedSourceNode, topic);
+ final ProcessorTopology processorTopology =
topology.getInternalBuilder("X").buildTopology();
+
+ final Throwable exception = assertThrows(
+ IllegalStateException.class,
+ () -> processorTopology.updateSourceTopics(mkMap(
+ mkEntry(sourceNode,
Collections.singletonList(doublySubscribedTopic)),
+ mkEntry(updatedSourceNode, Arrays.asList(topic,
doublySubscribedTopic))
+ ))
+ );
+ assertThat(
+ exception.getMessage(),
+ startsWith("Topic " + doublySubscribedTopic + " was already
registered to source node")
+ );
+ }
+
+ @Test
public void testDrivingSimpleTopology() {
final int partition = 10;
driver = new TopologyTestDriver(createSimpleTopology(partition),
props);