vvcephei commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r637232842



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
##########
@@ -17,23 +17,23 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
-class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
-    private final KTableImpl<K, ?, V> parent;
-    private final Predicate<? super K, ? super V> predicate;
+class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, 
KIn, VIn> {

Review comment:
       This is the only processor we migrate here. The point is to use this 
processor to make sure that the groundwork in the rest of these changes is 
sufficient.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -124,7 +124,10 @@
     private static final String TOPIC_SUFFIX = "-topic";
     private static final String SINK_NAME = "KTABLE-SINK-";
 
-    private final ProcessorSupplier<?, ?> processorSupplier;
+    // Temporarily setting the processorSupplier to type Object so that we can 
transition from the
+    // old ProcessorSupplier to the new api.ProcessorSupplier. This works 
because all accesses to
+    // this field are guarded by typechecks anyway.
+    private final Object processorSupplier;

Review comment:
       Calling this out as well. Hopefully the comment itself is clear.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
##########
@@ -154,20 +158,22 @@ private V computeOldValue(final K key, final Change<V> 
change) {
     }
 
 
-    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
-        private final KTableValueGetter<K, V> parentGetter;
+    private class KTableFilterValueGetter implements KTableValueGetter<KIn, 
VIn> {
+        private final KTableValueGetter<KIn, VIn> parentGetter;
 
-        KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) {
+        KTableFilterValueGetter(final KTableValueGetter<KIn, VIn> 
parentGetter) {
             this.parentGetter = parentGetter;
         }
 
         @Override
-        public void init(final ProcessorContext context) {
+        public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+            // This is the old processor context for compatibility with the 
other KTable processors.
+            // Once we migrte them all, we can swap this out.

Review comment:
       This particular interface was too much trouble to migrate now, and it's 
not terribly significant, since the value getter never forwards.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
##########
@@ -30,20 +32,40 @@
  * @param <V> the type of the value
  */
 class TimestampedTupleForwarder<K, V> {
-    private final ProcessorContext context;
+    private final InternalProcessorContext<K, Change<V>> context;
     private final boolean sendOldValues;
     private final boolean cachingEnabled;
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     TimestampedTupleForwarder(final StateStore store,
-                              final ProcessorContext context,
+                              final ProcessorContext<K, Change<V>> context,
                               final TimestampedCacheFlushListener<K, V> 
flushListener,
                               final boolean sendOldValues) {
-        this.context = context;
+        this.context = (InternalProcessorContext<K, Change<V>>) context;
         this.sendOldValues = sendOldValues;
         cachingEnabled = ((WrappedStateStore) 
store).setFlushListener(flushListener, sendOldValues);
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    TimestampedTupleForwarder(final StateStore store,
+                              final 
org.apache.kafka.streams.processor.ProcessorContext context,
+                              final TimestampedCacheFlushListener<K, V> 
flushListener,
+                              final boolean sendOldValues) {
+        this.context = (InternalProcessorContext) context;
+        this.sendOldValues = sendOldValues;
+        cachingEnabled = ((WrappedStateStore) 
store).setFlushListener(flushListener, sendOldValues);
+    }

Review comment:
       This is a bit weird, but I added a second constructor for the old 
ProcessorContext, even though the _actual_ processor context is always an 
Internal one, which implements both interfaces. I did it this way so that we 
can avoid any changes at all in the bulk of the Processor implementations.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##########
@@ -34,7 +34,7 @@
 
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
-public class GlobalProcessorContextImpl extends AbstractProcessorContext {
+public class GlobalProcessorContextImpl extends 
AbstractProcessorContext<Object, Object> {

Review comment:
       These parameters are largely unimportant for now.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
##########
@@ -42,7 +43,7 @@ public void shouldForwardValueTimestampIfNewValueExists() {
         expectLastCall();
         replay(context);
 
-        new TimestampedCacheFlushListener<>(context).apply(
+        new TimestampedCacheFlushListener<>((ProcessorContext<String, 
Change<String>>) context).apply(

Review comment:
       Lines like this are because we have to cast to differentiate between the 
two constructors. Since the context is an IPC, it actually implements both 
interfaces, and it doesn't matter which one we cast to.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
##########
@@ -19,30 +19,45 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener<K, V> implements 
CacheFlushListener<Windowed<K>, V> {
-    private final InternalProcessorContext context;
+class SessionCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<Windowed<KOut>, VOut> {
+    private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> 
context;

Review comment:
       Sadly, I had to add generics to the InternalProcessorContext, so there 
are a _lot_ of changes in this PR that are purely related to resolving rawtypes 
and unchecked warnings.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -816,6 +832,8 @@ public String queryableStoreName() {
             return new 
KTableSourceValueGetterSupplier<>(source.queryableName());
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) 
processorSupplier).view();
+        } else if (processorSupplier instanceof KTableNewProcessorSupplier) {
+            return ((KTableNewProcessorSupplier<?, ?, K, V>) 
processorSupplier).view();

Review comment:
       We have to add a new typecheck for the new supplier type.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+
+public interface KTableNewProcessorSupplier<KIn, VIn, KOut, VOut> extends 
ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> {

Review comment:
       This is just a temporary class so that we don't have to migrate all the 
processors in a "big bang". Once the old processorSupplier is unused, we can 
rename this class back.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,6 +850,12 @@ public boolean enableSendingOldValues(final boolean 
forceMaterialization) {
                 source.enableSendingOldValues();
             } else if (processorSupplier instanceof 
KStreamAggProcessorSupplier) {
                 ((KStreamAggProcessorSupplier<?, K, S, V>) 
processorSupplier).enableSendingOldValues();
+            } else if (processorSupplier instanceof 
KTableNewProcessorSupplier) {
+                final KTableNewProcessorSupplier<?, ?, ?, ?> 
tableProcessorSupplier =
+                    (KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier;
+                if 
(!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
+                    return false;
+                }

Review comment:
       We have to add a new typecheck for the new supplier type.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
##########
@@ -19,30 +19,45 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener<K, V> implements 
CacheFlushListener<Windowed<K>, V> {
-    private final InternalProcessorContext context;
+class SessionCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<Windowed<KOut>, VOut> {
+    private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> 
context;
+
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
+    @SuppressWarnings("unchecked")
     SessionCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+        this.context = (InternalProcessorContext<Windowed<KOut>, 
Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
     @Override
-    public void apply(final Windowed<K> key,
-                      final V newValue,
-                      final V oldValue,
+    public void apply(final Windowed<KOut> key,
+                      final VOut newValue,
+                      final VOut oldValue,
                       final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();

Review comment:
       There is also a sprinkling of suppressions like this. I did the best I 
could, but it's also not possible/easy/important to resolve them all right now.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -69,7 +69,7 @@ public GlobalStateUpdateTask(final LogContext logContext,
         final Map<String, String> storeNameToTopic = 
topology.storeToChangelogTopic();
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
-            final SourceNode<?, ?, ?, ?> source = topology.source(sourceTopic);
+            final SourceNode<?, ?> source = topology.source(sourceTopic);

Review comment:
       I forget why now, but it was necessary to revise the generic params on 
source and sink nodes. Source nodes are always just pass-thoughs, and sink 
nodes can't forward, so their output types are Void. In both cases, it means 
they only need two generic parameters now, so I had to change like 100 
declarations like this.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
##########
@@ -19,30 +19,45 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener<K, V> implements 
CacheFlushListener<Windowed<K>, V> {
-    private final InternalProcessorContext context;
+class SessionCacheFlushListener<KOut, VOut> implements 
CacheFlushListener<Windowed<KOut>, VOut> {
+    private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> 
context;
+
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;
 
+    @SuppressWarnings("unchecked")
     SessionCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+        this.context = (InternalProcessorContext<Windowed<KOut>, 
Change<VOut>>) context;
         myNode = this.context.currentNode();
     }
 
     @Override
-    public void apply(final Windowed<K> key,
-                      final V newValue,
-                      final V oldValue,
+    public void apply(final Windowed<KOut> key,
+                      final VOut newValue,
+                      final VOut oldValue,
                       final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
         context.setCurrentNode(myNode);
         try {
             context.forward(key, new Change<>(newValue, oldValue), 
To.all().withTimestamp(key.window().end()));
         } finally {
             context.setCurrentNode(prev);
         }
     }
+
+    @Override
+    public void apply(final Record<Windowed<KOut>, Change<VOut>> record) {
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();
+        context.setCurrentNode(myNode);
+        try {
+            context.forward(record.withTimestamp(record.key().window().end()));
+        } finally {
+            context.setCurrentNode(prev);
+        }
+    }

Review comment:
       For compatibility these listeners and the forwarders now have apply 
methods that work for both the new and old APIs. They _should_ be functionally 
equivalent.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -26,9 +26,9 @@
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeyDeserializer;
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer;
 
-public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, 
KOut, VOut> {
+public class SourceNode<KIn, VIn> extends ProcessorNode<KIn, VIn, KIn, VIn> {

Review comment:
       Here, we declare that the source node can only forward the same type it 
receives.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -173,12 +175,28 @@ public boolean setFlushListener(final 
CacheFlushListener<K, V> listener,
         final KeyValueStore<Bytes, byte[]> wrapped = wrapped();
         if (wrapped instanceof CachedStateStore) {
             return ((CachedStateStore<byte[], byte[]>) 
wrapped).setFlushListener(
-                (rawKey, rawNewValue, rawOldValue, timestamp) -> 
listener.apply(
-                    serdes.keyFrom(rawKey),
-                    rawNewValue != null ? serdes.valueFrom(rawNewValue) : null,
-                    rawOldValue != null ? serdes.valueFrom(rawOldValue) : null,
-                    timestamp
-                ),
+                new CacheFlushListener<byte[], byte[]>() {

Review comment:
       Since the interface has two methods, it can't be a lambda anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/To.java
##########
@@ -89,4 +89,11 @@ public int hashCode() {
         throw new UnsupportedOperationException("To is unsafe for use in Hash 
collections");
     }
 
+    @Override
+    public String toString() {
+        return "To{" +
+               "childName='" + childName + '\'' +
+               ", timestamp=" + timestamp +
+               '}';
+    }

Review comment:
       This was kind of funny. When the test framework gets a failing test, it 
calls toString to print out the explanation. Since toString wasn't overridden 
here, it used the default Object.toString, which calls hashCode. But To's 
hashCode implementation throws an exception so that we don't accidentally use 
it in a hash collection. Needless to say, it took a minute to figure out what 
was happening, so I left the toString implemented.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
##########
@@ -110,8 +110,8 @@ private void testMetrics(final String 
builtInMetricsVersion) {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", 
builtInMetricsVersion, new MockTime());
-        final InternalMockProcessorContext context = new 
InternalMockProcessorContext(streamsMetrics);
-        final ProcessorNode<Object, Object, ?, ?> node = new 
ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
+        final InternalMockProcessorContext<Object, Object> context = new 
InternalMockProcessorContext<>(streamsMetrics);
+        final ProcessorNode<Object, Object, Object, Object> node = new 
ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());

Review comment:
       For complicated java-type-system reasons, I had to switch from wildcards 
to Object in some places.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##########
@@ -24,14 +24,14 @@
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer;
 
-public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, 
KOut, VOut> {
+public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> {

Review comment:
       Here's where we declare the sink node cannot forward and hence only 
needs input parameters.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to