This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf3a5a3e047 MINOR: remove unnecessary `KeyValueMapper` (#13545)
bf3a5a3e047 is described below

commit bf3a5a3e0479c10ae10d202669c6a3c42ed8e70d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Apr 14 14:37:40 2023 -0700

    MINOR: remove unnecessary `KeyValueMapper` (#13545)
    
    Reviewers: Christo Lolov (@clolov), Bill Bejeck <[email protected]>
---
 .../kafka/streams/kstream/internals/KTableKTableInnerJoin.java      | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 56842d7ac81..98587b16fe5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
@@ -35,8 +34,6 @@ import static 
org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 class KTableKTableInnerJoin<K, V1, V2, VOut> extends 
KTableKTableAbstractJoin<K, V1, V2, VOut> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KTableKTableInnerJoin.class);
 
-    private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> 
key;
-
     KTableKTableInnerJoin(final KTableImpl<K, ?, V1> table1,
                           final KTableImpl<K, ?, V2> table2,
                           final ValueJoiner<? super V1, ? super V2, ? extends 
VOut> joiner) {
@@ -172,8 +169,7 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends 
KTableKTableAbstractJoin<K,
             final V1 value1 = getValueOrNull(valueAndTimestamp1);
 
             if (value1 != null) {
-                final ValueAndTimestamp<V2> valueAndTimestamp2
-                    = valueGetter2.get(keyValueMapper.apply(key, value1));
+                final ValueAndTimestamp<V2> valueAndTimestamp2 = 
valueGetter2.get(key);
                 final V2 value2 = getValueOrNull(valueAndTimestamp2);
 
                 if (value2 != null) {

Reply via email to