This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1e14b0c9640 MINOR: Cleanup Joined class (#14551)
1e14b0c9640 is described below

commit 1e14b0c964091121a5116cae18d9494a5d07bb5d
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Aug 21 14:31:22 2024 -0700

    MINOR: Cleanup Joined class (#14551)
    
    Code cleanup and JavaDocs fixed, plus add missing getters to JoinedInternal.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../org/apache/kafka/streams/kstream/Joined.java   | 222 ++++++++++++---------
 .../streams/kstream/internals/JoinedInternal.java  |  18 +-
 .../streams/kstream/internals/KStreamImpl.java     |  29 ++-
 3 files changed, 153 insertions(+), 116 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
index 55dff2428f8..2978f943f31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -24,47 +24,56 @@ import java.time.Duration;
  * The {@code Joined} class represents optional params that can be passed to
  * {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)} 
and
  * {@link KStream#leftJoin(KTable, ValueJoiner) KStream#leftJoin(KTable,...)} 
operations.
+ *
+ * @param <K> type of record key
+ * @param <VLeft> type of left record value
+ * @param <VRight> type of right record value
  */
-public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
+public class Joined<K, VLeft, VRight> implements NamedOperation<Joined<K, 
VLeft, VRight>> {
 
     protected final Serde<K> keySerde;
-    protected final Serde<V> valueSerde;
-    protected final Serde<VO> otherValueSerde;
+    protected final Serde<VLeft> leftValueSerde;
+    protected final Serde<VRight> rightValueSerde;
     protected final String name;
     protected final Duration gracePeriod;
 
     private Joined(final Serde<K> keySerde,
-                   final Serde<V> valueSerde,
-                   final Serde<VO> otherValueSerde,
+                   final Serde<VLeft> leftValueSerde,
+                   final Serde<VRight> rightValueSerde,
                    final String name,
                    final Duration gracePeriod) {
         this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-        this.otherValueSerde = otherValueSerde;
+        this.leftValueSerde = leftValueSerde;
+        this.rightValueSerde = rightValueSerde;
         this.name = name;
         this.gracePeriod = gracePeriod;
     }
 
-    protected Joined(final Joined<K, V, VO> joined) {
-        this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, 
joined.name, joined.gracePeriod);
+    protected Joined(final Joined<K, VLeft, VRight> joined) {
+        this(joined.keySerde, joined.leftValueSerde, joined.rightValueSerde, 
joined.name, joined.gracePeriod);
     }
 
     /**
      * Create an instance of {@code Joined} with key, value, and otherValue 
{@link Serde} instances.
      * {@code null} values are accepted and will be replaced by the default 
serdes as defined in config.
      *
-     * @param keySerde        the key serde to use. If {@code null} the 
default key serde from config will be used
-     * @param valueSerde      the value serde to use. If {@code null} the 
default value serde from config will be used
-     * @param otherValueSerde the otherValue serde to use. If {@code null} the 
default value serde from config will be used
-     * @param <K>             key type
-     * @param <V>             value type
-     * @param <VO>            other value type
+     * @param keySerde
+     *        the key serde to use. If {@code null} the default key serde from 
config will be used
+     * @param leftValueSerde
+     *        the left value serde to use. If {@code null} the default value serde 
from config will be used
+     * @param rightValueSerde
+     *        the right value serde to use. If {@code null} the default value 
serde from config will be used
+     *
+     * @param <K> key type
+     * @param <VLeft> left value type
+     * @param <VRight> right value type
+     *
      * @return new {@code Joined} instance with the provided serdes
      */
-    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
-                                                   final Serde<V> valueSerde,
-                                                   final Serde<VO> 
otherValueSerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, null, null);
+    public static <K, VLeft, VRight> Joined<K, VLeft, VRight> with(final 
Serde<K> keySerde,
+                                                                   final 
Serde<VLeft> leftValueSerde,
+                                                                   final 
Serde<VRight> rightValueSerde) {
+        return new Joined<>(keySerde, leftValueSerde, rightValueSerde, null, 
null);
     }
 
     /**
@@ -72,24 +81,26 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * {@code null} values are accepted and will be replaced by the default 
serdes as defined in
      * config.
      *
-     * @param keySerde the key serde to use. If {@code null} the default key 
serde from config will be
-     * used
-     * @param valueSerde the value serde to use. If {@code null} the default 
value serde from config
-     * will be used
-     * @param otherValueSerde the otherValue serde to use. If {@code null} the 
default value serde
-     * from config will be used
-     * @param name the name used as the base for naming components of the join 
including any
-     * repartition topics
+     * @param keySerde
+     *        the key serde to use. If {@code null} the default key serde from 
config will be used
+     * @param leftValueSerde
+     *        the left value serde to use. If {@code null} the default value 
serde from config will be used
+     * @param rightValueSerde
+     *        the right value serde to use. If {@code null} the default value 
serde from config will be used
+     * @param name
+     *        the name used as the base for naming components of the join 
including any repartition topics
+     *
      * @param <K> key type
-     * @param <V> value type
-     * @param <VO> other value type
+     * @param <VLeft> left value type
+     * @param <VRight> right value type
+     *
      * @return new {@code Joined} instance with the provided serdes
      */
-    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
-                                                   final Serde<V> valueSerde,
-                                                   final Serde<VO> 
otherValueSerde,
-                                                   final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, null);
+    public static <K, VLeft, VRight> Joined<K, VLeft, VRight> with(final 
Serde<K> keySerde,
+                                                                   final 
Serde<VLeft> leftValueSerde,
+                                                                   final 
Serde<VRight> rightValueSerde,
+                                                                   final 
String name) {
+        return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, 
null);
     }
 
     /**
@@ -97,39 +108,45 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * {@code null} values are accepted and will be replaced by the default 
serdes as defined in
      * config.
      *
-     * @param keySerde the key serde to use. If {@code null} the default key 
serde from config will be
-     * used
-     * @param valueSerde the value serde to use. If {@code null} the default 
value serde from config
-     * will be used
-     * @param otherValueSerde the otherValue serde to use. If {@code null} the 
default value serde
-     * from config will be used
-     * @param name the name used as the base for naming components of the join 
including any
-     * repartition topics
-     * @param gracePeriod stream buffer time
+     * @param keySerde
+     *        the key serde to use. If {@code null} the default key serde from 
config will be used
+     * @param leftValueSerde
+     *        the left value serde to use. If {@code null} the default value 
serde from config will be used
+     * @param rightValueSerde
+     *        the right value serde to use. If {@code null} the default value 
serde from config will be used
+     * @param name
+     *        the name used as the base for naming components of the join 
including any repartition topics
+     * @param gracePeriod
+     *        stream buffer time
+     *
      * @param <K> key type
-     * @param <V> value type
-     * @param <VO> other value type
+     * @param <VLeft> left value type
+     * @param <VRight> right value type
+     *
      * @return new {@code Joined} instance with the provided serdes
      */
-    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
-                                                   final Serde<V> valueSerde,
-                                                   final Serde<VO> 
otherValueSerde,
-                                                   final String name,
-                                                   final Duration gracePeriod) 
{
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    public static <K, VLeft, VRight> Joined<K, VLeft, VRight> with(final 
Serde<K> keySerde,
+                                                                   final 
Serde<VLeft> leftValueSerde,
+                                                                   final 
Serde<VRight> rightValueSerde,
+                                                                   final 
String name,
+                                                                   final 
Duration gracePeriod) {
+        return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, 
gracePeriod);
     }
 
     /**
      * Create an instance of {@code Joined} with  a key {@link Serde}.
      * {@code null} values are accepted and will be replaced by the default 
key serde as defined in config.
      *
-     * @param keySerde the key serde to use. If {@code null} the default key 
serde from config will be used
-     * @param <K>      key type
-     * @param <V>      value type
-     * @param <VO>     other value type
+     * @param keySerde
+     *        the key serde to use. If {@code null} the default key serde from 
config will be used
+     *
+     * @param <K> key type
+     * @param <VLeft> left value type
+     * @param <VRight> right value type
+     *
      * @return new {@code Joined} instance configured with the keySerde
      */
-    public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> 
keySerde) {
+    public static <K, VLeft, VRight> Joined<K, VLeft, VRight> keySerde(final 
Serde<K> keySerde) {
         return new Joined<>(keySerde, null, null, null, null);
     }
 
@@ -137,44 +154,51 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * Create an instance of {@code Joined} with a value {@link Serde}.
      * {@code null} values are accepted and will be replaced by the default 
value serde as defined in config.
      *
-     * @param valueSerde the value serde to use. If {@code null} the default 
value serde from config will be used
-     * @param <K>        key type
-     * @param <V>        value type
-     * @param <VO>       other value type
+     * @param leftValueSerde
+     *        the left value serde to use. If {@code null} the default value 
serde from config will be used
+     *
+     * @param <K> key type
+     * @param <VLeft> left value type
+     * @param <VRight> right value type
+     *
      * @return new {@code Joined} instance configured with the valueSerde
      */
-    public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> 
valueSerde) {
-        return new Joined<>(null, valueSerde, null, null, null);
+    public static <K, VLeft, VRight> Joined<K, VLeft, VRight> valueSerde(final 
Serde<VLeft> leftValueSerde) {
+        return new Joined<>(null, leftValueSerde, null, null, null);
     }
 
 
     /**
-     * Create an instance of {@code Joined} with an other value {@link Serde}.
+     * Create an instance of {@code Joined} with an other value {@link Serde}.
      * {@code null} values are accepted and will be replaced by the default 
value serde as defined in config.
      *
-     * @param otherValueSerde the otherValue serde to use. If {@code null} the 
default value serde from config will be used
-     * @param <K>             key type
-     * @param <V>             value type
-     * @param <VO>            other value type
+     * @param rightValueSerde
+     *        the right value serde to use. If {@code null} the default value 
serde from config will be used
+     *
+     * @param <K> key type
+     * @param <VLeft> left value type
+     * @param <VRight> right value type
+     *
      * @return new {@code Joined} instance configured with the otherValueSerde
      */
-    public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> 
otherValueSerde) {
-        return new Joined<>(null, null, otherValueSerde, null, null);
+    public static <K, VLeft, VRight> Joined<K, VLeft, VRight> 
otherValueSerde(final Serde<VRight> rightValueSerde) {
+        return new Joined<>(null, null, rightValueSerde, null, null);
     }
 
     /**
      * Create an instance of {@code Joined} with base name for all components 
of the join, this may
      * include any repartition topics created to complete the join.
      *
-     * @param name the name used as the base for naming components of the join 
including any
-     * repartition topics
+     * @param name
+     *        the name used as the base for naming components of the join 
including any repartition topics
+     *
      * @param <K> key type
-     * @param <V> value type
-     * @param <VO> other value type
-     * @return new {@code Joined} instance configured with the name
+     * @param <VLeft> left value type
+     * @param <VRight> right value type
      *
+     * @return new {@code Joined} instance configured with the name
      */
-    public static <K, V, VO> Joined<K, V, VO> as(final String name) {
+    public static <K, VLeft, VRight> Joined<K, VLeft, VRight> as(final String 
name) {
         return new Joined<>(null, null, null, name, null);
     }
 
@@ -182,46 +206,53 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * Set the key {@link Serde} to be used. Null values are accepted and will 
be replaced by the default
      * key serde as defined in config
      *
-     * @param keySerde the key serde to use. If null the default key serde 
from config will be used
+     * @param keySerde
+     *        the key serde to use. If null the default key serde from config 
will be used
+     *
      * @return new {@code Joined} instance configured with the {@code name}
      */
-    public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    public Joined<K, VLeft, VRight> withKeySerde(final Serde<K> keySerde) {
+        return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, 
gracePeriod);
     }
 
     /**
      * Set the value {@link Serde} to be used. Null values are accepted and 
will be replaced by the default
      * value serde as defined in config
      *
-     * @param valueSerde the value serde to use. If null the default value 
serde from config will be used
+     * @param leftValueSerde
+     *        the left value serde to use. If null the default value serde 
from config will be used
+     *
      * @return new {@code Joined} instance configured with the {@code 
valueSerde}
      */
-    public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    public Joined<K, VLeft, VRight> withValueSerde(final Serde<VLeft> 
leftValueSerde) {
+        return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, 
gracePeriod);
     }
 
     /**
      * Set the otherValue {@link Serde} to be used. Null values are accepted 
and will be replaced by the default
      * value serde as defined in config
      *
-     * @param otherValueSerde the otherValue serde to use. If null the default 
value serde from config will be used
+     * @param rightValueSerde
+     *        the right value serde to use. If null the default value serde 
from config will be used
+     *
      * @return new {@code Joined} instance configured with the {@code 
valueSerde}
      */
-    public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> 
otherValueSerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    public Joined<K, VLeft, VRight> withOtherValueSerde(final Serde<VRight> 
rightValueSerde) {
+        return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, 
gracePeriod);
     }
 
     /**
      * Set the base name used for all components of the join, this may include 
any repartition topics
      * created to complete the join.
      *
-     * @param name the name used as the base for naming components of the join 
including any
-     * repartition topics
+     * @param name
+     *        the name used as the base for naming components of the join 
including any repartition topics
+     *
      * @return new {@code Joined} instance configured with the {@code name}
      */
     @Override
-    public Joined<K, V, VO> withName(final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    public Joined<K, VLeft, VRight> withName(final String name) {
+        return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, 
gracePeriod);
     }
 
     /**
@@ -231,12 +262,13 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * result in a null join. Long gaps in stream side arriving records will 
cause
      * records to be delayed in processing.
      *
+     * @param gracePeriod
+     *        the duration of the grace period. Must be less than the joining 
table's history retention.
      *
-     * @param gracePeriod the duration of the grace period. Must be less than 
the joining table's history retention.
      * @return new {@code Joined} instance configured with the gracePeriod
      */
-    public Joined<K, V, VO> withGracePeriod(final Duration gracePeriod) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    public Joined<K, VLeft, VRight> withGracePeriod(final Duration 
gracePeriod) {
+        return new Joined<>(keySerde, leftValueSerde, rightValueSerde, name, 
gracePeriod);
     }
 
     public Duration gracePeriod() {
@@ -247,11 +279,11 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
         return keySerde;
     }
 
-    public Serde<V> valueSerde() {
-        return valueSerde;
+    public Serde<VLeft> valueSerde() {
+        return leftValueSerde;
     }
 
-    public Serde<VO> otherValueSerde() {
-        return otherValueSerde;
+    public Serde<VRight> otherValueSerde() {
+        return rightValueSerde;
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
index eb5884042cd..e01adb5ae4f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
@@ -19,22 +19,28 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Joined;
 
-public class JoinedInternal<K, V, VO> extends Joined<K, V, VO>  {
+import java.time.Duration;
 
-    JoinedInternal(final Joined<K, V, VO> joined) {
+public class JoinedInternal<K, VLeft, VRight> extends Joined<K, VLeft, VRight> 
 {
+
+    JoinedInternal(final Joined<K, VLeft, VRight> joined) {
         super(joined);
     }
 
+    public Duration gracePeriod() {
+        return gracePeriod;
+    }
+
     public Serde<K> keySerde() {
         return keySerde;
     }
 
-    public Serde<V> valueSerde() {
-        return valueSerde;
+    public Serde<VLeft> leftValueSerde() {
+        return leftValueSerde;
     }
 
-    public Serde<VO> otherValueSerde() {
-        return otherValueSerde;
+    public Serde<VRight> rightValueSerde() {
+        return rightValueSerde;
     }
 
     public String name() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index e03ba1e19d1..556ff94c95a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1079,12 +1079,12 @@ public class KStreamImpl<K, V> extends 
AbstractStream<K, V> implements KStream<K
         if (repartitionRequired) {
             final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(
                     name != null ? name : this.name,
-                    joined.keySerde(),
-                    joined.valueSerde()
+                    joinedInternal.keySerde(),
+                    joinedInternal.leftValueSerde()
             );
-            return thisStreamRepartitioned.doStreamTableJoin(table, joiner, 
joined, false);
+            return thisStreamRepartitioned.doStreamTableJoin(table, joiner, 
joinedInternal, false);
         } else {
-            return doStreamTableJoin(table, joiner, joined, false);
+            return doStreamTableJoin(table, joiner, joinedInternal, false);
         }
     }
 
@@ -1122,12 +1122,12 @@ public class KStreamImpl<K, V> extends 
AbstractStream<K, V> implements KStream<K
         if (repartitionRequired) {
             final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(
                     name != null ? name : this.name,
-                    joined.keySerde(),
-                    joined.valueSerde()
+                    joinedInternal.keySerde(),
+                    joinedInternal.leftValueSerde()
             );
-            return thisStreamRepartitioned.doStreamTableJoin(table, joiner, 
joined, true);
+            return thisStreamRepartitioned.doStreamTableJoin(table, joiner, 
joinedInternal, true);
         } else {
-            return doStreamTableJoin(table, joiner, joined, true);
+            return doStreamTableJoin(table, joiner, joinedInternal, true);
         }
     }
 
@@ -1232,27 +1232,26 @@ public class KStreamImpl<K, V> extends 
AbstractStream<K, V> implements KStream<K
     @SuppressWarnings("unchecked")
     private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> 
table,
                                                       final 
ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
-                                                      final Joined<K, V, VO> 
joined,
+                                                      final JoinedInternal<K, 
V, VO> joinedInternal,
                                                       final boolean leftJoin) {
         Objects.requireNonNull(table, "table can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
         final Set<String> allSourceNodes = 
ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) table));
 
-        final JoinedInternal<K, V, VO> joinedInternal = new 
JoinedInternal<>(joined);
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
 
         Optional<String> bufferStoreName = Optional.empty();
 
-        if (joined.gracePeriod() != null) {
+        if (joinedInternal.gracePeriod() != null) {
             if (!((KTableImpl<K, ?, VO>) 
table).graphNode.isOutputVersioned().orElse(true)) {
                 throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
             }
             bufferStoreName = Optional.of(name + "-Buffer");
             final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> 
storeBuilder =
-                    new 
RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), 
joined.gracePeriod(), name);
+                    new 
RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), 
joinedInternal.gracePeriod(), name);
             builder.addStateStore(new StoreBuilderWrapper(storeBuilder));
         }
 
@@ -1260,7 +1259,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             ((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
             joiner,
             leftJoin,
-            Optional.ofNullable(joined.gracePeriod()),
+            Optional.ofNullable(joinedInternal.gracePeriod()),
             bufferStoreName);
 
         final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(processorSupplier, name);
@@ -1269,7 +1268,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             processorParameters,
             ((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
             this.name,
-            joined.gracePeriod(),
+            joinedInternal.gracePeriod(),
             bufferStoreName
         );
 
@@ -1281,7 +1280,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         // do not have serde for joined result
         return new KStreamImpl<>(
             name,
-            joined.keySerde() != null ? joined.keySerde() : keySerde,
+            joinedInternal.keySerde() != null ? joinedInternal.keySerde() : 
keySerde,
             null,
             allSourceNodes,
             false,

Reply via email to