guozhangwang commented on code in PR #14157: URL: https://github.com/apache/kafka/pull/14157#discussion_r1359694691
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java:
##########
@@ -44,72 +45,98 @@
  * @param <VO> Type of foreign values
  * @param <VR> Type of joined result of primary and foreign values
  */
-public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
+public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<Bytes, SubscriptionResponseWrapper<byte[]>, K, VR> {
     private static final Logger LOG = LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class);
-    private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
-    private final Serializer<V> constructionTimeValueSerializer;
+    private final KTableValueGetterSupplier<Bytes, byte[]> rawValueGetterSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> leftValueDeserializer;
+    private final Deserializer<VO> rightValueDeserializer;
     private final Supplier<String> valueHashSerdePseudoTopicSupplier;
     private final ValueJoiner<V, VO, VR> joiner;
     private final boolean leftJoin;
-    public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,

Review Comment:
   I did not pay attention to the code here, but now that I'm reading it, I wonder why we have to pass in a value-getter-supplier here instead of directly passing in the store itself and using the store.get APIs.

   The KTableValueGetter is (or at least, was) designed for a KTable object to be used by another KStream / KTable's processor as a parameter, e.g. in a join, so that the processor can get its values without directly accessing the underlying stores. In this case, though, it is actually referencing the KTable itself, not the other "foreignKeyTable". If we just passed in the store itself, would that remove much of this wrapping and hence make the code simpler?
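   To make the question concrete, here is a minimal sketch of the two shapes being compared. It deliberately uses hypothetical stand-in interfaces (SimpleStore, SimpleValueGetter, SimpleValueGetterSupplier) and hypothetical method names (joinViaGetter, joinViaStore) rather than the real Kafka Streams types, so it only illustrates the extra layer of indirection, not the actual code in this PR. Returning null for a missing key is also just an assumption made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class StoreVsGetterSketch {

    // Hypothetical stand-in for a read-only key-value store.
    interface SimpleStore<K, V> {
        V get(K key);
    }

    // Hypothetical stand-in for a value getter: an indirection that is
    // initialized first and then delegates lookups to some underlying store.
    interface SimpleValueGetter<K, V> {
        void init();
        V get(K key);
    }

    // Hypothetical stand-in for the supplier that hands out value getters.
    interface SimpleValueGetterSupplier<K, V> {
        SimpleValueGetter<K, V> get();
    }

    // Shape A: the processor is given a getter supplier and has to unwrap it.
    public static <K, V, VO, VR> VR joinViaGetter(SimpleValueGetterSupplier<K, V> supplier,
                                                  K key,
                                                  VO foreignValue,
                                                  BiFunction<V, VO, VR> joiner) {
        SimpleValueGetter<K, V> getter = supplier.get();
        getter.init();
        V current = getter.get(key);
        // Sketch assumption: no primary-side value means no join result.
        return current == null ? null : joiner.apply(current, foreignValue);
    }

    // Shape B: the processor is given the store and reads from it directly.
    public static <K, V, VO, VR> VR joinViaStore(SimpleStore<K, V> store,
                                                 K key,
                                                 VO foreignValue,
                                                 BiFunction<V, VO, VR> joiner) {
        V current = store.get(key);
        return current == null ? null : joiner.apply(current, foreignValue);
    }

    public static void main(String[] args) {
        Map<String, String> backing = new HashMap<>();
        backing.put("k1", "left");
        SimpleStore<String, String> store = backing::get;

        // The getter supplier only wraps the same store behind two extra types.
        SimpleValueGetterSupplier<String, String> supplier =
            () -> new SimpleValueGetter<String, String>() {
                @Override
                public void init() { }

                @Override
                public String get(String key) {
                    return store.get(key);
                }
            };

        BiFunction<String, String, String> joiner = (left, right) -> left + "+" + right;
        System.out.println(joinViaGetter(supplier, "k1", "right", joiner));
        System.out.println(joinViaStore(store, "k1", "right", joiner));
    }
}
```

   Both paths read the same value from the same store; the getter variant just adds a supplier and an init step around it, which is the wrapping the question above is about.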