This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new dc91ce5 [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (#6227) dc91ce5 is described below commit dc91ce58af941b822fbad3b560c35774130848f0 Author: Lee Dongjin <dong...@apache.org> AuthorDate: Wed Feb 27 00:22:25 2019 +0900 [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (#6227) I found this defect while inspecting [KAFKA-7293: Merge followed by groupByKey/join might violate co-partitioning](https://issues.apache.org/jira/browse/KAFKA-7293); This flag is never used now. Instead, `KStreamImpl#repartitionRequired` is now covering its functionality. Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bbej...@gmail.com> --- .../internals/GroupedStreamAggregateBuilder.java | 3 +- .../kstream/internals/InternalStreamsBuilder.java | 2 +- .../kstream/internals/KGroupedTableImpl.java | 3 +- .../streams/kstream/internals/KStreamImpl.java | 41 +++++++++------------- .../streams/kstream/internals/KTableImpl.java | 8 ++--- .../internals/graph/BaseJoinProcessorNode.java | 3 +- .../internals/graph/BaseRepartitionNode.java | 2 +- .../internals/graph/ProcessorGraphNode.java | 14 ++------ .../kstream/internals/graph/StateStoreNode.java | 2 +- .../internals/graph/StatefulProcessorNode.java | 14 +++----- .../kstream/internals/graph/StreamSinkNode.java | 3 +- .../kstream/internals/graph/StreamSourceNode.java | 4 +-- .../internals/graph/StreamTableJoinNode.java | 3 +- .../kstream/internals/graph/StreamsGraphNode.java | 10 +----- .../internals/graph/TableProcessorNode.java | 2 +- .../kstream/internals/AbstractStreamTest.java | 3 +- .../internals/graph/GraphGraceSearchUtilTest.java | 24 +++++-------- 17 files changed, 46 insertions(+), 95 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 46546f4..cd5155f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -102,8 +102,7 @@ class GroupedStreamAggregateBuilder<K, V> { new StatefulProcessorNode<>( aggFunctionName, new ProcessorParameters<>(aggregateSupplier, aggFunctionName), - storeBuilder, - repartitionRequired + storeBuilder ); builder.addGraphNode(parentNode, statefulProcessorNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index e0983eb..c06b988 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -65,7 +65,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { private static final String TOPOLOGY_ROOT = "root"; private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class); - protected final StreamsGraphNode root = new StreamsGraphNode(TOPOLOGY_ROOT, false) { + protected final StreamsGraphNode root = new StreamsGraphNode(TOPOLOGY_ROOT) { @Override public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { // no-op for root node diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 4675f56..56be0f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -88,8 +88,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>( funcName, new ProcessorParameters<>(aggregateSupplier, funcName), - new KeyValueStoreMaterializer<>(materialized).materialize(), - false + new KeyValueStoreMaterializer<>(materialized).materialize() ); // now the repartition node must be the parent of the StateProcessorNode diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index ba08b89..0eda64f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -133,7 +133,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name); - final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); + final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name, processorParameters); builder.addGraphNode(this.streamsGraphNode, filterProcessorNode); return new KStreamImpl<>(name, @@ -151,7 +151,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final String name = builder.newProcessorName(FILTER_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name); - final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); + final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name, processorParameters); builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode); @@ -185,7 +185,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name); - return new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); + return new ProcessorGraphNode<>(name, processorParameters); } @Override @@ -195,7 +195,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name); - final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, processorParameters, true); + final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, processorParameters); mapProcessorNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, mapProcessorNode); @@ -222,7 +222,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final String name = builder.newProcessorName(MAPVALUES_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name); - final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); + final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters); mapValuesProcessorNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode); @@ -244,7 +244,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final String name = builder.newProcessorName(PRINTING_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name); - final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name, processorParameters, false); + final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name, processorParameters); builder.addGraphNode(this.streamsGraphNode, printNode); } @@ -255,7 +255,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final String name = builder.newProcessorName(FLATMAP_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name); - final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, processorParameters, true); + final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, processorParameters); flatMapNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, flatMapNode); @@ -281,7 +281,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final String name = builder.newProcessorName(FLATMAPVALUES_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name); - final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); + final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters); flatMapValuesNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode); @@ -308,14 +308,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName); - final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName, processorParameters, false); + final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName, processorParameters); builder.addGraphNode(this.streamsGraphNode, branchNode); final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length); for (int i = 0; i < predicates.length; i++) { final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]); - final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i], innerProcessorParameters, repartitionRequired); + final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i], innerProcessorParameters); builder.addGraphNode(branchNode, branchChildNode); branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valSerde, sourceNodes, repartitionRequired, branchChildNode, builder); @@ -343,9 +343,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamPassThrough<>(), name); - final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name, - processorParameters, - requireRepartitioning); + final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name, processorParameters); mergeNode.setMergeNode(true); builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode); @@ -364,9 +362,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K name ); - final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name, - processorParameters, - repartitionRequired); + final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name, processorParameters); builder.addGraphNode(this.streamsGraphNode, foreachNode); } @@ -380,9 +376,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K name ); - final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name, - processorParameters, - repartitionRequired); + final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name, processorParameters); builder.addGraphNode(this.streamsGraphNode, peekNode); @@ -460,8 +454,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>( name, new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier), name), - stateStoreNames, - true + stateStoreNames ); transformNode.keyChangingOperation(true); @@ -494,8 +487,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>( name, new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name), - stateStoreNames, - repartitionRequired + stateStoreNames ); transformNode.setValueChangingOperation(true); @@ -515,8 +507,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>( name, new ProcessorParameters<>(processorSupplier, name), - stateStoreNames, - repartitionRequired + stateStoreNames ); builder.addGraphNode(this.streamsGraphNode, processNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 6e65b89..68f940c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -328,8 +328,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final ProcessorGraphNode<K, V> toStreamNode = new ProcessorGraphNode<>( name, - processorParameters, - false + processorParameters ); builder.addGraphNode(this.streamsGraphNode, toStreamNode); @@ -370,8 +369,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName), - false + new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName) ); builder.addGraphNode(streamsGraphNode, node); @@ -579,7 +577,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName); // select the aggregate key and values (old and new), it would require parent to send old values - final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(selectName, processorParameters, false); + final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(selectName, processorParameters); builder.addGraphNode(this.streamsGraphNode, groupByMapNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java index fd1fcc9..ce410c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java @@ -41,8 +41,7 @@ abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends StreamsGraphNode { final String thisJoinSideNodeName, final String otherJoinSideNodeName) { - super(nodeName, - false); + super(nodeName); this.valueJoiner = valueJoiner; this.joinThisProcessorParameters = joinThisProcessorParameters; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java index e7f8e56..460f640 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java @@ -39,7 +39,7 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode { final String sinkName, final String repartitionTopic) { - super(nodeName, false); + super(nodeName); this.keySerde = keySerde; this.valueSerde = valueSerde; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java index 658f55e..2cfe3cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java @@ -29,23 +29,13 @@ public class ProcessorGraphNode<K, V> extends StreamsGraphNode { private final ProcessorParameters<K, V> processorParameters; public ProcessorGraphNode(final String nodeName, - final ProcessorParameters<K, V> processorParameters, - final boolean repartitionRequired) { + final ProcessorParameters<K, V> processorParameters) { - super(nodeName, repartitionRequired); + super(nodeName); this.processorParameters = processorParameters; } - public ProcessorGraphNode(final String nodeName, - final ProcessorParameters<K, V> processorParameters) { - this( - nodeName, - processorParameters, - false - ); - } - public ProcessorParameters processorParameters() { return processorParameters; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java index 6d3b8ba..ea42cec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java @@ -25,7 +25,7 @@ public class StateStoreNode extends StreamsGraphNode { protected final StoreBuilder storeBuilder; public StateStoreNode(final StoreBuilder storeBuilder) { - super(storeBuilder.name(), false); + super(storeBuilder.name()); this.storeBuilder = storeBuilder; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java index df2a52e..1e910ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java @@ -36,11 +36,8 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> { */ public StatefulProcessorNode(final String nodeName, final ProcessorParameters<K, V> processorParameters, - final String[] storeNames, - final boolean repartitionRequired) { - super(nodeName, - processorParameters, - repartitionRequired); + final String[] storeNames) { + super(nodeName, processorParameters); this.storeNames = storeNames; this.storeBuilder = null; @@ -53,11 +50,8 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> { */ public StatefulProcessorNode(final String nodeName, final ProcessorParameters<K, V> processorParameters, - final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder, - final boolean repartitionRequired) { - super(nodeName, - processorParameters, - repartitionRequired); + final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder) { + super(nodeName, processorParameters); this.storeNames = null; this.storeBuilder = materializedKTableStoreBuilder; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java index 95076c8..dfe7f9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java @@ -34,8 +34,7 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode { final TopicNameExtractor<K, V> topicNameExtractor, final ProducedInternal<K, V> producedInternal) { - super(nodeName, - false); + super(nodeName); this.topicNameExtractor = topicNameExtractor; this.producedInternal = producedInternal; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java index 7d50c2a..317a95f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java @@ -35,7 +35,7 @@ public class StreamSourceNode<K, V> extends StreamsGraphNode { public StreamSourceNode(final String nodeName, final Collection<String> topicNames, final ConsumedInternal<K, V> consumedInternal) { - super(nodeName, false); + super(nodeName); this.topicNames = topicNames; this.consumedInternal = consumedInternal; @@ -45,7 +45,7 @@ public class StreamSourceNode<K, V> extends StreamsGraphNode { final Pattern topicPattern, final ConsumedInternal<K, V> consumedInternal) { - super(nodeName, false); + super(nodeName); this.topicPattern = topicPattern; this.consumedInternal = consumedInternal; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java index 1389156..f12eec7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java @@ -36,8 +36,7 @@ public class StreamTableJoinNode<K, V> extends StreamsGraphNode { final ProcessorParameters<K, V> processorParameters, final String[] storeNames, final String otherJoinSideNodeName) { - super(nodeName, - false); + super(nodeName); // in the case of Stream-Table join the state stores associated with the KTable this.storeNames = storeNames; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java index 5bb3649..6ee8efd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java @@ -29,17 +29,14 @@ public abstract class StreamsGraphNode { private final Collection<StreamsGraphNode> childNodes = new LinkedHashSet<>(); private final Collection<StreamsGraphNode> parentNodes = new LinkedHashSet<>(); private final String nodeName; - private final boolean repartitionRequired; private boolean keyChangingOperation; private boolean valueChangingOperation; private boolean mergeNode; private Integer buildPriority; private boolean hasWrittenToTopology = false; - public StreamsGraphNode(final String nodeName, - final boolean repartitionRequired) { + public StreamsGraphNode(final String nodeName) { this.nodeName = nodeName; - this.repartitionRequired = repartitionRequired; } public Collection<StreamsGraphNode> parentNodes() { @@ -88,10 +85,6 @@ public abstract class StreamsGraphNode { return nodeName; } - public boolean repartitionRequired() { - return repartitionRequired; - } - public boolean isKeyChangingOperation() { return keyChangingOperation; } @@ -140,7 +133,6 @@ public abstract class StreamsGraphNode { return "StreamsGraphNode{" + "nodeName='" + nodeName + '\'' + ", buildPriority=" + buildPriority + - ", repartitionRequired=" + repartitionRequired + ", hasWrittenToTopology=" + hasWrittenToTopology + ", keyChangingOperation=" + keyChangingOperation + ", valueChangingOperation=" + valueChangingOperation + diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java index 0409c62..f335843 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java @@ -37,7 +37,7 @@ public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraph final MaterializedInternal<K, V, S> materializedInternal, final String[] storeNames) { - super(nodeName, false); + super(nodeName); this.processorParameters = processorParameters; this.materializedInternal = materializedInternal; this.storeNames = storeNames != null ? storeNames : new String[]{}; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java index 425f837..89242eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java @@ -107,8 +107,7 @@ public class AbstractStreamTest { final String name = builder.newProcessorName("RANDOM-FILTER-"); final ProcessorGraphNode<K, V> processorNode = new ProcessorGraphNode<>( name, - new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name), - false); + new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name)); builder.addGraphNode(this.streamsGraphNode, processorNode); return new KStreamImpl<>(name, null, null, sourceNodes, false, processorNode, builder); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 1612cb9..8c61c73 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -63,8 +63,7 @@ public class GraphGraceSearchUtilTest { }, "dummy" ), - (StoreBuilder<? extends StateStore>) null, - false + (StoreBuilder<? extends StateStore>) null ); final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); @@ -92,8 +91,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - (StoreBuilder<? extends StateStore>) null, - false + (StoreBuilder<? extends StateStore>) null ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -116,8 +114,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - (StoreBuilder<? extends StateStore>) null, - false + (StoreBuilder<? extends StateStore>) null ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -132,8 +129,7 @@ public class GraphGraceSearchUtilTest { new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>( windows, "asdf", null, null, null ), "asdf"), - (StoreBuilder<? extends StateStore>) null, - false + (StoreBuilder<? extends StateStore>) null ); final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>( @@ -151,8 +147,7 @@ public class GraphGraceSearchUtilTest { }, "dummy" ), - (StoreBuilder<? extends StateStore>) null, - false + (StoreBuilder<? extends StateStore>) null ); graceGrandparent.addChild(statefulParent); @@ -178,8 +173,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - (StoreBuilder<? extends StateStore>) null, - false + (StoreBuilder<? extends StateStore>) null ); final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null); @@ -206,8 +200,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - (StoreBuilder<? extends StateStore>) null, - false + (StoreBuilder<? extends StateStore>) null ); final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>( @@ -221,8 +214,7 @@ public class GraphGraceSearchUtilTest { ), "asdf" ), - (StoreBuilder<? extends StateStore>) null, - false + (StoreBuilder<? extends StateStore>) null ); final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);