guozhangwang commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498951242



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")

Review comment:
       Thanks for your thoughts, and I agree it is too stretch for KIP-478 to 
extend to that. Let's just keep it as is now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to