This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 54dc27c  KAFKA-10515: Properly initialize nullable Serdes with default 
values (#9467)
54dc27c is described below

commit 54dc27c81306228ecdfbaa6abcee9d730420ea7b
Author: Thorsten Hake <[email protected]>
AuthorDate: Thu Oct 29 23:37:36 2020 +0100

    KAFKA-10515: Properly initialize nullable Serdes with default values (#9467)
    
    This is a cherry pick of PR #9338 on branch 2.6.
    
    Introduced the notion of WrappingNullableSerdes
    (aligned to the concept of WrappingNullableSerializer and
    WrappingNullableDeserializer) and centralized
    initialization in WrappingNullableUtils.
    
    The added integration test KTableKTableForeignKeyJoinDistributedTest tests
    whether all serdes are now correctly set on all stream clients.
    
    Reviewers: John Roesler <[email protected]>
---
 .../kstream/internals/WrappingNullableSerde.java   |  68 ++++++
 .../kstream/internals/WrappingNullableUtils.java   |  97 +++++++++
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   |  26 +--
 .../processor/internals/ProcessorContextUtils.java |  56 +++++
 .../streams/processor/internals/SinkNode.java      |  26 +--
 .../streams/processor/internals/SourceNode.java    |  25 +--
 .../state/internals/MeteredKeyValueStore.java      |  12 +-
 .../state/internals/MeteredSessionStore.java       |   6 +-
 .../internals/MeteredTimestampedKeyValueStore.java |  15 +-
 .../internals/MeteredTimestampedWindowStore.java   |  14 +-
 .../state/internals/MeteredWindowStore.java        |  10 +-
 .../internals/ValueAndTimestampDeserializer.java   |  11 +-
 .../state/internals/ValueAndTimestampSerde.java    |  41 +---
 .../internals/ValueAndTimestampSerializer.java     |  14 +-
 .../KTableKTableForeignKeyJoinDistributedTest.java | 235 +++++++++++++++++++++
 .../internals/GlobalStateStoreProviderTest.java    |   8 +
 .../MeteredTimestampedKeyValueStoreTest.java       |   6 +
 17 files changed, 553 insertions(+), 117 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
new file mode 100644
index 0000000..c15ff23
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+import java.util.Objects;
+
+public abstract class WrappingNullableSerde<T, InnerK, InnerV> implements 
Serde<T> {
+    private final WrappingNullableSerializer<T, InnerK, InnerV> serializer;
+    private final WrappingNullableDeserializer<T, InnerK, InnerV> deserializer;
+
+    protected WrappingNullableSerde(final WrappingNullableSerializer<T, 
InnerK, InnerV> serializer,
+                                    final WrappingNullableDeserializer<T, 
InnerK, InnerV> deserializer) {
+        Objects.requireNonNull(serializer, "serializer can't be null");
+        Objects.requireNonNull(deserializer, "deserializer can't be null");
+        this.serializer = serializer;
+        this.deserializer = deserializer;
+    }
+
+    @Override
+    public Serializer<T> serializer() {
+        return serializer;
+    }
+
+    @Override
+    public Deserializer<T> deserializer() {
+        return deserializer;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs,
+                          final boolean isKey) {
+        serializer.configure(configs, isKey);
+        deserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public void close() {
+        serializer.close();
+        deserializer.close();
+    }
+
+    public void setIfUnset(final Serde<InnerK> defaultKeySerde, final 
Serde<InnerV> defaultValueSerde) {
+        Objects.requireNonNull(defaultKeySerde);
+        Objects.requireNonNull(defaultValueSerde);
+        serializer.setIfUnset(defaultKeySerde.serializer(), 
defaultValueSerde.serializer());
+        deserializer.setIfUnset(defaultKeySerde.deserializer(), 
defaultValueSerde.deserializer());
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
new file mode 100644
index 0000000..23954d2
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * If a component's serdes are Wrapping serdes, then they require a little 
extra setup
+ * to be fully initialized at run time.
+ */
+public class WrappingNullableUtils {
+
+    @SuppressWarnings("unchecked")
+    private static <T> Deserializer<T> prepareDeserializer(final 
Deserializer<T> specificDeserializer, final Deserializer<?> 
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final 
boolean isKey) {
+        Deserializer<T> deserializerToUse = specificDeserializer;
+        if (deserializerToUse == null) {
+            deserializerToUse = (Deserializer<T>) (isKey ? 
contextKeyDeserializer : contextValueDeserializer);
+        } else {
+            initNullableDeserializer(deserializerToUse, 
contextKeyDeserializer, contextValueDeserializer);
+        }
+        return deserializerToUse;
+    }
+    @SuppressWarnings("unchecked")
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> 
specificSerializer, final Serializer<?> contextKeySerializer, final 
Serializer<?> contextValueSerializer, final boolean isKey) {
+        Serializer<T> serializerToUse = specificSerializer;
+        if (serializerToUse == null) {
+            serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : 
contextValueSerializer);
+        } else {
+            initNullableSerializer(serializerToUse, contextKeySerializer, 
contextValueSerializer);
+        }
+        return serializerToUse;
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, 
final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean 
isKey) {
+        Serde<T> serdeToUse = specificSerde;
+        if (serdeToUse == null) {
+            serdeToUse = (Serde<T>) (isKey ?  contextKeySerde : 
contextValueSerde);
+        } else if (serdeToUse instanceof WrappingNullableSerde) {
+            ((WrappingNullableSerde) serdeToUse).setIfUnset(contextKeySerde, 
contextValueSerde);
+        }
+        return serdeToUse;
+    }
+
+    public static <K> Deserializer<K> prepareKeyDeserializer(final 
Deserializer<K> specificDeserializer, final Deserializer<?> 
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer) {
+        return prepareDeserializer(specificDeserializer, 
contextKeyDeserializer, contextValueDeserializer, true);
+    }
+
+    public static <V> Deserializer<V> prepareValueDeserializer(final 
Deserializer<V> specificDeserializer, final Deserializer<?> 
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer) {
+        return prepareDeserializer(specificDeserializer, 
contextKeyDeserializer, contextValueDeserializer, false);
+    }
+
+    public static <K> Serializer<K> prepareKeySerializer(final Serializer<K> 
specificSerializer, final Serializer<?> contextKeySerializer, final 
Serializer<?> contextValueSerializer) {
+        return prepareSerializer(specificSerializer, contextKeySerializer, 
contextValueSerializer, true);
+    }
+
+    public static <V> Serializer<V> prepareValueSerializer(final Serializer<V> 
specificSerializer, final Serializer<?> contextKeySerializer, final 
Serializer<?> contextValueSerializer) {
+        return prepareSerializer(specificSerializer, contextKeySerializer, 
contextValueSerializer, false);
+    }
+
+    public static <K> Serde<K> prepareKeySerde(final Serde<K> specificSerde, 
final Serde<?> keySerde, final Serde<?> valueSerde) {
+        return prepareSerde(specificSerde, keySerde, valueSerde, true);
+    }
+
+    public static <V> Serde<V> prepareValueSerde(final Serde<V> specificSerde, 
final Serde<?> keySerde, final Serde<?> valueSerde) {
+        return prepareSerde(specificSerde, keySerde, valueSerde, false);
+    }
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <T> void initNullableSerializer(final Serializer<T> 
specificSerializer, final Serializer<?> contextKeySerializer, final 
Serializer<?> contextValueSerializer) {
+        if (specificSerializer instanceof WrappingNullableSerializer) {
+            ((WrappingNullableSerializer) 
specificSerializer).setIfUnset(contextKeySerializer, contextValueSerializer);
+        }
+    }
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <T> void initNullableDeserializer(final Deserializer<T> 
specificDeserializer, final Deserializer<?> contextKeyDeserializer, final 
Deserializer<?> contextValueDeserializer) {
+        if (specificDeserializer instanceof WrappingNullableDeserializer) {
+            ((WrappingNullableDeserializer) 
specificDeserializer).setIfUnset(contextKeyDeserializer, 
contextValueDeserializer);
+        }
+    }
+
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index d2cc989..7fe124a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -21,32 +21,22 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.function.Supplier;
 
-public class SubscriptionWrapperSerde<K> implements 
Serde<SubscriptionWrapper<K>> {
-    private final SubscriptionWrapperSerializer<K> serializer;
-    private final SubscriptionWrapperDeserializer<K> deserializer;
-
+public class SubscriptionWrapperSerde<K> extends 
WrappingNullableSerde<SubscriptionWrapper<K>, K, Void> {
     public SubscriptionWrapperSerde(final Supplier<String> 
primaryKeySerializationPseudoTopicSupplier,
                                     final Serde<K> primaryKeySerde) {
-        serializer = new 
SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
-                                                         primaryKeySerde == 
null ? null : primaryKeySerde.serializer());
-        deserializer = new 
SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
-                                                             primaryKeySerde 
== null ? null : primaryKeySerde.deserializer());
-    }
-
-    @Override
-    public Serializer<SubscriptionWrapper<K>> serializer() {
-        return serializer;
-    }
-
-    @Override
-    public Deserializer<SubscriptionWrapper<K>> deserializer() {
-        return deserializer;
+        super(
+            new 
SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
+                                                primaryKeySerde == null ? null 
: primaryKeySerde.serializer()),
+            new 
SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
+                                                  primaryKeySerde == null ? 
null : primaryKeySerde.deserializer())
+        );
     }
 
     private static class SubscriptionWrapperSerializer<K>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
new file mode 100644
index 0000000..25721ec
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+
+/**
+ * This class bridges the gap for components that _should_ be compatible with
+ * the public ProcessorContext interface, but have come to depend on features
+ * in InternalProcessorContext. In theory, all the features adapted here could
+ * migrate to the public interface, so each method in this class should 
reference
+ * the ticket that would ultimately obviate it.
+ */
+public final class ProcessorContextUtils {
+
+    private ProcessorContextUtils() {}
+
+    public static Serializer<?> getKeySerializer(final ProcessorContext 
processorContext) {
+        return getSerializer(processorContext, true);
+    }
+    public static Serializer<?> getValueSerializer(final ProcessorContext 
processorContext) {
+        return getSerializer(processorContext, false);
+    }
+    private static Serializer<?> getSerializer(final ProcessorContext 
processorContext, final boolean key) {
+        final Serde<?> serde = key ? processorContext.keySerde() : 
processorContext.valueSerde();
+        return serde == null ? null : serde.serializer();
+    }
+    public static Deserializer<?> getKeyDeserializer(final ProcessorContext 
processorContext) {
+        return getDeserializer(processorContext, true);
+    }
+    public static Deserializer<?> getValueDeserializer(final ProcessorContext 
processorContext) {
+        return getDeserializer(processorContext, false);
+    }
+    private static Deserializer<?> getDeserializer(final ProcessorContext 
processorContext, final boolean key) {
+        final Serde<?> serde = key ? processorContext.keySerde() : 
processorContext.valueSerde();
+        return serde == null ? null : serde.deserializer();
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 9b0a254..4efdfbb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -18,10 +18,12 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer;
+
 public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
     private Serializer<K> keySerializer;
@@ -52,28 +54,14 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         throw new UnsupportedOperationException("sink node does not allow 
addChild");
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-
-        // if serializers are null, get the default ones from the context
-        if (keySerializer == null) {
-            keySerializer = (Serializer<K>) context.keySerde().serializer();
-        }
-        if (valSerializer == null) {
-            valSerializer = (Serializer<V>) context.valueSerde().serializer();
-        }
-
-        // if serializers are internal wrapping serializers that may need to 
be given the default serializer
-        // then pass it the default one from the context
-        if (valSerializer instanceof WrappingNullableSerializer) {
-            ((WrappingNullableSerializer) valSerializer).setIfUnset(
-                context.keySerde().serializer(),
-                context.valueSerde().serializer()
-            );
-        }
+        final Serializer<?> contextKeySerializer = 
ProcessorContextUtils.getKeySerializer(context);
+        final Serializer<?> contextValueSerializer = 
ProcessorContextUtils.getValueSerializer(context);
+        keySerializer = prepareKeySerializer(keySerializer, 
contextKeySerializer, contextValueSerializer);
+        valSerializer = prepareValueSerializer(valSerializer, 
contextKeySerializer, contextValueSerializer);
     }
 
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 8508a7d..d49178a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -19,10 +19,12 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeyDeserializer;
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueDeserializer;
+
 public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
     private InternalProcessorContext context;
@@ -55,7 +57,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         return valDeserializer.deserialize(topic, headers, data);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final InternalProcessorContext context) {
         // It is important to first create the sensor before calling init on 
the
@@ -72,22 +73,10 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         super.init(context);
         this.context = context;
 
-        // if deserializers are null, get the default ones from the context
-        if (this.keyDeserializer == null) {
-            this.keyDeserializer = (Deserializer<K>) 
context.keySerde().deserializer();
-        }
-        if (this.valDeserializer == null) {
-            this.valDeserializer = (Deserializer<V>) 
context.valueSerde().deserializer();
-        }
-
-        // if deserializers are internal wrapping deserializers that may need 
to be given the default
-        // then pass it the default one from the context
-        if (valDeserializer instanceof WrappingNullableDeserializer) {
-            ((WrappingNullableDeserializer) valDeserializer).setIfUnset(
-                    context.keySerde().deserializer(),
-                    context.valueSerde().deserializer()
-            );
-        }
+        final Deserializer<?> contextKeyDeserializer = 
ProcessorContextUtils.getKeyDeserializer(context);
+        final Deserializer<?> contextValueDeserializer = 
ProcessorContextUtils.getValueDeserializer(context);
+        keyDeserializer = prepareKeyDeserializer(keyDeserializer, 
contextKeyDeserializer, contextValueDeserializer);
+        valDeserializer = prepareValueDeserializer(valDeserializer, 
contextKeyDeserializer, contextValueDeserializer);
     }
 
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index c844e03..4608985 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -34,6 +35,7 @@ import 
org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
 /**
@@ -101,12 +103,16 @@ public class MeteredKeyValueStore<K, V>
         maybeMeasureLatency(() -> super.init(context, root), time, 
restoreSensor);
     }
 
-    @SuppressWarnings("unchecked")
+    protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, 
final Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {
+        return WrappingNullableUtils.prepareValueSerde(valueSerde, 
contextKeySerde, contextValueSerde);
+    }
+
+    @Deprecated
     void initStoreSerde(final ProcessorContext context) {
         serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        prepareKeySerde(keySerde, context.keySerde(), context.valueSerde()),
+        prepareValueSerdeForStore(valueSerde, context.keySerde(), 
context.valueSerde()));
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index c7d4290..6421b9f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -65,14 +66,13 @@ public class MeteredSessionStore<K, V>
         this.time = time;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
         serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            WrappingNullableUtils.prepareKeySerde(keySerde, 
context.keySerde(), context.valueSerde()),
+            WrappingNullableUtils.prepareValueSerde(valueSerde, 
context.keySerde(), context.valueSerde()));
         taskId = context.taskId().toString();
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index d1446dc..042188e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -22,10 +22,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
@@ -50,11 +47,13 @@ public class MeteredTimestampedKeyValueStore<K, V>
     }
 
     @SuppressWarnings("unchecked")
-    void initStoreSerde(final ProcessorContext context) {
-        serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) 
context.valueSerde()) : valueSerde);
+    @Override
+    protected Serde<ValueAndTimestamp<V>> prepareValueSerdeForStore(final 
Serde<ValueAndTimestamp<V>> valueSerde, final Serde<?> contextKeySerde, final 
Serde<?> contextValueSerde) {
+        if (valueSerde == null) {
+            return new ValueAndTimestampSerde<>((Serde<V>) contextValueSerde);
+        } else {
+            return super.prepareValueSerdeForStore(valueSerde, 
contextKeySerde, contextValueSerde);
+        }
     }
 
     public RawAndDeserializedValue<V> getWithBinary(final K key) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index aecf69f..cb68863 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -19,9 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
@@ -49,10 +46,11 @@ class MeteredTimestampedWindowStore<K, V>
 
     @SuppressWarnings("unchecked")
     @Override
-    void initStoreSerde(final ProcessorContext context) {
-        serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) 
context.valueSerde()) : valueSerde);
+    protected Serde<ValueAndTimestamp<V>> prepareValueSerde(final 
Serde<ValueAndTimestamp<V>> valueSerde, final Serde<?> contextKeySerde, final 
Serde<?> contextValueSerde) {
+        if (valueSerde == null) {
+            return new ValueAndTimestampSerde<>((Serde<V>) contextValueSerde);
+        } else {
+            return super.prepareValueSerde(valueSerde, contextKeySerde, 
contextValueSerde);
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index fd39468..4bef5d0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -32,6 +33,7 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
 public class MeteredWindowStore<K, V>
@@ -84,13 +86,15 @@ public class MeteredWindowStore<K, V>
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, 
restoreSensor);
     }
+    protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final 
Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {
+        return WrappingNullableUtils.prepareValueSerde(valueSerde, 
contextKeySerde, contextValueSerde);
+    }
 
-    @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
         serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+            prepareKeySerde(keySerde, context.keySerde(), 
context.valueSerde()),
+            prepareValueSerde(valueSerde, context.keySerde(), 
context.valueSerde()));
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
index 7cd37d2..b7e56a3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampDeserializer.java
@@ -18,13 +18,16 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
-class ValueAndTimestampDeserializer<V> implements 
Deserializer<ValueAndTimestamp<V>> {
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+class ValueAndTimestampDeserializer<V> implements 
WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
     private final static LongDeserializer LONG_DESERIALIZER = new 
LongDeserializer();
 
     public final Deserializer<V> valueDeserializer;
@@ -81,4 +84,10 @@ class ValueAndTimestampDeserializer<V> implements 
Deserializer<ValueAndTimestamp
         return LONG_DESERIALIZER.deserialize(null, 
rawTimestamp(rawValueAndTimestamp));
     }
 
+    @Override
+    public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, 
final Deserializer<V> defaultValueDeserializer) {
+        // ValueAndTimestampDeserializer never wraps a null deserializer (or 
configure would throw),
+        // but it may wrap a deserializer that itself wraps a null 
deserializer.
+        initNullableDeserializer(valueDeserializer, defaultKeyDeserializer, 
defaultValueDeserializer);
+    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index c02992f..1936d29 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -16,44 +16,17 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
-import java.util.Map;
-import java.util.Objects;
-
-public class ValueAndTimestampSerde<V> implements Serde<ValueAndTimestamp<V>> {
-    private final ValueAndTimestampSerializer<V> valueAndTimestampSerializer;
-    private final ValueAndTimestampDeserializer<V> 
valueAndTimestampDeserializer;
+import static java.util.Objects.requireNonNull;
 
+public class ValueAndTimestampSerde<V> extends 
WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
     public ValueAndTimestampSerde(final Serde<V> valueSerde) {
-        Objects.requireNonNull(valueSerde);
-        valueAndTimestampSerializer = new 
ValueAndTimestampSerializer<>(valueSerde.serializer());
-        valueAndTimestampDeserializer = new 
ValueAndTimestampDeserializer<>(valueSerde.deserializer());
-    }
-
-    @Override
-    public void configure(final Map<String, ?> configs,
-                          final boolean isKey) {
-        valueAndTimestampSerializer.configure(configs, isKey);
-        valueAndTimestampDeserializer.configure(configs, isKey);
-    }
-
-    @Override
-    public void close() {
-        valueAndTimestampSerializer.close();
-        valueAndTimestampDeserializer.close();
-    }
-
-    @Override
-    public Serializer<ValueAndTimestamp<V>> serializer() {
-        return valueAndTimestampSerializer;
-    }
-
-    @Override
-    public Deserializer<ValueAndTimestamp<V>> deserializer() {
-        return valueAndTimestampDeserializer;
+        super(
+            new ValueAndTimestampSerializer<>(requireNonNull(valueSerde, 
"valueSerde was null").serializer()),
+            new ValueAndTimestampDeserializer<>(requireNonNull(valueSerde, 
"valueSerde was null").deserializer())
+        );
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index 3b2663d..58c6159 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -18,13 +18,16 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
-public class ValueAndTimestampSerializer<V> implements 
Serializer<ValueAndTimestamp<V>> {
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+public class ValueAndTimestampSerializer<V> implements 
WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
     public final Serializer<V> valueSerializer;
     private final Serializer<Long> timestampSerializer;
 
@@ -54,7 +57,7 @@ public class ValueAndTimestampSerializer<V> implements 
Serializer<ValueAndTimest
     /**
      * @param left  the serialized byte array of the old record in state store
      * @param right the serialized byte array of the new record being processed
-     * @return true if the two serialized values are the same (excluding 
timestamp) or 
+     * @return true if the two serialized values are the same (excluding 
timestamp) or
      *              if the timestamp of right is less than left (indicating 
out of order record)
      *         false otherwise
      */
@@ -125,4 +128,11 @@ public class ValueAndTimestampSerializer<V> implements 
Serializer<ValueAndTimest
         valueSerializer.close();
         timestampSerializer.close();
     }
+
+    @Override
+    public void setIfUnset(final Serializer<Void> defaultKeySerializer, final 
Serializer<V> defaultValueSerializer) {
+        // ValueAndTimestampSerializer never wraps a null serializer (or 
configure would throw),
+        // but it may wrap a serializer that itself wraps a null serializer.
+        initNullableSerializer(valueSerializer, defaultKeySerializer, 
defaultValueSerializer);
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
new file mode 100644
index 0000000..de71360
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+
+@Category({IntegrationTest.class})
+public class KTableKTableForeignKeyJoinDistributedTest {
+    private static final int NUM_BROKERS = 1;
+    private static final String LEFT_TABLE = "left_table";
+    private static final String RIGHT_TABLE = "right_table";
+    private static final String OUTPUT = "output-topic";
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+    private static final Properties CONSUMER_CONFIG = new Properties();
+
+    @Rule
+    public TestName testName = new TestName();
+
+
+    private static final String INPUT_TOPIC = "input-topic";
+
+    private KafkaStreams client1;
+    private KafkaStreams client2;
+
+    private volatile boolean client1IsOk = false;
+    private volatile boolean client2IsOk = false;
+
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+    }
+
+    @Before
+    public void setupTopics() throws InterruptedException {
+        CLUSTER.createTopic(LEFT_TABLE, 1, 1);
+        CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
+        CLUSTER.createTopic(OUTPUT, 11, 1);
+
+        //Fill test tables
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+        final List<KeyValue<String, String>> leftTable = Arrays.asList(
+                new KeyValue<>("lhsValue1", "lhsValue1|rhs1"),
+                new KeyValue<>("lhsValue2", "lhsValue2|rhs2"),
+                new KeyValue<>("lhsValue3", "lhsValue3|rhs3"),
+                new KeyValue<>("lhsValue4", "lhsValue4|rhs4")
+        );
+        final List<KeyValue<String, String>> rightTable = Arrays.asList(
+                new KeyValue<>("rhs1", "rhsValue1"),
+                new KeyValue<>("rhs2", "rhsValue2"),
+                new KeyValue<>("rhs3", "rhsValue3")
+        );
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, 
leftTable, producerConfig, CLUSTER.time);
+        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, 
rightTable, producerConfig, CLUSTER.time);
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, 
"ktable-ktable-distributed-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+    }
+
+    /**
+     * Closes both Streams clients and then runs {@code quietlyCleanStateAfterTest} for each.
+     * A client that was never created (for example because {@code setupTopics()} threw before
+     * the test body ran) is skipped, so a setup failure is not compounded by a
+     * NullPointerException raised from this hook.
+     */
+    @After
+    public void after() {
+        // Close both clients first, then clean up, preserving the original ordering.
+        if (client1 != null) {
+            client1.close();
+        }
+        if (client2 != null) {
+            client2.close();
+        }
+        if (client1 != null) {
+            quietlyCleanStateAfterTest(CLUSTER, client1);
+        }
+        if (client2 != null) {
+            quietlyCleanStateAfterTest(CLUSTER, client2);
+        }
+    }
+
+    public Properties getStreamsConfiguration() {
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.OPTIMIZE);
+        return streamsConfiguration;
+    }
+
+
+    private void configureBuilder(final StreamsBuilder builder) {
+        final KTable<String, String> left = builder.table(
+                LEFT_TABLE
+        );
+        final KTable<String, String> right = builder.table(
+                RIGHT_TABLE
+        );
+
+        final Function<String, String> extractor = value -> 
value.split("\\|")[1];
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> 
"(" + value1 + "," + value2 + ")";
+
+        final KTable<String, String> fkJoin = left.join(right, extractor, 
joiner);
+        fkJoin
+                .toStream()
+                .to(OUTPUT);
+    }
+
+    @Test
+    public void shouldBeInitializedWithDefaultSerde() throws Exception {
+        final Properties streamsConfiguration1 = getStreamsConfiguration();
+        final Properties streamsConfiguration2 = getStreamsConfiguration();
+
+        // Each Streams client needs to have its own StreamsBuilder in order to simulate
+        // a truly distributed run.
+        final StreamsBuilder builder1 = new StreamsBuilder();
+        configureBuilder(builder1);
+        final StreamsBuilder builder2 = new StreamsBuilder();
+        configureBuilder(builder2);
+
+
+        createClients(
+                builder1.build(streamsConfiguration1),
+                streamsConfiguration1,
+                builder2.build(streamsConfiguration2),
+                streamsConfiguration2
+        );
+
+        setStateListenersForVerification(thread -> 
!thread.activeTasks().isEmpty());
+
+        startClients();
+
+        waitUntilBothClientAreOK(
+                "At least one client did not reach state RUNNING with active 
tasks"
+        );
+        final Set<KeyValue<String, String>> expectedResult = new HashSet<>();
+        expectedResult.add(new KeyValue<>("lhsValue1", 
"(lhsValue1|rhs1,rhsValue1)"));
+        expectedResult.add(new KeyValue<>("lhsValue2", 
"(lhsValue2|rhs2,rhsValue2)"));
+        expectedResult.add(new KeyValue<>("lhsValue3", 
"(lhsValue3|rhs3,rhsValue3)"));
+        final Set<KeyValue<String, String>> result = new 
HashSet<>(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+                CONSUMER_CONFIG,
+                OUTPUT,
+                expectedResult.size()));
+
+        assertEquals(expectedResult, result);
+        //Check that both clients are still running
+        assertEquals(KafkaStreams.State.RUNNING, client1.state());
+        assertEquals(KafkaStreams.State.RUNNING, client2.state());
+    }
+
+    private void createClients(final Topology topology1,
+                               final Properties streamsConfiguration1,
+                               final Topology topology2,
+                               final Properties streamsConfiguration2) {
+
+        client1 = new KafkaStreams(topology1, streamsConfiguration1);
+        client2 = new KafkaStreams(topology2, streamsConfiguration2);
+    }
+
+    /**
+     * Installs a state listener on each client that raises the matching {@code clientNIsOk}
+     * flag once the client transitions to RUNNING while every local stream thread satisfies
+     * the supplied condition.
+     *
+     * @param taskCondition predicate that each local thread's metadata must fulfil
+     */
+    private void setStateListenersForVerification(final Predicate<ThreadMetadata> taskCondition) {
+        client1.setStateListener((newState, oldState) -> {
+            // Thread metadata is only inspected on a transition into RUNNING.
+            if (newState != KafkaStreams.State.RUNNING) {
+                return;
+            }
+            if (client1.localThreadsMetadata().stream().allMatch(taskCondition)) {
+                client1IsOk = true;
+            }
+        });
+        client2.setStateListener((newState, oldState) -> {
+            if (newState != KafkaStreams.State.RUNNING) {
+                return;
+            }
+            if (client2.localThreadsMetadata().stream().allMatch(taskCondition)) {
+                client2IsOk = true;
+            }
+        });
+    }
+
+    private void startClients() {
+        client1.start();
+        client2.start();
+    }
+
+    /**
+     * Waits (with a limit of {@code 30 * 1000}) until both state listeners have flagged
+     * their client as OK.
+     *
+     * @param message prefix of the failure message reported when the condition is not met
+     * @throws Exception if the wait is interrupted or the condition is not met in time
+     */
+    private void waitUntilBothClientAreOK(final String message) throws Exception {
+        // The details are supplied lazily: a plain String argument would be built before the
+        // wait starts and would therefore always report the flags as they were at call time.
+        TestUtils.waitForCondition(
+                () -> client1IsOk && client2IsOk,
+                30 * 1000,
+                () -> message + ": "
+                        + "Client 1 is " + (client1IsOk ? "" : "NOT ") + "OK, "
+                        + "client 2 is " + (client2IsOk ? "" : "NOT ") + "OK."
+        );
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 1ab8684..024dd44 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -96,12 +97,19 @@ public class GlobalStateStoreProviderTest {
             .anyTimes();
         expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes();
         expect(mockContext.recordCollector()).andReturn(null).anyTimes();
+        expectSerdes(mockContext);
         replay(mockContext);
         for (final StateStore store : stores.values()) {
             store.init(mockContext, null);
         }
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static void expectSerdes(final ProcessorContextImpl context) {
+        expect(context.keySerde()).andReturn((Serde) 
Serdes.String()).anyTimes();
+        expect(context.valueSerde()).andReturn((Serde) 
Serdes.Long()).anyTimes();
+    }
+
     @Test
     public void shouldReturnSingleItemListIfStoreExists() {
         final GlobalStateStoreProvider provider =
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 5522855..44b215d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -128,6 +128,7 @@ public class MeteredTimestampedKeyValueStoreTest {
         expect(context.metrics())
             .andReturn(new StreamsMetricsImpl(metrics, "test", 
builtInMetricsVersion)).anyTimes();
         expect(context.taskId()).andReturn(taskId).anyTimes();
+        expectSerdes(context);
         expect(inner.name()).andReturn("metered").anyTimes();
         storeLevelGroup =
             StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion) ? 
STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP;
@@ -140,6 +141,11 @@ public class MeteredTimestampedKeyValueStoreTest {
         );
 
     }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static void expectSerdes(final InternalProcessorContext context) {
+        expect(context.keySerde()).andReturn((Serde) 
Serdes.String()).anyTimes();
+        expect(context.valueSerde()).andReturn((Serde) 
Serdes.Long()).anyTimes();
+    }
 
     private void init() {
         replay(inner, context);

Reply via email to