This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5f8ae0424a KAFKA-16260: Deprecate window.size.ms and 
window.inner.class.serde in StreamsConfig (#18297)
c5f8ae0424a is described below

commit c5f8ae0424ad189cca87126e55a4e17c676578d2
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Mar 31 20:15:37 2025 +0800

    KAFKA-16260: Deprecate window.size.ms and window.inner.class.serde in 
StreamsConfig (#18297)
    
    `window.size.ms` and `windowed.inner.class.serde` are not true
    Kafka Streams configs and are ignored when set from a Kafka Streams
    application. Both belong on the client.
    
    Reviewers: Lucas Brutschy <[email protected]>
    Signed-off-by: PoAn Yang <[email protected]>
---
 docs/upgrade.html                                  |   8 ++
 .../KStreamAggregationIntegrationTest.java         |  20 ++--
 .../SelfJoinUpgradeIntegrationTest.java            |   3 +-
 .../SlidingWindowedKStreamIntegrationTest.java     |   9 +-
 .../TimeWindowedKStreamIntegrationTest.java        |   9 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  23 ++++-
 .../kstream/SessionWindowedDeserializer.java       |  49 ++++++---
 .../streams/kstream/SessionWindowedSerializer.java |  48 ++++++---
 .../streams/kstream/TimeWindowedDeserializer.java  | 115 +++++++++++++--------
 .../streams/kstream/TimeWindowedSerializer.java    |  48 ++++++---
 .../kstream/SessionWindowedDeserializerTest.java   |  37 ++++++-
 .../kstream/SessionWindowedSerializerTest.java     |  37 ++++++-
 .../kstream/TimeWindowedDeserializerTest.java      |  71 +++++++++++--
 .../kstream/TimeWindowedSerializerTest.java        |  36 ++++++-
 14 files changed, 394 insertions(+), 119 deletions(-)

diff --git a/docs/upgrade.html b/docs/upgrade.html
index 88be13c531e..3ffa47f0b02 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -36,6 +36,14 @@
                     </li>
                 </ul>
             </li>
+            <li><b>Stream</b>
+                <ul>
+                    <li>
+                        The <code>window.size.ms</code> and 
<code>windowed.inner.class.serde</code> in <code>StreamsConfig</code> are 
deprecated.
+                        Use the corresponding string constants defined in 
<code>TimeWindowedSerializer</code>, <code>TimeWindowedDeserializer</code>, 
<code>SessionWindowedSerializer</code> and 
<code>SessionWindowedDeserializer</code> instead.
+                    </li>
+                </ul>
+            </li>
         </ul>
 
 <h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0</a></h4>
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index bf0d54bc5c0..7037ce1368d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -1065,9 +1065,12 @@ public class KStreamAggregationIntegrationTest {
         
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer.getClass().getName());
         
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer.getClass().getName());
-        consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
-        if (keyDeserializer instanceof TimeWindowedDeserializer || 
keyDeserializer instanceof SessionWindowedDeserializer) {
-            
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
+        consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 
500L);
+        if (keyDeserializer instanceof TimeWindowedDeserializer) {
+            
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
+                Serdes.serdeFrom(innerClass).getClass().getName());
+        } else if (keyDeserializer instanceof SessionWindowedDeserializer) {
+            
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
                 Serdes.serdeFrom(innerClass).getClass().getName());
         }
         return 
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
@@ -1089,9 +1092,12 @@ public class KStreamAggregationIntegrationTest {
         
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer.getClass().getName());
         
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer.getClass().getName());
-        consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
-        if (keyDeserializer instanceof TimeWindowedDeserializer || 
keyDeserializer instanceof SessionWindowedDeserializer) {
-            
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
+        consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 
500L);
+        if (keyDeserializer instanceof TimeWindowedDeserializer) {
+            
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
+                Serdes.serdeFrom(innerClass).getClass().getName());
+        } else if (keyDeserializer instanceof SessionWindowedDeserializer) {
+            
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
                 Serdes.serdeFrom(innerClass).getClass().getName());
         }
         return 
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
@@ -1123,7 +1129,7 @@ public class KStreamAggregationIntegrationTest {
                 "--property", "key.deserializer=" + 
keyDeserializer.getClass().getName(),
                 "--property", "value.deserializer=" + 
valueDeserializer.getClass().getName(),
                 "--property", "key.separator=" + keySeparator,
-                "--property", "key.deserializer." + 
StreamsConfig.WINDOWED_INNER_CLASS_SERDE + "=" + 
Serdes.serdeFrom(innerClass).getClass().getName(),
+                "--property", "key.deserializer." + 
TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS + "=" + 
Serdes.serdeFrom(innerClass).getClass().getName(),
                 "--property", "key.deserializer.window.size.ms=500",
             };
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
index d97d85a6af3..c9b498d4054 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.TestUtils;
 
@@ -262,7 +263,7 @@ public class SelfJoinUpgradeIntegrationTest {
         
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
+        consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 
500L);
 
 
         final List<KeyValueTimestamp<K, V>> actual =
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index 5f972f7c4f4..2954fa9806d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -444,9 +444,12 @@ public class SlidingWindowedKStreamIntegrationTest {
         
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer.getClass().getName());
         
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer.getClass().getName());
-        consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 
windowSize);
-        if (keyDeserializer instanceof TimeWindowedDeserializer || 
keyDeserializer instanceof SessionWindowedDeserializer) {
-            
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
+        consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 
windowSize);
+        if (keyDeserializer instanceof TimeWindowedDeserializer) {
+            
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
+                Serdes.serdeFrom(innerClass).getClass().getName());
+        } else if (keyDeserializer instanceof SessionWindowedDeserializer) {
+            
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
                 Serdes.serdeFrom(innerClass).getClass().getName());
         }
         return 
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 14a07e569ee..53746cd7fd6 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -478,9 +478,12 @@ public class TimeWindowedKStreamIntegrationTest {
         
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer.getClass().getName());
         
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer.getClass().getName());
-        consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 
windowSize);
-        if (keyDeserializer instanceof TimeWindowedDeserializer || 
keyDeserializer instanceof SessionWindowedDeserializer) {
-            
consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
+        consumerProperties.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 
windowSize);
+        if (keyDeserializer instanceof TimeWindowedDeserializer) {
+            
consumerProperties.setProperty(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
+                Serdes.serdeFrom(innerClass).getClass().getName());
+        } else if (keyDeserializer instanceof SessionWindowedDeserializer) {
+            
consumerProperties.setProperty(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
                 Serdes.serdeFrom(innerClass).getClass().getName());
         }
         return 
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6aedae59576..b98806590a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -45,6 +45,10 @@ import 
org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.internals.UpgradeFromValues;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.SessionWindowedSerializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.assignment.TaskAssignor;
@@ -829,13 +833,28 @@ public class StreamsConfig extends AbstractConfig {
         + CONFIG_ERROR_MSG
         + "\"NO_OPTIMIZATION\" by default.";
 
-    /** {@code windowed.inner.class.serde} */
+    /**
+     * {@code windowed.inner.class.serde}
+     *
+     * @deprecated since 4.1.0.
+     * Use {@link TimeWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for 
{@link TimeWindowedSerializer}.
+     * Use {@link TimeWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} 
for {@link TimeWindowedDeserializer}.
+     * Use {@link SessionWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} 
for {@link SessionWindowedSerializer}.
+     * Use {@link 
SessionWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link 
SessionWindowedDeserializer}.
+     */
+    @Deprecated
     public static final String WINDOWED_INNER_CLASS_SERDE = 
"windowed.inner.class.serde";
     private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default 
serializer / deserializer for the inner class of a windowed record. Must 
implement the " +
         "<code>org.apache.kafka.common.serialization.Serde</code> interface. 
Note that setting this config in KafkaStreams application would result " +
         "in an error as it is meant to be used only from Plain consumer 
client.";
 
-    /** {@code window.size.ms} */
+    /**
+     * {@code window.size.ms}
+     *
+     * @deprecated since 4.1.0.
+     * Use {@link TimeWindowedDeserializer#WINDOW_SIZE_MS_CONFIG} for {@link 
TimeWindowedDeserializer}.
+     */
+    @Deprecated
     public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
     private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the 
deserializer in order to calculate window end times.";
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
index e77efe799cd..11795459c9c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
@@ -23,10 +23,20 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.internals.SessionKeySchema;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class SessionWindowedDeserializer<T> implements 
Deserializer<Windowed<T>> {
 
+    /**
+     * Serde class that supplies the inner deserializer of a windowed 
record. Must implement the {@link Serde} interface.
+     */
+    public static final String WINDOWED_INNER_DESERIALIZER_CLASS = 
"windowed.inner.deserializer.class";
+
+    private final Logger log = 
LoggerFactory.getLogger(SessionWindowedDeserializer.class);
+
     private Deserializer<T> inner;
 
     // Default constructor needed by Kafka
@@ -36,34 +46,43 @@ public class SessionWindowedDeserializer<T> implements 
Deserializer<Windowed<T>>
         this.inner = inner;
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"deprecation", "unchecked"})
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
-        final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
-
-        Serde<T> windowInnerClassSerde = null;
+        String deserializerConfigKey = WINDOWED_INNER_DESERIALIZER_CLASS;
+        String deserializerConfigValue = (String) 
configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
+        if (deserializerConfigValue == null) {
+            final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
+            if (windowedInnerClassSerdeConfig != null) {
+                deserializerConfigKey = 
StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
+                deserializerConfigValue = windowedInnerClassSerdeConfig;
+                log.warn("Config {} is deprecated. Please use {} instead.",
+                    StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
WINDOWED_INNER_DESERIALIZER_CLASS);
+            }
+        }
 
-        if (windowedInnerClassSerdeConfig != null) {
+        Serde<T> windowedInnerDeserializerClass = null;
+        if (deserializerConfigValue != null) {
             try {
-                windowInnerClassSerde = 
Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
+                windowedInnerDeserializerClass = 
Utils.newInstance(deserializerConfigValue, Serde.class);
             } catch (final ClassNotFoundException e) {
-                throw new 
ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
windowedInnerClassSerdeConfig,
-                    "Serde class " + windowedInnerClassSerdeConfig + " could 
not be found.");
+                throw new ConfigException(deserializerConfigKey, 
deserializerConfigValue,
+                    "Serde class " + deserializerConfigValue + " could not be 
found.");
             }
         }
 
-        if (inner != null && windowedInnerClassSerdeConfig != null) {
-            if 
(!inner.getClass().getName().equals(windowInnerClassSerde.deserializer().getClass().getName()))
 {
+        if (inner != null && deserializerConfigValue != null) {
+            if 
(!inner.getClass().getName().equals(windowedInnerDeserializerClass.deserializer().getClass().getName()))
 {
                 throw new IllegalArgumentException("Inner class deserializer 
set using constructor "
                     + "(" + inner.getClass().getName() + ")" +
-                    " is different from the one set in 
windowed.inner.class.serde config " +
-                    "(" + 
windowInnerClassSerde.deserializer().getClass().getName() + ").");
+                    " is different from the one set in " + 
deserializerConfigKey + " config " +
+                    "(" + 
windowedInnerDeserializerClass.deserializer().getClass().getName() + ").");
             }
-        } else if (inner == null && windowedInnerClassSerdeConfig == null) {
+        } else if (inner == null && deserializerConfigValue == null) {
             throw new IllegalArgumentException("Inner class deserializer 
should be set either via constructor " +
-                "or via the windowed.inner.class.serde config");
+                "or via the " + WINDOWED_INNER_DESERIALIZER_CLASS + " config");
         } else if (inner == null)
-            inner = windowInnerClassSerde.deserializer();
+            inner = windowedInnerDeserializerClass.deserializer();
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
index 6ec10bf8668..60e2b81d497 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedSerializer.java
@@ -24,10 +24,20 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.state.internals.SessionKeySchema;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class SessionWindowedSerializer<T> implements WindowedSerializer<T> {
 
+    /**
+     * Serde class that supplies the inner serializer of a windowed record. 
Must implement the {@link Serde} interface.
+     */
+    public static final String WINDOWED_INNER_SERIALIZER_CLASS = 
"windowed.inner.serializer.class";
+
+    private final Logger log = 
LoggerFactory.getLogger(SessionWindowedSerializer.class);
+
     private Serializer<T> inner;
 
     // Default constructor needed by Kafka
@@ -37,32 +47,42 @@ public class SessionWindowedSerializer<T> implements 
WindowedSerializer<T> {
         this.inner = inner;
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"deprecation", "unchecked"})
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
-        final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
-        Serde<T> windowInnerClassSerde = null;
-        if (windowedInnerClassSerdeConfig != null) {
+        String serializerConfigKey = WINDOWED_INNER_SERIALIZER_CLASS;
+        String serializerConfigValue = (String) 
configs.get(WINDOWED_INNER_SERIALIZER_CLASS);
+        if (serializerConfigValue == null) {
+            final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
+            if (windowedInnerClassSerdeConfig != null) {
+                serializerConfigKey = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
+                serializerConfigValue = windowedInnerClassSerdeConfig;
+                log.warn("Config {} is deprecated. Please use {} instead.",
+                    StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
WINDOWED_INNER_SERIALIZER_CLASS);
+            }
+        }
+        Serde<T> windowedInnerSerializerClass = null;
+        if (serializerConfigValue != null) {
             try {
-                windowInnerClassSerde = 
Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
+                windowedInnerSerializerClass = 
Utils.newInstance(serializerConfigValue, Serde.class);
             } catch (final ClassNotFoundException e) {
-                throw new 
ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
windowedInnerClassSerdeConfig,
-                    "Serde class " + windowedInnerClassSerdeConfig + " could 
not be found.");
+                throw new ConfigException(serializerConfigKey, 
serializerConfigValue,
+                    "Serde class " + serializerConfigValue + " could not be 
found.");
             }
         }
 
-        if (inner != null && windowedInnerClassSerdeConfig != null) {
-            if 
(!inner.getClass().getName().equals(windowInnerClassSerde.serializer().getClass().getName()))
 {
+        if (inner != null && serializerConfigValue != null) {
+            if 
(!inner.getClass().getName().equals(windowedInnerSerializerClass.serializer().getClass().getName()))
 {
                 throw new IllegalArgumentException("Inner class serializer set 
using constructor "
                     + "(" + inner.getClass().getName() + ")" +
-                    " is different from the one set in 
windowed.inner.class.serde config " +
-                    "(" + 
windowInnerClassSerde.serializer().getClass().getName() + ").");
+                    " is different from the one set in " + serializerConfigKey 
+ " config " +
+                    "(" + 
windowedInnerSerializerClass.serializer().getClass().getName() + ").");
             }
-        } else if (inner == null && windowedInnerClassSerdeConfig == null) {
+        } else if (inner == null && serializerConfigValue == null) {
             throw new IllegalArgumentException("Inner class serializer should 
be set either via constructor " +
-                "or via the windowed.inner.class.serde config");
+                "or via the " + WINDOWED_INNER_SERIALIZER_CLASS + " config");
         } else if (inner == null)
-            inner = windowInnerClassSerde.serializer();
+            inner = windowedInnerSerializerClass.serializer();
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
index 77825f2e1fa..26fcbac785f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
@@ -23,10 +23,25 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.internals.WindowKeySchema;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> {
 
+    /**
+     * Sets window size for the deserializer in order to calculate window end 
times.
+     */
+    public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
+
+    /**
+     * Serde class that supplies the inner deserializer of a windowed 
record. Must implement the {@link Serde} interface.
+     */
+    public static final String WINDOWED_INNER_DESERIALIZER_CLASS = 
"windowed.inner.deserializer.class";
+
+    private final Logger log = 
LoggerFactory.getLogger(TimeWindowedDeserializer.class);
+
     private Long windowSize;
     private boolean isChangelogTopic;
 
@@ -47,50 +62,10 @@ public class TimeWindowedDeserializer<T> implements 
Deserializer<Windowed<T>> {
         return this.windowSize;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
-        //check to see if the window size config is set and the window size is 
already set from the constructor
-        final Long configWindowSize;
-        if (configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG) instanceof 
String) {
-            configWindowSize = Long.parseLong((String) 
configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG));
-        } else {
-            configWindowSize = (Long) 
configs.get(StreamsConfig.WINDOW_SIZE_MS_CONFIG);
-        }
-        if (windowSize != null && configWindowSize != null) {
-            throw new IllegalArgumentException("Window size should not be set 
in both the time windowed deserializer constructor and the window.size.ms 
config");
-        } else if (windowSize == null && configWindowSize == null) {
-            throw new IllegalArgumentException("Window size needs to be set 
either through the time windowed deserializer " +
-                "constructor or the window.size.ms config but not both");
-        } else {
-            windowSize = windowSize == null ? configWindowSize : windowSize;
-        }
-
-        final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
-
-        Serde<T> windowInnerClassSerde = null;
-
-        if (windowedInnerClassSerdeConfig != null) {
-            try {
-                windowInnerClassSerde = 
Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
-            } catch (final ClassNotFoundException e) {
-                throw new 
ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
windowedInnerClassSerdeConfig,
-                    "Serde class " + windowedInnerClassSerdeConfig + " could 
not be found.");
-            }
-        }
-
-        if (inner != null && windowedInnerClassSerdeConfig != null) {
-            if 
(!inner.getClass().getName().equals(windowInnerClassSerde.deserializer().getClass().getName()))
 {
-                throw new IllegalArgumentException("Inner class deserializer 
set using constructor "
-                    + "(" + inner.getClass().getName() + ")" +
-                    " is different from the one set in 
windowed.inner.class.serde config " +
-                    "(" + 
windowInnerClassSerde.deserializer().getClass().getName() + ").");
-            }
-        } else if (inner == null && windowedInnerClassSerdeConfig == null) {
-            throw new IllegalArgumentException("Inner class deserializer 
should be set either via  constructor " +
-                "or via the windowed.inner.class.serde config");
-        } else if (inner == null)
-            inner = windowInnerClassSerde.deserializer();
+        configureWindowSizeMs(configs);
+        configureWindowInnerDeserializerClass(configs);
     }
 
     @Override
@@ -125,4 +100,60 @@ public class TimeWindowedDeserializer<T> implements 
Deserializer<Windowed<T>> {
     Deserializer<T> innerDeserializer() {
         return inner;
     }
+
+    private void configureWindowSizeMs(final Map<String, ?> configs) {
+        //check to see if the window size config is set and the window size is 
already set from the constructor
+        final Long configWindowSize;
+        if (configs.get(WINDOW_SIZE_MS_CONFIG) instanceof String) {
+            configWindowSize = Long.parseLong((String) 
configs.get(WINDOW_SIZE_MS_CONFIG));
+        } else {
+            configWindowSize = (Long) configs.get(WINDOW_SIZE_MS_CONFIG);
+        }
+        if (windowSize != null && configWindowSize != null) {
+            throw new IllegalArgumentException("Window size should not be set 
in both the time windowed deserializer constructor and the window.size.ms 
config");
+        } else if (windowSize == null && configWindowSize == null) {
+            throw new IllegalArgumentException("Window size needs to be set 
either through the time windowed deserializer " +
+                "constructor or the window.size.ms config but not both");
+        } else {
+            windowSize = windowSize == null ? configWindowSize : windowSize;
+        }
+    }
+
+    @SuppressWarnings({"deprecation", "unchecked"})
+    private void configureWindowInnerDeserializerClass(final Map<String, ?> 
configs) {
+        String deserializerConfigKey = WINDOWED_INNER_DESERIALIZER_CLASS;
+        String deserializerConfigValue = (String) 
configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
+        if (deserializerConfigValue == null) {
+            final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
+            if (windowedInnerClassSerdeConfig != null) {
+                deserializerConfigKey = 
StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
+                deserializerConfigValue = windowedInnerClassSerdeConfig;
+                log.warn("Config {} is deprecated. Please use {} instead.",
+                    StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
WINDOWED_INNER_DESERIALIZER_CLASS);
+            }
+        }
+
+        Serde<T> windowedInnerDeserializerClass = null;
+        if (deserializerConfigValue != null) {
+            try {
+                windowedInnerDeserializerClass = 
Utils.newInstance(deserializerConfigValue, Serde.class);
+            } catch (final ClassNotFoundException e) {
+                throw new ConfigException(deserializerConfigKey, 
deserializerConfigValue,
+                    "Serde class " + deserializerConfigValue + " could not be 
found.");
+            }
+        }
+
+        if (inner != null && deserializerConfigValue != null) {
+            if 
(!inner.getClass().getName().equals(windowedInnerDeserializerClass.deserializer().getClass().getName()))
 {
+                throw new IllegalArgumentException("Inner class deserializer 
set using constructor "
+                    + "(" + inner.getClass().getName() + ")" +
+                    " is different from the one set in " + 
deserializerConfigKey + " config " +
+                    "(" + 
windowedInnerDeserializerClass.deserializer().getClass().getName() + ").");
+            }
+        } else if (inner == null && deserializerConfigValue == null) {
+            throw new IllegalArgumentException("Inner class deserializer 
should be set either via  constructor " +
+                "or via the " + WINDOWED_INNER_DESERIALIZER_CLASS + " config");
+        } else if (inner == null)
+            inner = windowedInnerDeserializerClass.deserializer();
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index 54bdd6a3b71..7cd13afc1d3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -24,10 +24,20 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.state.internals.WindowKeySchema;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class TimeWindowedSerializer<T> implements WindowedSerializer<T> {
 
+    /**
+     * Name of the {@link Serde} class whose serializer is used as the inner serializer of a windowed record. Must implement the {@link Serde} interface.
+     */
+    public static final String WINDOWED_INNER_SERIALIZER_CLASS = 
"windowed.inner.serializer.class";
+
+    private final Logger log = 
LoggerFactory.getLogger(TimeWindowedSerializer.class);
+
     private Serializer<T> inner;
 
     // Default constructor needed by Kafka
@@ -38,32 +48,42 @@ public class TimeWindowedSerializer<T> implements 
WindowedSerializer<T> {
         this.inner = inner;
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"deprecation", "unchecked"})
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
-        final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
-        Serde<T> windowInnerClassSerde = null;
-        if (windowedInnerClassSerdeConfig != null) {
+        String serializerConfigKey = WINDOWED_INNER_SERIALIZER_CLASS;
+        String serializerConfigValue = (String) 
configs.get(WINDOWED_INNER_SERIALIZER_CLASS);
+        if (serializerConfigValue == null) {
+            final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
+            if (windowedInnerClassSerdeConfig != null) {
+                serializerConfigKey = StreamsConfig.WINDOWED_INNER_CLASS_SERDE;
+                serializerConfigValue = windowedInnerClassSerdeConfig;
+                log.warn("Config {} is deprecated. Please use {} instead.",
+                    StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
WINDOWED_INNER_SERIALIZER_CLASS);
+            }
+        }
+        Serde<T> windowedInnerSerializerClass = null;
+        if (serializerConfigValue != null) {
             try {
-                windowInnerClassSerde = 
Utils.newInstance(windowedInnerClassSerdeConfig, Serde.class);
+                windowedInnerSerializerClass = 
Utils.newInstance(serializerConfigValue, Serde.class);
             } catch (final ClassNotFoundException e) {
-                throw new 
ConfigException(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
windowedInnerClassSerdeConfig,
-                    "Serde class " + windowedInnerClassSerdeConfig + " could 
not be found.");
+                throw new ConfigException(serializerConfigKey, 
serializerConfigValue,
+                    "Serde class " + serializerConfigValue + " could not be 
found.");
             }
         }
 
-        if (inner != null && windowedInnerClassSerdeConfig != null) {
-            if 
(!inner.getClass().getName().equals(windowInnerClassSerde.serializer().getClass().getName()))
 {
+        if (inner != null && serializerConfigValue != null) {
+            if 
(!inner.getClass().getName().equals(windowedInnerSerializerClass.serializer().getClass().getName()))
 {
                 throw new IllegalArgumentException("Inner class serializer set 
using constructor "
                     + "(" + inner.getClass().getName() + ")" +
-                    " is different from the one set in 
windowed.inner.class.serde config " +
-                    "(" + 
windowInnerClassSerde.serializer().getClass().getName() + ").");
+                    " is different from the one set in " + serializerConfigKey 
+ " config " +
+                    "(" + 
windowedInnerSerializerClass.serializer().getClass().getName() + ").");
             }
-        } else if (inner == null && windowedInnerClassSerdeConfig == null) {
+        } else if (inner == null && serializerConfigValue == null) {
             throw new IllegalArgumentException("Inner class serializer should 
be set either via constructor " +
-                "or via the windowed.inner.class.serde config");
+                "or via the " + WINDOWED_INNER_SERIALIZER_CLASS + " config");
         } else if (inner == null)
-            inner = windowInnerClassSerde.serializer();
+            inner = windowedInnerSerializerClass.serializer();
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
index 82f0e355307..2bace901779 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -45,7 +45,7 @@ public class SessionWindowedDeserializerTest {
     }
 
     @Test
-    public void shouldSetWindowedInnerClassDeserialiserThroughConfig() {
+    public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         final SessionWindowedDeserializer<?> deserializer = new 
SessionWindowedDeserializer<>();
         deserializer.configure(props, false);
@@ -53,20 +53,49 @@ public class SessionWindowedDeserializerTest {
     }
 
     @Test
-    public void shouldThrowErrorIfWindowInnerClassDeserialiserIsNotSet() {
+    public void 
shouldSetSerializerThroughWindowedInnerDeserializerClassConfig() {
+        
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        final SessionWindowedDeserializer<?> deserializer = new 
SessionWindowedDeserializer<>();
+        deserializer.configure(props, false);
+        assertInstanceOf(ByteArrayDeserializer.class, 
deserializer.innerDeserializer());
+    }
+
+    @Test
+    public void 
shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet()
 {
+        
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
"some.non.existent.class");
+        final SessionWindowedDeserializer<?> deserializer = new 
SessionWindowedDeserializer<>();
+        deserializer.configure(props, false);
+        assertInstanceOf(ByteArrayDeserializer.class, 
deserializer.innerDeserializer());
+    }
+
+    @Test
+    public void 
shouldThrowErrorIfWindowedInnerClassSerdeAndSessionWindowedDeserializerClassAreNotSet()
 {
         final SessionWindowedDeserializer<?> deserializer = new 
SessionWindowedDeserializer<>();
         assertThrows(IllegalArgumentException.class, () -> 
deserializer.configure(props, false));
     }
 
     @Test
-    public void 
shouldThrowErrorIfDeserialisersConflictInConstructorAndConfig() {
+    public void 
shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInnerClassSerdeConfig()
 {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         assertThrows(IllegalArgumentException.class, () -> 
sessionWindowedDeserializer.configure(props, false));
     }
 
     @Test
-    public void 
shouldThrowConfigExceptionWhenInvalidWindowInnerClassDeserialiserSupplied() {
+    public void 
shouldThrowErrorIfDeserializersConflictInConstructorAndWindowedInnerDeserializerClassConfig()
 {
+        
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        assertThrows(IllegalArgumentException.class, () -> 
sessionWindowedDeserializer.configure(props, false));
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
"some.non.existent.class");
         assertThrows(ConfigException.class, () -> 
sessionWindowedDeserializer.configure(props, false));
     }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerDeserializerClassSupplied() {
+        
props.put(SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
"some.non.existent.class");
+        assertThrows(ConfigException.class, () -> 
sessionWindowedDeserializer.configure(props, false));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
index 729c8408dfe..d7e30bc3fe4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -45,7 +45,7 @@ public class SessionWindowedSerializerTest {
     }
 
     @Test
-    public void shouldSetWindowedInnerClassSerialiserThroughConfig() {
+    public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         final SessionWindowedSerializer<?> serializer = new 
SessionWindowedSerializer<>();
         serializer.configure(props, false);
@@ -53,20 +53,49 @@ public class SessionWindowedSerializerTest {
     }
 
     @Test
-    public void shouldThrowErrorIfWindowInnerClassSerialiserIsNotSet() {
+    public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig() 
{
+        props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        final SessionWindowedSerializer<?> serializer = new 
SessionWindowedSerializer<>();
+        serializer.configure(props, false);
+        assertInstanceOf(ByteArraySerializer.class, 
serializer.innerSerializer());
+    }
+
+    @Test
+    public void 
shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet()
 {
+        props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
"some.non.existent.class");
+        final SessionWindowedSerializer<?> serializer = new 
SessionWindowedSerializer<>();
+        serializer.configure(props, false);
+        assertInstanceOf(ByteArraySerializer.class, 
serializer.innerSerializer());
+    }
+
+    @Test
+    public void 
shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet()
 {
         final SessionWindowedSerializer<?> serializer = new 
SessionWindowedSerializer<>();
         assertThrows(IllegalArgumentException.class, () -> 
serializer.configure(props, false));
     }
 
     @Test
-    public void shouldThrowErrorIfSerialisersConflictInConstructorAndConfig() {
+    public void 
shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerClassSerdeConfig()
 {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         assertThrows(IllegalArgumentException.class, () -> 
sessionWindowedSerializer.configure(props, false));
     }
 
     @Test
-    public void 
shouldThrowConfigExceptionWhenInvalidWindowInnerClassSerialiserSupplied() {
+    public void 
shouldThrowErrorIfSerializersConflictInConstructorAndWindowedInnerSerializerClassConfig()
 {
+        props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        assertThrows(IllegalArgumentException.class, () -> 
sessionWindowedSerializer.configure(props, false));
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
"some.non.existent.class");
         assertThrows(ConfigException.class, () -> 
sessionWindowedSerializer.configure(props, false));
     }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerSerializerClassSupplied() {
+        props.put(SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
"some.non.existent.class");
+        assertThrows(ConfigException.class, () -> 
sessionWindowedSerializer.configure(props, false));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
index b82b48d7730..bfb8c80cf09 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -49,7 +50,7 @@ public class TimeWindowedDeserializerTest {
     }
 
     @Test
-    public void 
shouldSetWindowSizeAndWindowedInnerDeserialiserThroughConfigs() {
+    public void 
shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerClassSerdeConfigs()
 {
         props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         final TimeWindowedDeserializer<?> deserializer = new 
TimeWindowedDeserializer<>();
@@ -59,34 +60,92 @@ public class TimeWindowedDeserializerTest {
     }
 
     @Test
-    public void shouldThrowErrorIfWindowSizeSetInConfigsAndConstructor() {
+    public void 
shouldSetWindowSizeAndDeserializerThroughWindowSizeMsAndWindowedInnerDeserializerClassConfigs()
 {
+        props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
+        props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        final TimeWindowedDeserializer<?> deserializer = new 
TimeWindowedDeserializer<>();
+        deserializer.configure(props, false);
+        assertThat(deserializer.getWindowSize(), is(500L));
+        assertInstanceOf(ByteArrayDeserializer.class, 
deserializer.innerDeserializer());
+    }
+
+    @Test
+    public void shouldHaveSameConfigNameForWindowSizeMs() {
+        assertEquals(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 
StreamsConfig.WINDOW_SIZE_MS_CONFIG);
+    }
+
+    @Test
+    public void 
shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerDeserializerClassConfigIsSet()
 {
+        props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
+        props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
"some.non.existent.class");
+        final TimeWindowedDeserializer<?> deserializer = new 
TimeWindowedDeserializer<>();
+        deserializer.configure(props, false);
+        assertThat(deserializer.getWindowSize(), is(500L));
+        assertInstanceOf(ByteArrayDeserializer.class, 
deserializer.innerDeserializer());
+    }
+
+    @Test
+    public void shouldThrowErrorIfWindowSizeSetInStreamsConfigAndConstructor() 
{
         props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
         assertThrows(IllegalArgumentException.class, () -> 
timeWindowedDeserializer.configure(props, false));
     }
 
     @Test
-    public void shouldThrowErrorIfWindowSizeIsNotSet() {
+    public void 
shouldThrowErrorIfWindowSizeSetInConstructorConfigAndConstructor() {
+        props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
+        assertThrows(IllegalArgumentException.class, () -> 
timeWindowedDeserializer.configure(props, false));
+    }
+
+    @Test
+    public void 
shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerClassSerdeIsSet() {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         final TimeWindowedDeserializer<?> deserializer = new 
TimeWindowedDeserializer<>();
         assertThrows(IllegalArgumentException.class, () -> 
deserializer.configure(props, false));
     }
 
     @Test
-    public void shouldThrowErrorIfWindowedInnerClassDeserialiserIsNotSet() {
+    public void 
shouldThrowErrorIfWindowSizeIsNotSetAndWindowedInnerDeserializerClassIsSet() {
+        props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, 
Serdes.ByteArraySerde.class.getName());
+        final TimeWindowedDeserializer<?> deserializer = new 
TimeWindowedDeserializer<>();
+        assertThrows(IllegalArgumentException.class, () -> 
deserializer.configure(props, false));
+    }
+
+    @Test
+    public void 
shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInStreamsConfigIsSet()
 {
         props.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, "500");
         final TimeWindowedDeserializer<?> deserializer = new 
TimeWindowedDeserializer<>();
         assertThrows(IllegalArgumentException.class, () -> 
deserializer.configure(props, false));
     }
 
     @Test
-    public void 
shouldThrowErrorIfWindowedInnerClassDeserialisersConflictInConstructorAndConfig()
 {
+    public void 
shouldThrowErrorIfWindowedInnerClassSerdeIsNotSetAndWindowSizeMsInConstructorConfigIsSet()
 {
+        props.put(TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG, "500");
+        final TimeWindowedDeserializer<?> deserializer = new 
TimeWindowedDeserializer<>();
+        assertThrows(IllegalArgumentException.class, () -> 
deserializer.configure(props, false));
+    }
+
+    @Test
+    public void 
shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerClassSerdeConfig()
 {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         assertThrows(IllegalArgumentException.class, () -> 
timeWindowedDeserializer.configure(props, false));
     }
 
     @Test
-    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassDeserialiserSupplied() {
+    public void 
shouldThrowErrorIfDeserializerConflictInConstructorAndWindowedInnerDeserializerClassConfig()
 {
+        props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        assertThrows(IllegalArgumentException.class, () -> 
timeWindowedDeserializer.configure(props, false));
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
"some.non.existent.class");
         assertThrows(ConfigException.class, () -> 
timeWindowedDeserializer.configure(props, false));
     }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerDeserializerClassSupplied() {
+        props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
"some.non.existent.class");
+        assertThrows(ConfigException.class, () -> 
timeWindowedDeserializer.configure(props, false));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
index ba5b9c339a4..7a13117db4a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
@@ -45,7 +45,7 @@ public class TimeWindowedSerializerTest {
     }
 
     @Test
-    public void shouldSetWindowedInnerClassSerialiserThroughConfig() {
+    public void shouldSetSerializerThroughWindowedInnerClassSerdeConfig() {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         final TimeWindowedSerializer<?> serializer = new 
TimeWindowedSerializer<>();
         serializer.configure(props, false);
@@ -53,21 +53,49 @@ public class TimeWindowedSerializerTest {
     }
 
     @Test
-    public void shouldThrowErrorIfWindowedInnerClassSerialiserIsNotSet() {
+    public void shouldSetSerializerThroughWindowedInnerSerializerClassConfig() 
{
+        props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        final TimeWindowedSerializer<?> serializer = new 
TimeWindowedSerializer<>();
+        serializer.configure(props, false);
+        assertInstanceOf(ByteArraySerializer.class, 
serializer.innerSerializer());
+    }
+
+    @Test
+    public void 
shouldIgnoreWindowedInnerClassSerdeConfigIfWindowedInnerSerializerClassConfigIsSet()
 {
+        props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
"some.non.existent.class");
+        final TimeWindowedSerializer<?> serializer = new 
TimeWindowedSerializer<>();
+        serializer.configure(props, false);
+        assertInstanceOf(ByteArraySerializer.class, 
serializer.innerSerializer());
+    }
+
+    @Test
+    public void 
shouldThrowErrorIfWindowedInnerClassSerdeAndWindowedInnerSerializerClassAreNotSet()
 {
         final TimeWindowedSerializer<?> serializer = new 
TimeWindowedSerializer<>();
         assertThrows(IllegalArgumentException.class, () -> 
serializer.configure(props, false));
     }
 
     @Test
-    public void 
shouldThrowErrorIfWindowedInnerClassSerialisersConflictInConstructorAndConfig() 
{
+    public void 
shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerClassSerdeConfig()
 {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
Serdes.ByteArraySerde.class.getName());
         assertThrows(IllegalArgumentException.class, () -> 
timeWindowedSerializer.configure(props, false));
     }
 
     @Test
-    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerialiserSupplied() {
+    public void 
shouldThrowErrorIfSerializerConflictInConstructorAndWindowedInnerSerializerClassConfig()
 {
+        props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
Serdes.ByteArraySerde.class.getName());
+        assertThrows(IllegalArgumentException.class, () -> 
timeWindowedSerializer.configure(props, false));
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerClassSerdeSupplied() {
         props.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, 
"some.non.existent.class");
         assertThrows(ConfigException.class, () -> 
timeWindowedSerializer.configure(props, false));
     }
 
+    @Test
+    public void 
shouldThrowConfigExceptionWhenInvalidWindowedInnerSerializerClassSupplied() {
+        props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
"some.non.existent.class");
+        assertThrows(ConfigException.class, () -> 
timeWindowedSerializer.configure(props, false));
+    }
 }

Reply via email to