guozhangwang commented on a change in pull request #9361: URL: https://github.com/apache/kafka/pull/9361#discussion_r498951242
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) { } final StateStore store = stateManager.getStore(name); - return getReadWriteStore(store); + return (S) getReadWriteStore(store); } @Override public <K, V> void forward(final K key, final V value) { - throwUnsupportedOperationExceptionIfStandby("forward"); - forward(key, value, SEND_TO_ALL); + final Record<K, V> toForward = new Record<>( + key, + value, + timestamp(), + headers() + ); + forward(toForward); } @Override @Deprecated public <K, V> void forward(final K key, final V value, final int childIndex) { - throwUnsupportedOperationExceptionIfStandby("forward"); - forward( + final Record<K, V> toForward = new Record<>( key, value, - To.child((currentNode().children()).get(childIndex).name())); + timestamp(), + headers() + ); + forward(toForward, (currentNode().children()).get(childIndex).name()); } @Override @Deprecated public <K, V> void forward(final K key, final V value, final String childName) { - throwUnsupportedOperationExceptionIfStandby("forward"); - forward(key, value, To.child(childName)); + final Record<K, V> toForward = new Record<>( + key, + value, + timestamp(), + headers() + ); + forward(toForward, childName); } - @SuppressWarnings("unchecked") @Override public <K, V> void forward(final K key, final V value, final To to) { + final ToInternal toInternal = new ToInternal(to); + final Record<K, V> toForward = new Record<>( + key, + value, + toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(), + headers() + ); + forward(toForward, toInternal.child()); + } + + @Override + public <K, V> void forward(final Record<K, V> record) { + forward(record, null); + } + + @SuppressWarnings("unchecked") Review comment: Thanks for your thoughts, and I agree it is too stretch for KIP-478 to extend to that. Let's just keep it as is now. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org