vvcephei commented on a change in pull request #10744: URL: https://github.com/apache/kafka/pull/10744#discussion_r637232842
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java ########## @@ -17,23 +17,23 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; -class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { - private final KTableImpl<K, ?, V> parent; - private final Predicate<? super K, ? super V> predicate; +class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VIn> { Review comment: This is the only processor we migrate here. The point is to use this processor to make sure that the groundwork in the rest of these changes is sufficient. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ########## @@ -124,7 +124,10 @@ private static final String TOPIC_SUFFIX = "-topic"; private static final String SINK_NAME = "KTABLE-SINK-"; - private final ProcessorSupplier<?, ?> processorSupplier; + // Temporarily setting the processorSupplier to type Object so that we can transition from the + // old ProcessorSupplier to the new api.ProcessorSupplier. This works because all accesses to + // this field are guarded by typechecks anyway. + private final Object processorSupplier; Review comment: Calling this out as well. Hopefully the comment itself is clear. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java ########## @@ -154,20 +158,22 @@ private V computeOldValue(final K key, final Change<V> change) { } - private class KTableFilterValueGetter implements KTableValueGetter<K, V> { - private final KTableValueGetter<K, V> parentGetter; + private class KTableFilterValueGetter implements KTableValueGetter<KIn, VIn> { + private final KTableValueGetter<KIn, VIn> parentGetter; - KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) { + KTableFilterValueGetter(final KTableValueGetter<KIn, VIn> parentGetter) { this.parentGetter = parentGetter; } @Override - public void init(final ProcessorContext context) { + public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { + // This is the old processor context for compatibility with the other KTable processors. + // Once we migrte them all, we can swap this out. Review comment: This particular interface was too much trouble to migrate now, and it's not terribly significant, since the value getter never forwards. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java ########## @@ -30,20 +32,40 @@ * @param <V> the type of the value */ class TimestampedTupleForwarder<K, V> { - private final ProcessorContext context; + private final InternalProcessorContext<K, Change<V>> context; private final boolean sendOldValues; private final boolean cachingEnabled; - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) TimestampedTupleForwarder(final StateStore store, - final ProcessorContext context, + final ProcessorContext<K, Change<V>> context, final TimestampedCacheFlushListener<K, V> flushListener, final boolean sendOldValues) { - this.context = context; + this.context = (InternalProcessorContext<K, Change<V>>) context; this.sendOldValues = sendOldValues; cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); } + @SuppressWarnings({"unchecked", "rawtypes"}) + TimestampedTupleForwarder(final StateStore store, + final org.apache.kafka.streams.processor.ProcessorContext context, + final TimestampedCacheFlushListener<K, V> flushListener, + final boolean sendOldValues) { + this.context = (InternalProcessorContext) context; + this.sendOldValues = sendOldValues; + cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); + } Review comment: This is a bit weird, but I added a second constructor for the old ProcessorContext, even though the _actual_ processor context is always an Internal one, which implements both interfaces. I did it this way so that we can avoid any changes at all in the bulk of the Processor implementations. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java ########## @@ -34,7 +34,7 @@ import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; -public class GlobalProcessorContextImpl extends AbstractProcessorContext { +public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> { Review comment: These parameters are largely unimportant for now. ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java ########## @@ -42,7 +43,7 @@ public void shouldForwardValueTimestampIfNewValueExists() { expectLastCall(); replay(context); - new TimestampedCacheFlushListener<>(context).apply( + new TimestampedCacheFlushListener<>((ProcessorContext<String, Change<String>>) context).apply( Review comment: Lines like this are because we have to cast to differentiate between the two constructors. Since the context is an IPC, it actually implements both interfaces, and it doesn't matter which one we cast to. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java ########## @@ -19,30 +19,45 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> { - private final InternalProcessorContext context; +class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Windowed<KOut>, VOut> { + private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> context; Review comment: Sadly, I had to add generics to the InternalProcessorContext, so there are a _lot_ of changes in this PR that are purely related to resolving rawtypes and unchecked warnings. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ########## @@ -816,6 +832,8 @@ public String queryableStoreName() { return new KTableSourceValueGetterSupplier<>(source.queryableName()); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view(); + } else if (processorSupplier instanceof KTableNewProcessorSupplier) { + return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view(); Review comment: We have to add a new typecheck for the new supplier type. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.api.ProcessorSupplier; + +public interface KTableNewProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> { Review comment: This is just a temporary class so that we don't have to migrate all the processors in a "big bang". Once the old processorSupplier is unused, we can rename this class back. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ########## @@ -832,6 +850,12 @@ public boolean enableSendingOldValues(final boolean forceMaterialization) { source.enableSendingOldValues(); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues(); + } else if (processorSupplier instanceof KTableNewProcessorSupplier) { + final KTableNewProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier = + (KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier; + if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) { + return false; + } Review comment: We have to add a new typecheck for the new supplier type. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java ########## @@ -19,30 +19,45 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> { - private final InternalProcessorContext context; +class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Windowed<KOut>, VOut> { + private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> context; + + @SuppressWarnings("rawtypes") private final ProcessorNode myNode; + @SuppressWarnings("unchecked") SessionCacheFlushListener(final ProcessorContext context) { - this.context = (InternalProcessorContext) context; + this.context = (InternalProcessorContext<Windowed<KOut>, Change<VOut>>) context; myNode = this.context.currentNode(); } @Override - public void apply(final Windowed<K> key, - final V newValue, - final V oldValue, + public void apply(final Windowed<KOut> key, + final VOut newValue, + final VOut oldValue, final long timestamp) { - final ProcessorNode prev = context.currentNode(); + @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); Review comment: There is also a sprinkling of suppressions like this. I did the best I could, but it's also not possible/easy/important to resolve them all right now. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java ########## @@ -69,7 +69,7 @@ public GlobalStateUpdateTask(final LogContext logContext, final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic(); for (final String storeName : storeNames) { final String sourceTopic = storeNameToTopic.get(storeName); - final SourceNode<?, ?, ?, ?> source = topology.source(sourceTopic); + final SourceNode<?, ?> source = topology.source(sourceTopic); Review comment: I forget why now, but it was necessary to revise the generic params on source and sink nodes. Source nodes are always just pass-thoughs, and sink nodes can't forward, so their output types are Void. In both cases, it means they only need two generic parameters now, so I had to change like 100 declarations like this. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java ########## @@ -19,30 +19,45 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> { - private final InternalProcessorContext context; +class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Windowed<KOut>, VOut> { + private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> context; + + @SuppressWarnings("rawtypes") private final ProcessorNode myNode; + @SuppressWarnings("unchecked") SessionCacheFlushListener(final ProcessorContext context) { - this.context = (InternalProcessorContext) context; + this.context = (InternalProcessorContext<Windowed<KOut>, Change<VOut>>) context; myNode = this.context.currentNode(); } @Override - public void apply(final Windowed<K> key, - final V newValue, - final V oldValue, + public void apply(final Windowed<KOut> key, + final VOut newValue, + final VOut oldValue, final long timestamp) { - final ProcessorNode prev = context.currentNode(); + @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end())); } finally { context.setCurrentNode(prev); } } + + @Override + public void apply(final Record<Windowed<KOut>, Change<VOut>> record) { + @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); + context.setCurrentNode(myNode); + try { + context.forward(record.withTimestamp(record.key().window().end())); + } finally { + context.setCurrentNode(prev); + } + } Review comment: For compatibility these listeners and the forwarders now have apply methods that work for both the new and old APIs. They _should_ be functionally equivalent. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java ########## @@ -26,9 +26,9 @@ import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeyDeserializer; import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer; -public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { +public class SourceNode<KIn, VIn> extends ProcessorNode<KIn, VIn, KIn, VIn> { Review comment: Here, we declare that the source node can only forward the same type it receives. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ########## @@ -173,12 +175,28 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener, final KeyValueStore<Bytes, byte[]> wrapped = wrapped(); if (wrapped instanceof CachedStateStore) { return ((CachedStateStore<byte[], byte[]>) wrapped).setFlushListener( - (rawKey, rawNewValue, rawOldValue, timestamp) -> listener.apply( - serdes.keyFrom(rawKey), - rawNewValue != null ? serdes.valueFrom(rawNewValue) : null, - rawOldValue != null ? serdes.valueFrom(rawOldValue) : null, - timestamp - ), + new CacheFlushListener<byte[], byte[]>() { Review comment: Since the interface has two methods, it can't be a lambda anymore. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/To.java ########## @@ -89,4 +89,11 @@ public int hashCode() { throw new UnsupportedOperationException("To is unsafe for use in Hash collections"); } + @Override + public String toString() { + return "To{" + + "childName='" + childName + '\'' + + ", timestamp=" + timestamp + + '}'; + } Review comment: This was kind of funny. When the test framework gets a failing test, it calls toString to print out the explanation. Since toString wasn't overridden here, it used the default Object.toString, which calls hashCode. But To's hashCode implementation throws an exception so that we don't accidentally use it in a hash collection. Needless to say, it took a minute to figure out what was happening, so I left the toString implemented. ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java ########## @@ -110,8 +110,8 @@ private void testMetrics(final String builtInMetricsVersion) { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion, new MockTime()); - final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics); - final ProcessorNode<Object, Object, ?, ?> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet()); + final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics); + final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet()); Review comment: For complicated java-type-system reasons, I had to switch from wildcards to Object in some places. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ########## @@ -24,14 +24,14 @@ import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer; import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer; -public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { +public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> { Review comment: Here's where we declare the sink node cannot forward and hence only needs input parameters. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org