[ https://issues.apache.org/jira/browse/KAFKA-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714158#comment-16714158 ]
ASF GitHub Bot commented on KAFKA-6036: --------------------------------------- guozhangwang closed pull request #6017: KAFKA-6036: Follow-up; cleanup sendOldValues logic ForwardingCacheFlushListener URL: https://github.com/apache/kafka/pull/6017 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java index f30ab79df9c..4065ceda61b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java @@ -22,13 +22,11 @@ class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> { private final InternalProcessorContext context; - private final boolean sendOldValues; private final ProcessorNode myNode; - ForwardingCacheFlushListener(final ProcessorContext context, final boolean sendOldValues) { + ForwardingCacheFlushListener(final ProcessorContext context) { this.context = (InternalProcessorContext) context; myNode = this.context.currentNode(); - this.sendOldValues = sendOldValues; } @Override @@ -36,11 +34,7 @@ public void apply(final K key, final V newValue, final V oldValue) { final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - if (sendOldValues) { - context.forward(key, new Change<>(newValue, oldValue)); - } else { - context.forward(key, new Change<>(newValue, null)); - } + context.forward(key, new Change<>(newValue, oldValue)); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 1b3a8f44e31..648c50b7e50 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -54,8 +54,8 @@ public void enableSendingOldValues() { private class KStreamAggregateProcessor extends AbstractProcessor<K, V> { private KeyValueStore<K, T> store; - private TupleForwarder<K, T> tupleForwarder; private StreamsMetricsImpl metrics; + private TupleForwarder<K, T> tupleForwarder; @SuppressWarnings("unchecked") @Override @@ -63,7 +63,7 @@ public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); store = (KeyValueStore<K, T>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } @@ -92,7 +92,7 @@ public void process(final K key, final V value) { // update the store with the new value store.put(key, newAgg); - tupleForwarder.maybeForward(key, newAgg, oldAgg); + tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index 9f404ea55f6..09e4fab48e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -61,7 +61,7 @@ public void init(final ProcessorContext context) { metrics = (StreamsMetricsImpl) context.metrics(); store = (KeyValueStore<K, V>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } @@ -89,7 +89,7 @@ public void process(final K key, final V value) { // update the store with the new value store.put(key, newAgg); - tupleForwarder.maybeForward(key, newAgg, oldAgg); + tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index b89399bf7b7..13f4a6eb409 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -92,7 +92,7 @@ public void init(final ProcessorContext context) { lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); store = (SessionStore<K, Agg>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } @Override @@ -135,7 +135,7 @@ public void process(final K key, final V value) { if (!mergedWindow.equals(newSessionWindow)) { for (final KeyValue<Windowed<K>, Agg> session : merged) { store.remove(session.key); - tupleForwarder.maybeForward(session.key, null, session.value); + tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value : null); } } @@ -151,10 +151,8 @@ public void process(final K key, final V value) { lateRecordDropSensor.record(); } } - } - private SessionWindow mergeSessionWindow(final SessionWindow one, final SessionWindow two) { final long start = one.start() < two.start() ? one.start() : two.start(); final long end = one.end() > two.end() ? one.end() : two.end(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index f29251573e2..0edbe4e4567 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -87,7 +87,7 @@ public void init(final ProcessorContext context) { lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context), sendOldValues); } @Override @@ -122,7 +122,7 @@ public void process(final K key, final V value) { // update the store with the new value windowStore.put(key, newAgg, windowStart); - tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg); + tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null); } else { log.debug( "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]", diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index b60f9ab1f73..b04a729a447 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -62,7 +62,7 @@ public void enableSendingOldValues() { public void init(final ProcessorContext context) { super.init(context); store = (KeyValueStore<K, T>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } /** @@ -95,7 +95,7 @@ public void process(final K key, final Change<V> value) { // update the store with the new value store.put(key, newAgg); - tupleForwarder.maybeForward(key, newAgg, oldAgg); + tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index 75fba9926d9..d1e524c88af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -70,7 +70,7 @@ public void init(final ProcessorContext context) { super.init(context); if (queryableName != null) { store = (KeyValueStore<K, V>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java index 2ed70bd46a2..78c1dc6f480 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java @@ -89,7 +89,7 @@ public void init(final ProcessorContext context) { if (queryableName != null) { store = (KeyValueStore<K, V>) context.getStateStore(queryableName); tupleForwarder = new TupleForwarder<>(store, context, - new ForwardingCacheFlushListener<K, V>(context, sendOldValues), + new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } } @@ -98,9 +98,13 @@ public void init(final ProcessorContext context) { public void process(final K key, final Change<V> value) { if (queryableName != null) { store.put(key, value.newValue); - tupleForwarder.maybeForward(key, value.newValue, value.oldValue); + tupleForwarder.maybeForward(key, value.newValue, sendOldValues ? value.oldValue : null); } else { - context().forward(key, value); + if (sendOldValues) { + context().forward(key, value); + } else { + context().forward(key, new Change<>(value.newValue, null)); + } } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index c2c84d5d362..2317947f5f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -90,7 +90,7 @@ public void init(final ProcessorContext context) { super.init(context); if (queryableName != null) { store = (KeyValueStore<K, V1>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context), sendOldValues); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java index 069b3604631..38c5a11f34d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -57,7 +57,7 @@ public void enableSendingOldValues() { public void init(final ProcessorContext context) { super.init(context); store = (KeyValueStore<K, V>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } /** @@ -89,7 +89,7 @@ public void process(final K key, final Change<V> value) { // update the store with the new value store.put(key, newAgg); - tupleForwarder.maybeForward(key, newAgg, oldAgg); + tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java index 274d96ea40c..6fc57bcf582 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java @@ -77,7 +77,7 @@ public void init(final ProcessorContext context) { metrics = (StreamsMetricsImpl) context.metrics(); if (queryableName != null) { store = (KeyValueStore<K, V>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java index b3e84d79119..88cea4fc7c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java @@ -91,7 +91,7 @@ public void init(final ProcessorContext context) { valueTransformer.init(new ForwardingDisabledProcessorContext(context)); if (queryableName != null) { - final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context, sendOldValues); + final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context); store = (KeyValueStore<K, V1>) context.getStateStore(queryableName); tupleForwarder = new TupleForwarder<>(store, context, flushListener, sendOldValues); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java index ff3ef44894b..aec0d16ccef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java @@ -32,7 +32,6 @@ class TupleForwarder<K, V> { private final CachedStateStore cachedStateStore; private final ProcessorContext context; - private final boolean sendOldValues; @SuppressWarnings("unchecked") TupleForwarder(final StateStore store, @@ -41,7 +40,6 @@ final boolean sendOldValues) { this.cachedStateStore = cachedStateStore(store); this.context = context; - this.sendOldValues = sendOldValues; if (this.cachedStateStore != null) { cachedStateStore.setFlushListener(flushListener, sendOldValues); } @@ -61,11 +59,7 @@ public void maybeForward(final K key, final V newValue, final V oldValue) { if (cachedStateStore == null) { - if (sendOldValues) { - context.forward(key, new Change<>(newValue, oldValue)); - } else { - context.forward(key, new Change<>(newValue, null)); - } + context.forward(key, new Change<>(newValue, oldValue)); } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable logical materialization to physical materialization > ---------------------------------------------------------- > > Key: KAFKA-6036 > URL: https://issues.apache.org/jira/browse/KAFKA-6036 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Guozhang Wang > Assignee: Guozhang Wang > Priority: Major > > Today whenever users specify a queryable store name for KTable, we would > always add a physical state store in the translated processor topology. > For some scenarios, we should consider not physically materialize the KTable > but only "logically" materialize it when you have some simple transformation > operations or even join operations that generated new KTables, and which > needs to be materialized with a state store, you can use the changelog topic > of the previous KTable and applies the transformation logic upon restoration > instead of creating a new changelog topic. For example: > {code} > table1 = builder.table("topic1"); > table2 = table1.filter(..).join(table3); // table2 needs to be materialized > for joining > {code} > We can actually set the {{getter}} function of table2's materialized store, > say {{state2}} to be reading from {{topic1}} and then apply the filter > operator, instead of creating a new {{state2-changelog}} topic in this case. > We can come up with a general internal impl optimizations to determine when > to logically materialize, and whether we should actually allow users of DSL > to "hint" whether to materialize or not (it then may need a KIP). -- This message was sent by Atlassian JIRA (v7.6.3#76005)