mjsax commented on code in PR #14157: URL: https://github.com/apache/kafka/pull/14157#discussion_r1285341728
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java: ########## @@ -44,72 +45,98 @@ * @param <VO> Type of foreign values * @param <VR> Type of joined result of primary and foreign values */ -public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> { +public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<Bytes, SubscriptionResponseWrapper<byte[]>, K, VR> { private static final Logger LOG = LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class); - private final KTableValueGetterSupplier<K, V> valueGetterSupplier; - private final Serializer<V> constructionTimeValueSerializer; + private final KTableValueGetterSupplier<Bytes, byte[]> rawValueGetterSupplier; + private final Deserializer<K> keyDeserializer; + private final Deserializer<V> leftValueDeserializer; + private final Deserializer<VO> rightValueDeserializer; private final Supplier<String> valueHashSerdePseudoTopicSupplier; private final ValueJoiner<V, VO, VR> joiner; private final boolean leftJoin; - public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier, - final Serializer<V> valueSerializer, + public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<Bytes, byte[]> rawValueGetterSupplier, + final Deserializer<K> keyDeserializer, + final Deserializer<V> leftValueDeserializer, + final Deserializer<VO> rightValueDeserializer, final Supplier<String> valueHashSerdePseudoTopicSupplier, final ValueJoiner<V, VO, VR> joiner, final boolean leftJoin) { - this.valueGetterSupplier = valueGetterSupplier; - constructionTimeValueSerializer = valueSerializer; + this.rawValueGetterSupplier = rawValueGetterSupplier; + this.keyDeserializer = keyDeserializer; + this.leftValueDeserializer = leftValueDeserializer; + this.rightValueDeserializer = rightValueDeserializer; 
this.valueHashSerdePseudoTopicSupplier = valueHashSerdePseudoTopicSupplier; this.joiner = joiner; this.leftJoin = leftJoin; } @Override - public Processor<K, SubscriptionResponseWrapper<VO>, K, VR> get() { - return new ContextualProcessor<K, SubscriptionResponseWrapper<VO>, K, VR>() { + public Processor<Bytes, SubscriptionResponseWrapper<byte[]>, K, VR> get() { + return new ContextualProcessor<Bytes, SubscriptionResponseWrapper<byte[]>, K, VR>() { private String valueHashSerdePseudoTopic; - private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer; - - private KTableValueGetter<K, V> valueGetter; + private Deserializer<K> keyDeserializer = ResponseJoinProcessorSupplier.this.keyDeserializer; + private Deserializer<V> leftValueDeserializer = ResponseJoinProcessorSupplier.this.leftValueDeserializer; + private Deserializer<VO> rightValueDeserializer = ResponseJoinProcessorSupplier.this.rightValueDeserializer; + private KTableValueGetter<Bytes, byte[]> rawValueGetter; @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext<K, VR> context) { super.init(context); valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get(); - valueGetter = valueGetterSupplier.get(); - valueGetter.init(context); - if (runtimeValueSerializer == null) { - runtimeValueSerializer = (Serializer<V>) context.valueSerde().serializer(); + rawValueGetter = rawValueGetterSupplier.get(); + rawValueGetter.init(context); + if (keyDeserializer == null) { + keyDeserializer = (Deserializer<K>) context.keySerde().deserializer(); + } + if (leftValueDeserializer == null) { + leftValueDeserializer = (Deserializer<V>) context.valueSerde().deserializer(); + } + if (rightValueDeserializer == null) { + rightValueDeserializer = (Deserializer<VO>) context.valueSerde().deserializer(); } } @Override - public void process(final Record<K, SubscriptionResponseWrapper<VO>> record) { + public void process(final Record<Bytes, SubscriptionResponseWrapper<byte[]>> record) { 
if (record.value().getVersion() != SubscriptionResponseWrapper.CURRENT_VERSION) { //Guard against modifications to SubscriptionResponseWrapper. Need to ensure that there is //compatibility with previous versions to enable rolling upgrades. Must develop a strategy for //upgrading from older SubscriptionWrapper versions to newer versions. throw new UnsupportedVersionException("SubscriptionResponseWrapper is of an incompatible version."); } - final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(record.key()); + final ValueAndTimestamp<byte[]> currentValueWithTimestamp = rawValueGetter.get(record.key()); final long[] currentHash = currentValueWithTimestamp == null ? null : - Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value())); + //Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value())); + Murmur3.hash128(currentValueWithTimestamp.value()); Review Comment: This is the actual fix: we no longer re-serialize the value to re-compute the hash. Instead, the hash is computed directly over the raw `byte[]` value returned by the raw value getter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org