This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d61a592 MINOR: refactored code duplicates in several files (Streams
project) (#4357)
d61a592 is described below
commit d61a592c8a5ee2279760c2c7c72550160032bf8a
Author: Wladimir Schmidt <[email protected]>
AuthorDate: Tue Jan 2 19:54:20 2018 +0100
MINOR: refactored code duplicates in several files (Streams project) (#4357)
* Moved the duplicated forward(key, value) implementation from
GlobalProcessorContextImpl and ProcessorContextImpl into the parent class
AbstractProcessorContext
* Replaced concrete collection types (HashMap, HashSet) with their interfaces
(Map, Set) in declarations to make the code more maintainable
* Refactored major code duplication in InternalTopologyBuilder
* Formatted function parameters as per code review
Added final to code introduced in this PR
* Added missing finals to putNodeGroupName function
Rearranged parameters for resetTopicsPattern function
Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang
<[email protected]>, Bill Bejeck <[email protected]>
---
.../internals/AbstractProcessorContext.java | 15 +++++
.../internals/GlobalProcessorContextImpl.java | 17 -----
.../internals/InternalTopologyBuilder.java | 72 ++++++++++++----------
.../processor/internals/ProcessorContextImpl.java | 14 -----
4 files changed, 53 insertions(+), 65 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 87408c6..e9b5a4c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -163,6 +164,20 @@ public abstract class AbstractProcessorContext implements
InternalProcessorConte
return combined;
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <K, V> void forward(final K key, final V value) {
+ final ProcessorNode previousNode = currentNode();
+ try {
+ for (final ProcessorNode child : (List<ProcessorNode>)
currentNode().children()) {
+ setCurrentNode(child);
+ child.process(key, value);
+ }
+ } finally {
+ setCurrentNode(previousNode);
+ }
+ }
+
@Override
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
return config.originalsWithPrefix(prefix);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index fbb4cb6..37e7cb5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -25,8 +25,6 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache;
-import java.util.List;
-
public class GlobalProcessorContextImpl extends AbstractProcessorContext {
@@ -42,21 +40,6 @@ public class GlobalProcessorContextImpl extends
AbstractProcessorContext {
return stateManager.getGlobalStore(name);
}
- @SuppressWarnings("unchecked")
- @Override
- public <K, V> void forward(K key, V value) {
- final ProcessorNode previousNode = currentNode();
- try {
- for (ProcessorNode child : (List<ProcessorNode>)
currentNode().children()) {
- setCurrentNode(child);
- child.process(key, value);
- }
- } finally {
- setCurrentNode(previousNode);
- }
- }
-
-
/**
* @throws UnsupportedOperationException on every invocation
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index e71959b..39ea44f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -77,17 +77,17 @@ public class InternalTopologyBuilder {
private final List<Set<String>> copartitionSourceGroups = new
ArrayList<>();
// map from source processor names to subscribed topics (without
application-id prefix for internal topics)
- private final HashMap<String, List<String>> nodeToSourceTopics = new
HashMap<>();
+ private final Map<String, List<String>> nodeToSourceTopics = new
HashMap<>();
// map from source processor names to regex subscription patterns
- private final HashMap<String, Pattern> nodeToSourcePatterns = new
LinkedHashMap<>();
+ private final Map<String, Pattern> nodeToSourcePatterns = new
LinkedHashMap<>();
// map from sink processor names to subscribed topic (without
application-id prefix for internal topics)
- private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+ private final Map<String, String> nodeToSinkTopic = new HashMap<>();
// map from topics to their matched regex patterns, this is to ensure one
topic is passed through on source node
// even if it can be matched by multiple regex patterns
- private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
+ private final Map<String, Pattern> topicToPatterns = new HashMap<>();
// map from state store names to all the topics subscribed from source
processors that
// are connected to these state stores
@@ -804,43 +804,45 @@ public class InternalTopologyBuilder {
}
private Map<Integer, Set<String>> makeNodeGroups() {
- final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
- final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
+ final Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
+ final Map<String, Set<String>> rootToNodeGroup = new HashMap<>();
int nodeGroupId = 0;
// Go through source nodes first. This makes the group id assignment
easy to predict in tests
- final HashSet<String> allSourceNodes = new
HashSet<>(nodeToSourceTopics.keySet());
+ final Set<String> allSourceNodes = new
HashSet<>(nodeToSourceTopics.keySet());
allSourceNodes.addAll(nodeToSourcePatterns.keySet());
for (final String nodeName : Utils.sorted(allSourceNodes)) {
- final String root = nodeGrouper.root(nodeName);
- Set<String> nodeGroup = rootToNodeGroup.get(root);
- if (nodeGroup == null) {
- nodeGroup = new HashSet<>();
- rootToNodeGroup.put(root, nodeGroup);
- nodeGroups.put(nodeGroupId++, nodeGroup);
- }
- nodeGroup.add(nodeName);
+ nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups,
rootToNodeGroup);
}
// Go through non-source nodes
for (final String nodeName : Utils.sorted(nodeFactories.keySet())) {
if (!nodeToSourceTopics.containsKey(nodeName)) {
- final String root = nodeGrouper.root(nodeName);
- Set<String> nodeGroup = rootToNodeGroup.get(root);
- if (nodeGroup == null) {
- nodeGroup = new HashSet<>();
- rootToNodeGroup.put(root, nodeGroup);
- nodeGroups.put(nodeGroupId++, nodeGroup);
- }
- nodeGroup.add(nodeName);
+ nodeGroupId = putNodeGroupName(nodeName, nodeGroupId,
nodeGroups, rootToNodeGroup);
}
}
return nodeGroups;
}
+ private int putNodeGroupName(final String nodeName,
+ final int nodeGroupId,
+ final Map<Integer, Set<String>> nodeGroups,
+ final Map<String, Set<String>>
rootToNodeGroup) {
+ int newNodeGroupId = nodeGroupId;
+ final String root = nodeGrouper.root(nodeName);
+ Set<String> nodeGroup = rootToNodeGroup.get(root);
+ if (nodeGroup == null) {
+ nodeGroup = new HashSet<>();
+ rootToNodeGroup.put(root, nodeGroup);
+ nodeGroups.put(newNodeGroupId++, nodeGroup);
+ }
+ nodeGroup.add(nodeName);
+ return newNodeGroupId;
+ }
+
public synchronized ProcessorTopology build() {
return build((Integer) null);
}
@@ -1127,21 +1129,23 @@ public class InternalTopologyBuilder {
}
public synchronized Pattern earliestResetTopicsPattern() {
- final List<String> topics =
maybeDecorateInternalSourceTopics(earliestResetTopics);
- final Pattern earliestPattern =
buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
-
- ensureNoRegexOverlap(earliestPattern, latestResetPatterns,
latestResetTopics);
-
- return earliestPattern;
+ return resetTopicsPattern(earliestResetTopics, earliestResetPatterns,
latestResetTopics, latestResetPatterns);
}
public synchronized Pattern latestResetTopicsPattern() {
- final List<String> topics =
maybeDecorateInternalSourceTopics(latestResetTopics);
- final Pattern latestPattern = buildPatternForOffsetResetTopics(topics,
latestResetPatterns);
+ return resetTopicsPattern(latestResetTopics, latestResetPatterns,
earliestResetTopics, earliestResetPatterns);
+ }
+
+ private Pattern resetTopicsPattern(final Set<String> resetTopics,
+ final Set<Pattern> resetPatterns,
+ final Set<String> otherResetTopics,
+ final Set<Pattern> otherResetPatterns) {
+ final List<String> topics =
maybeDecorateInternalSourceTopics(resetTopics);
+ final Pattern pattern = buildPatternForOffsetResetTopics(topics,
resetPatterns);
- ensureNoRegexOverlap(latestPattern, earliestResetPatterns,
earliestResetTopics);
+ ensureNoRegexOverlap(pattern, otherResetPatterns, otherResetTopics);
- return latestPattern;
+ return pattern;
}
// TODO: we should check regex overlap at topology construction time and
then throw TopologyException
@@ -1338,7 +1342,7 @@ public class InternalTopologyBuilder {
final Integer subtopologyId,
final Set<String> nodeNames) {
- final HashMap<String, AbstractNode> nodesByName = new HashMap<>();
+ final Map<String, AbstractNode> nodesByName = new HashMap<>();
// add all nodes
for (final String nodeName : nodeNames) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 5010cf1..317581a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -77,20 +77,6 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
@SuppressWarnings("unchecked")
@Override
- public <K, V> void forward(final K key, final V value) {
- final ProcessorNode previousNode = currentNode();
- try {
- for (ProcessorNode child : (List<ProcessorNode>)
currentNode().children()) {
- setCurrentNode(child);
- child.process(key, value);
- }
- } finally {
- setCurrentNode(previousNode);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
public <K, V> void forward(final K key, final V value, final int
childIndex) {
final ProcessorNode previousNode = currentNode();
final ProcessorNode child = (ProcessorNode<K, V>)
currentNode().children().get(childIndex);
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].