This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3905d9  KAFKA-8613: New APIs for Controlling Grace Period for 
Windowed Operations (#10926)
b3905d9 is described below

commit b3905d9f71d48a60f2a9ee38014582d7ec7bc3c2
Author: Israel Ekpo <[email protected]>
AuthorDate: Wed Jun 30 20:09:19 2021 -0400

    KAFKA-8613: New APIs for Controlling Grace Period for Windowed Operations 
(#10926)
    
    Implements KIP-633.
    
    Grace-period is an important parameter and it's best to make it the user's 
responsibility to set it explicitly. Thus, we move away from providing a default and 
make it a mandatory parameter when creating a window.
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>, Luke Chen 
<[email protected]>, Matthias J. Sax <[email protected]>
---
 .../examples/pageview/PageViewTypedDemo.java       |  4 +-
 .../examples/pageview/PageViewUntypedDemo.java     |  4 +-
 .../examples/temperature/TemperatureDemo.java      |  4 +-
 .../apache/kafka/streams/kstream/JoinWindows.java  | 45 ++++++++++++---
 .../kafka/streams/kstream/SessionWindows.java      | 67 ++++++++++++++++++----
 .../kafka/streams/kstream/SlidingWindows.java      | 53 +++++++++++++++--
 .../apache/kafka/streams/kstream/TimeWindows.java  | 66 +++++++++++++++++++--
 .../org/apache/kafka/streams/kstream/Windows.java  | 14 ++++-
 .../org/apache/kafka/streams/TopologyTest.java     |  1 +
 .../integration/AbstractResetIntegrationTest.java  |  1 +
 .../integration/InternalTopicIntegrationTest.java  |  1 +
 .../integration/JoinStoreIntegrationTest.java      |  1 +
 .../KStreamAggregationDedupIntegrationTest.java    |  1 +
 .../KStreamAggregationIntegrationTest.java         | 16 +++++-
 .../KStreamRepartitionIntegrationTest.java         |  1 +
 .../integration/MetricsIntegrationTest.java        |  1 +
 .../integration/QueryableStateIntegrationTest.java |  1 +
 .../integration/RocksDBMetricsIntegrationTest.java |  1 +
 .../kafka/streams/kstream/JoinWindowsTest.java     | 25 ++++++++
 .../kstream/RepartitionTopicNamingTest.java        |  1 +
 .../kafka/streams/kstream/SessionWindowsTest.java  | 23 ++++++++
 .../kafka/streams/kstream/SlidingWindowsTest.java  | 14 +++++
 .../kafka/streams/kstream/TimeWindowsTest.java     | 21 +++++++
 .../kstream/internals/KGroupedStreamImplTest.java  |  1 -
 .../streams/kstream/internals/KStreamImplTest.java | 24 ++++++++
 .../kstream/internals/KStreamKStreamJoinTest.java  |  1 +
 .../internals/KStreamKStreamLeftJoinTest.java      |  2 +
 .../internals/KStreamKStreamOuterJoinTest.java     |  1 +
 .../kstream/internals/KStreamRepartitionTest.java  |  1 +
 ...KStreamSessionWindowAggregateProcessorTest.java |  1 +
 .../KStreamSlidingWindowAggregateTest.java         |  6 +-
 .../internals/KStreamWindowAggregateTest.java      |  1 +
 .../SessionWindowedCogroupedKStreamImplTest.java   |  2 +-
 .../SlidingWindowedCogroupedKStreamImplTest.java   |  1 +
 .../kstream/internals/SuppressScenarioTest.java    |  1 +
 .../kstream/internals/SuppressTopologyTest.java    |  1 +
 .../streams/kstream/internals/TimeWindowTest.java  |  1 +
 .../TimeWindowedCogroupedKStreamImplTest.java      |  1 +
 .../kstream/internals/graph/StreamsGraphTest.java  |  1 +
 .../internals/RepartitionOptimizingTest.java       |  1 +
 .../internals/StreamsPartitionAssignorTest.java    |  1 +
 .../kafka/streams/tests/SmokeTestClient.java       |  1 +
 .../kafka/streams/tests/StreamsOptimizedTest.java  |  1 +
 .../kafka/test/GenericInMemoryKeyValueStore.java   |  1 +
 .../GenericInMemoryTimestampedKeyValueStore.java   |  1 +
 .../kafka/streams/scala/kstream/KStream.scala      |  1 +
 .../apache/kafka/streams/scala/TopologyTest.scala  | 27 +++++----
 .../kafka/streams/scala/kstream/KStreamTest.scala  | 28 ++-------
 .../kafka/streams/scala/kstream/KTableTest.scala   |  9 +--
 49 files changed, 404 insertions(+), 79 deletions(-)

diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 1b467eb..a5086de 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -191,6 +191,8 @@ public class PageViewTypedDemo {
 
         final KTable<String, UserProfile> users = 
builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new 
JSONSerde<>()));
 
+        final Duration duration24Hours = Duration.ofHours(24);
+
         final KStream<WindowedPageViewByRegion, RegionCount> regionCount = 
views
             .leftJoin(users, (view, profile) -> {
                 final PageViewByRegion viewByRegion = new PageViewByRegion();
@@ -206,7 +208,7 @@ public class PageViewTypedDemo {
             })
             .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, 
viewRegion))
             .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
-            
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7), 
duration24Hours).advanceBy(Duration.ofSeconds(1)))
             .count()
             .toStream()
             .map((key, value) -> {
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 2a9972b..cdb3639 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -78,6 +78,8 @@ public class PageViewUntypedDemo {
 
         final KTable<String, String> userRegions = users.mapValues(record -> 
record.get("region").textValue());
 
+        final Duration duration24Hours = Duration.ofHours(24);
+
         final KStream<JsonNode, JsonNode> regionCount = views
             .leftJoin(userRegions, (view, region) -> {
                 final ObjectNode jNode = JsonNodeFactory.instance.objectNode();
@@ -88,7 +90,7 @@ public class PageViewUntypedDemo {
             })
             .map((user, viewRegion) -> new 
KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
             .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
-            
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7), 
duration24Hours).advanceBy(Duration.ofSeconds(1)))
             .count()
             .toStream()
             .map((key, value) -> {
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 4d63d30..6384466 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -79,6 +79,8 @@ public class TemperatureDemo {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
 
+        final Duration duration24Hours = Duration.ofHours(24);
+
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<String, String> source = 
builder.stream("iot-temperature");
@@ -88,7 +90,7 @@ public class TemperatureDemo {
             // to group and reduce them, a key is needed ("temp" has been 
chosen)
             .selectKey((key, value) -> "temp")
             .groupByKey()
-            
.windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE)))
+            
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE),
 duration24Hours))
             .reduce((value1, value2) -> {
                 if (Integer.parseInt(value1) > Integer.parseInt(value2)) {
                     return value1;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 2641286..84e3f7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -79,10 +79,7 @@ public class JoinWindows extends Windows<Window> {
     protected final boolean enableSpuriousResultFix;
 
     protected JoinWindows(final JoinWindows joinWindows) {
-        beforeMs = joinWindows.beforeMs;
-        afterMs = joinWindows.afterMs;
-        graceMs = joinWindows.graceMs;
-        enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+        this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, 
joinWindows.enableSpuriousResultFix);
     }
 
     private JoinWindows(final long beforeMs,
@@ -92,32 +89,62 @@ public class JoinWindows extends Windows<Window> {
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out 
of order records arriving
+     * after the window end will be dropped. The delay is defined as 
(stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative or can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition 
and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end 
will be dropped.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @return a new JoinWindows object with the window definition and no 
grace period. Note that this means out of order records arriving after the 
window end will be dropped
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return a new JoinWindows object with the window definition and 
the default grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead
      */
+    @Deprecated
     public static JoinWindows of(final Duration timeDifference) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefix);
-        return new JoinWindows(timeDifferenceMs, timeDifferenceMs, 
DEFAULT_GRACE_PERIOD_MS, false);
+        return new JoinWindows(timeDifferenceMs, timeDifferenceMs, 
DEPRECATED_OLD_24_HR_GRACE_PERIOD, false);
     }
 
     /**
@@ -177,7 +204,9 @@ public class JoinWindows extends Windows<Window> {
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index f41dd67..65bcfd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -23,8 +23,9 @@ import java.util.Objects;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static org.apache.kafka.streams.kstream.Windows.DEFAULT_GRACE_PERIOD_MS;
-
+import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
+import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
+import static java.time.Duration.ofMillis;
 
 /**
  * A session based window specification used for aggregating events into 
sessions.
@@ -78,23 +79,68 @@ public final class SessionWindows {
     private SessionWindows(final long gapMs, final long graceMs) {
         this.gapMs = gapMs;
         this.graceMs = graceMs;
+
+        if (gapMs <= 0) {
+            throw new IllegalArgumentException("Gap time cannot be zero or 
negative.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
     }
 
     /**
-     * Create a new window specification with the specified inactivity gap.
+     * Creates a new window specification with the specified inactivity gap.
+     * Using the method implicitly sets the grace period to zero which
+     * means that out of order records arriving after the window end will be 
dropped
+     *
+     * <p>
+     * Note that new events may change the boundaries of session windows, so 
aggressive
+     * close times can lead to surprising results in which an out-of-order 
event is rejected and then
+     * a subsequent event moves the window boundary forward.
      *
      * @param inactivityGap the gap of inactivity between sessions
-     * @return a new window specification with default maintain duration of 1 
day
+     * @return a window definition with the window size and no grace period. 
Note that this means out of order records arriving after the window end will be 
dropped
+     * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
+     */
+    public static SessionWindows ofInactivityGapWithNoGrace(final Duration 
inactivityGap) {
+        return ofInactivityGapAndGrace(inactivityGap, 
ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Creates a new window specification with the specified inactivity gap.
+     * Using the method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd} which
+     * means that out of order records arriving after the window end will be 
dropped
      *
+     * <p>
+     * Note that new events may change the boundaries of session windows, so 
aggressive
+     * close times can lead to surprising results in which an out-of-order 
event is rejected and then
+     * a subsequent event moves the window boundary forward.
+     *
+     * @param inactivityGap the gap of inactivity between sessions
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @return A SessionWindows object with the specified inactivity gap and 
grace period
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative or can't be represented as {@code long milliseconds}
+     */
+    public static SessionWindows ofInactivityGapAndGrace(final Duration 
inactivityGap, final Duration afterWindowEnd) {
+        return new SessionWindows(inactivityGap.toMillis(), 
afterWindowEnd.toMillis());
+    }
+
+
+    /**
+     * Create a new window specification with the specified inactivity gap.
+     *
+     * @param inactivityGap the gap of inactivity between sessions
+     * @return a new window specification without specifying a grace period 
(uses old default of 24 hours)
      * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofInactivityGapWithNoGrace(Duration)} 
 instead
      */
+    @Deprecated
     public static SessionWindows with(final Duration inactivityGap) {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
         final long inactivityGapMs = 
validateMillisecondDuration(inactivityGap, msgPrefix);
-        if (inactivityGapMs <= 0) {
-            throw new IllegalArgumentException("Gap time (inactivityGapMs) 
cannot be zero or negative.");
-        }
-        return new SessionWindows(inactivityGapMs, DEFAULT_GRACE_PERIOD_MS);
+
+        return new SessionWindows(inactivityGapMs, 
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
     }
 
     /**
@@ -108,13 +154,12 @@ public final class SessionWindows {
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, 
Duration)} instead
      */
+    @Deprecated
     public SessionWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEndMs < 0) {
-            throw new IllegalArgumentException("Grace period must not be 
negative.");
-        }
 
         return new SessionWindows(gapMs, afterWindowEndMs);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
index 189770f..2cbda6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -17,10 +17,14 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.processor.TimestampExtractor;
+
 import java.time.Duration;
 import java.util.Objects;
+
+import static java.time.Duration.ofMillis;
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
 
 /**
  * A sliding window used for aggregating events.
@@ -78,6 +82,45 @@ public final class SlidingWindows {
     private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
         this.timeDifferenceMs = timeDifferenceMs;
         this.graceMs = graceMs;
+
+        if (timeDifferenceMs < 0) {
+            throw new IllegalArgumentException("Window time difference must 
not be negative.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Window grace period must not 
be negative.");
+        }
+    }
+
+    /**
+     * Return a window definition with the window size
+     * Using the method implicitly sets the grace period to zero which means 
that
+     * out of order records arriving after the window end will be dropped
+     *
+     * @param timeDifference the max time difference (inclusive) between two 
records in a window
+     * @return a new window definition with no grace period. Note that this 
means out of order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the timeDifference is negative
+     */
+    public static SlidingWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) throws IllegalArgumentException {
+        return ofTimeDifferenceAndGrace(timeDifference, 
ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Return a window definition with the window size based on the given 
maximum time difference (inclusive) between
+     * records in the same window and given window grace period. Reject 
out-of-order events that arrive after {@code afterWindowEnd}.
+     * A window is closed when {@code stream-time > window-end + grace-period}.
+     *
+     * @param timeDifference the max time difference (inclusive) between two 
records in a window
+     * @param afterWindowEnd  the grace period to admit out-of-order events to 
a window
+     * @return a new window definition with the specified grace period
+     * @throws IllegalArgumentException if the timeDifference or 
afterWindowEnd (grace period) is negative
+     */
+    public static SlidingWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) throws IllegalArgumentException {
+
+        final long timeDifferenceMs = timeDifference.toMillis();
+        final long afterWindowEndMs = afterWindowEnd.toMillis();
+
+        return new SlidingWindows(timeDifferenceMs, afterWindowEndMs);
     }
 
     /**
@@ -89,18 +132,16 @@ public final class SlidingWindows {
      * @param grace the grace period to admit out-of-order events to a window
      * @return a new window definition
      * @throws IllegalArgumentException if the specified window size is &lt; 0 
or grace &lt; 0, or either can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} or {@link 
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
      */
+    @Deprecated
     public static SlidingWindows withTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration grace) throws IllegalArgumentException {
         final String msgPrefixSize = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefixSize);
-        if (timeDifferenceMs < 0) {
-            throw new IllegalArgumentException("Window time difference must 
not be negative.");
-        }
+
         final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, 
"grace");
         final long graceMs = validateMillisecondDuration(grace, 
msgPrefixGrace);
-        if (graceMs < 0) {
-            throw new IllegalArgumentException("Window grace period must not 
be negative.");
-        }
+
         return new SlidingWindows(timeDifferenceMs, graceMs);
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 2149556..7970085 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static java.time.Duration.ofMillis;
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 
@@ -72,6 +73,14 @@ public final class TimeWindows extends Windows<TimeWindow> {
         this.sizeMs = sizeMs;
         this.advanceMs = advanceMs;
         this.graceMs = graceMs;
+
+        if (sizeMs <= 0) {
+            throw new IllegalArgumentException("Window size (sizeMs) must be 
larger than zero.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
     }
 
     /**
@@ -81,18 +90,63 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, 
gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
+     * Using the method implicitly sets the grace period to zero which means
+     * that out of order records arriving after the window end will be dropped
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period. Note that 
this means out of order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws 
IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the 
advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, 
N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, 
gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
+     * Using the method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd} which means
+     * that out of order records arriving after the window end will be dropped.
+     *
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param size The size of the window. Must be larger than zero
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window. Must be non-negative.
+     * @return a TimeWindows object with the specified size and the specified 
grace period
+     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative 
or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeAndGrace(final Duration size, final 
Duration afterWindowEnd)
+            throws IllegalArgumentException {
+
+        final long sizeMs = size.toMillis();
+        final long afterWindowEndMs = afterWindowEnd.toMillis();
+
+        return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the 
advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, 
N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, 
gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
+     *
+     * @param size The size of the window
+     * @return a new window definition without specifying the grace period 
(uses old default of 24 hours)
      * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public static TimeWindows of(final Duration size) throws 
IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
         final long sizeMs = validateMillisecondDuration(size, msgPrefix);
-        if (sizeMs <= 0) {
-            throw new IllegalArgumentException("Window size (sizeMs) must be 
larger than zero.");
-        }
-        return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS);
+
+        return new TimeWindows(sizeMs, sizeMs, 
DEPRECATED_OLD_24_HR_GRACE_PERIOD);
     }
 
     /**
@@ -142,7 +196,9 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
      * @throws IllegalArgumentException if {@code afterWindowEnd} is negative 
or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofSizeAndGrace(Duration, Duration)} 
instead
      */
+    @Deprecated
     public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index bece9e0..f0204d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -38,9 +38,17 @@ import java.util.Map;
  */
 public abstract class Windows<W extends Window> {
 
-    // By default grace period is 24 hours for all windows,
-    // in other words we allow out-of-order data for up to a day
-    protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+    /**
+     * By default grace period is 24 hours for all windows in other words we 
allow out-of-order data for up to a day
+     * This behavior is now deprecated and additional details are available in 
the motivation for the KIP
+     * Check out <a 
href="https://cwiki.apache.org/confluence/x/Ho2NCg">KIP-633</a> for more details
+     */
+    protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 
60 * 1000L;
+
+    /**
+     * This constant is used as the specified grace period where we do not 
have any grace periods instead of magic constants
+     */
+    protected static final long NO_GRACE_PERIOD = 0L;
 
     protected Windows() {}
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 048a55b..9a178d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -62,6 +62,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class TopologyTest {
 
     private final StoreBuilder<MockKeyValueStore> storeBuilder = 
EasyMock.createNiceMock(StoreBuilder.class);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index c2b39b5..fd5da12 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -341,6 +341,7 @@ public abstract class AbstractResetIntegrationTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     private Topology setupTopologyWithIntermediateTopic(final boolean 
useRepartitioned,
                                                         final String 
outputTopic2) {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index eb241c5..504d9f6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -71,6 +71,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests related to internal topics in streams
  */
+@SuppressWarnings("deprecation")
 @Category({IntegrationTest.class})
 public class InternalTopicIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index f5ed891..59d8603 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -49,6 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThrows;
 
+@SuppressWarnings("deprecation")
 @Category({IntegrationTest.class})
 public class JoinStoreIntegrationTest {
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 8eba6de..4fe35a6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -65,6 +65,7 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
  * by virtue of having a large commit interval
  */
 @Category({IntegrationTest.class})
+@SuppressWarnings("deprecation")
 public class KStreamAggregationDedupIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final long COMMIT_INTERVAL_MS = 300L;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index b7b1f4e..1b92ab5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -98,7 +98,7 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "deprecation"})
 @Category({IntegrationTest.class})
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
@@ -209,6 +209,7 @@ public class KStreamAggregationIntegrationTest {
         return keyComparison;
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceWindowed() throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -219,6 +220,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondBatchTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
+        //noinspection deprecation
         groupedStream
                 .windowedBy(TimeWindows.of(ofMillis(500L)))
                 .reduce(reducer)
@@ -318,6 +320,7 @@ public class KStreamAggregationIntegrationTest {
         )));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAggregateWindowed() throws Exception {
         final long firstTimestamp = mockTime.milliseconds();
@@ -328,6 +331,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
+        //noinspection deprecation
         groupedStream.windowedBy(TimeWindows.of(ofMillis(500L)))
                 .aggregate(
                         initializer,
@@ -442,12 +446,14 @@ public class KStreamAggregationIntegrationTest {
         shouldCountHelper();
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldGroupByKey() throws Exception {
         final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);
         produceMessages(timestamp);
 
+        //noinspection deprecation
         stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
                 .windowedBy(TimeWindows.of(ofMillis(500L)))
                 .count()
@@ -476,6 +482,7 @@ public class KStreamAggregationIntegrationTest {
         )));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceSlidingWindows() throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -487,6 +494,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(thirdBatchTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
+        //noinspection deprecation
         groupedStream
                 
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(2000L)))
                 .reduce(reducer)
@@ -580,6 +588,7 @@ public class KStreamAggregationIntegrationTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldAggregateSlidingWindows() throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -591,6 +600,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(thirdBatchTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
+        //noinspection deprecation
         
groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
 ofMinutes(5)))
                 .aggregate(
                         initializer,
@@ -689,6 +699,7 @@ public class KStreamAggregationIntegrationTest {
 
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCountSessionWindows() throws Exception {
         final long sessionGap = 5 * 60 * 1000L;
@@ -761,6 +772,7 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, KeyValue<Long, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
 
+        //noinspection deprecation
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
                 .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                 .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
@@ -797,6 +809,7 @@ public class KStreamAggregationIntegrationTest {
         assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, 
t3))), equalTo(KeyValue.pair(1L, t3)));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceSessionWindows() throws Exception {
         final long sessionGap = 1000L; // something to do with time
@@ -869,6 +882,7 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, KeyValue<String, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
         final String userSessionsStore = "UserSessionsStore";
+        //noinspection deprecation
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
                 .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                 .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 4ae4fdb..c2dee61 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -81,6 +81,7 @@ import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
 @Category({IntegrationTest.class})
+@SuppressWarnings("deprecation")
 public class KStreamRepartitionIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index b8ee31b..84f5cfc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -66,6 +66,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category({IntegrationTest.class})
+@SuppressWarnings("deprecation")
 public class MetricsIntegrationTest {
 
     private static final int NUM_BROKERS = 1;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 4e9b2b5..d07648b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -120,6 +120,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 
 @Category({IntegrationTest.class})
+@SuppressWarnings("deprecation")
 public class QueryableStateIntegrationTest {
     private static final Logger log = 
LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 648cfda..c698d06 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -69,6 +69,7 @@ import static org.hamcrest.Matchers.notNullValue;
 
 @Category({IntegrationTest.class})
 @RunWith(Parameterized.class)
+@SuppressWarnings("deprecation")
 public class RocksDBMetricsIntegrationTest {
 
     private static final int NUM_BROKERS = 3;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index cf9b6d6c..ca1512f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -28,11 +28,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class JoinWindowsTest {
 
     private static final long ANY_SIZE = 123L;
     private static final long ANY_OTHER_SIZE = 456L; // should be larger than 
anySize
+    private static final long ANY_GRACE = 1024L;
 
+    @SuppressWarnings("deprecation")
     @Test
     public void validWindows() {
         JoinWindows.of(ofMillis(ANY_OTHER_SIZE))   // [ -anyOtherSize ; 
anyOtherSize ]
@@ -69,6 +72,8 @@ public class JoinWindowsTest {
     @Test
     public void timeDifferenceMustNotBeNegative() {
         assertThrows(IllegalArgumentException.class, () -> 
JoinWindows.of(ofMillis(-1)));
+        assertThrows(IllegalArgumentException.class, () -> 
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(-1)));
+        assertThrows(IllegalArgumentException.class, () -> 
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(-1), ofMillis(ANY_GRACE)));
     }
 
     @Test
@@ -133,6 +138,16 @@ public class JoinWindowsTest {
             
JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
             
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
         );
+
+        verifyEquality(
+                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)),
+                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
+        );
+
+        verifyEquality(
+                JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)),
+                JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4))
+        );
     }
 
     @Test
@@ -162,5 +177,15 @@ public class JoinWindowsTest {
             
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)),
             
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
         );
+
+        verifyInEquality(
+                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(9)),
+                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
+        );
+
+        verifyInEquality(
+                JoinWindows.ofTimeDifferenceAndGrace(ofMillis(9), ofMillis(9)),
+                JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(9))
+        );
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 5545fb6..cad978c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -38,6 +38,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class RepartitionTopicNamingTest {
 
     private final KeyValueMapper<String, String, String> kvMapper = (k, v) -> 
k + v;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 14104d69..f38be3c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -25,12 +25,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class SessionWindowsTest {
 
     @Test
     public void shouldSetWindowGap() {
         final long anyGap = 42L;
+        final long anyGrace = 1024L;
+
         assertEquals(anyGap, 
SessionWindows.with(ofMillis(anyGap)).inactivityGap());
+        assertEquals(anyGap, 
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(anyGap)).inactivityGap());
+        assertEquals(anyGap, 
SessionWindows.ofInactivityGapAndGrace(ofMillis(anyGap), 
ofMillis(anyGrace)).inactivityGap());
     }
 
     @Test
@@ -66,6 +71,15 @@ public class SessionWindowsTest {
     public void equalsAndHashcodeShouldBeValidForPositiveCases() {
         verifyEquality(SessionWindows.with(ofMillis(1)), 
SessionWindows.with(ofMillis(1)));
 
+        verifyEquality(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)),
+                SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1))
+        );
+
+        verifyEquality(
+                SessionWindows.ofInactivityGapAndGrace(ofMillis(1), 
ofMillis(11)),
+                SessionWindows.ofInactivityGapAndGrace(ofMillis(1), 
ofMillis(11))
+        );
+
         verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)), 
SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
 
         verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(7)), 
SessionWindows.with(ofMillis(1)).grace(ofMillis(7)));
@@ -75,6 +89,15 @@ public class SessionWindowsTest {
 
     @Test
     public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+
+        verifyInEquality(
+                SessionWindows.ofInactivityGapWithNoGrace(ofMillis(9)),
+                SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)));
+
+        verifyInEquality(
+                SessionWindows.ofInactivityGapAndGrace(ofMillis(9), 
ofMillis(9)),
+                SessionWindows.ofInactivityGapAndGrace(ofMillis(1), 
ofMillis(9)));
+
         verifyInEquality(SessionWindows.with(ofMillis(9)), 
SessionWindows.with(ofMillis(1)));
 
         verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), 
SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
index f6c63a3..dd06984 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
@@ -24,13 +24,17 @@ import static 
org.apache.kafka.streams.EqualityCheck.verifyInEquality;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
+@SuppressWarnings("deprecation")
 public class SlidingWindowsTest {
 
     private static final long ANY_SIZE = 123L;
+    private static final long ANY_GRACE = 1024L;
 
     @Test
     public void shouldSetTimeDifference() {
         assertEquals(ANY_SIZE, 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), 
ofMillis(3)).timeDifferenceMs());
+        assertEquals(ANY_SIZE, 
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(ANY_SIZE), 
ofMillis(ANY_GRACE)).timeDifferenceMs());
+        assertEquals(ANY_SIZE, 
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(ANY_SIZE)).timeDifferenceMs());
     }
 
     @Test
@@ -56,6 +60,16 @@ public class SlidingWindowsTest {
                 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(grace)),
                 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(grace))
         );
+
+        verifyEquality(
+                
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(grace)),
+                
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(grace))
+        );
+
+        verifyEquality(
+                
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(timeDifference)),
+                
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(timeDifference))
+        );
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 765bad5..25a607d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -29,13 +29,17 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")
 public class TimeWindowsTest {
 
     private static final long ANY_SIZE = 123L;
+    private static final long ANY_GRACE = 1024L;
 
     @Test
     public void shouldSetWindowSize() {
         assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).sizeMs);
+        assertEquals(ANY_SIZE, 
TimeWindows.ofSizeWithNoGrace(ofMillis(ANY_SIZE)).sizeMs);
+        assertEquals(ANY_SIZE, TimeWindows.ofSizeAndGrace(ofMillis(ANY_SIZE), 
ofMillis(ANY_GRACE)).sizeMs);
     }
 
     @Test
@@ -140,10 +144,27 @@ public class TimeWindowsTest {
             
TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)),
             
TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4))
         );
+
+        verifyEquality(TimeWindows.ofSizeWithNoGrace(ofMillis(3)), 
TimeWindows.ofSizeWithNoGrace(ofMillis(3)));
+
+        verifyEquality(TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(33)),
+                TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(33))
+        );
     }
 
     @Test
     public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+
+        verifyInEquality(
+                TimeWindows.ofSizeWithNoGrace(ofMillis(9)),
+                TimeWindows.ofSizeWithNoGrace(ofMillis(3))
+        );
+
+        verifyInEquality(
+                TimeWindows.ofSizeAndGrace(ofMillis(9), ofMillis(9)),
+                TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(9))
+        );
+
         verifyInEquality(TimeWindows.of(ofMillis(9)), 
TimeWindows.of(ofMillis(3)));
 
         verifyInEquality(TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)), 
TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index b710e24..eba39a7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -61,7 +61,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 
-
 @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KGroupedStreamImplTest {
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 8464ab9..9753453 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -100,6 +100,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+
 @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamImplTest {
 
@@ -704,6 +705,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("materialized can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullOtherStreamOnJoin() {
         final NullPointerException exception = assertThrows(
@@ -712,6 +714,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("otherStream can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullOtherStreamOnJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -724,6 +727,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("otherStream can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerOnJoin() {
         final NullPointerException exception = assertThrows(
@@ -732,6 +736,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerWithKeyOnJoin() {
         final NullPointerException exception = assertThrows(
@@ -740,6 +745,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerOnJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -752,6 +758,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerWithKeyOnJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -784,6 +791,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("windows can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullStreamJoinedOnJoin() {
         final NullPointerException exception = assertThrows(
@@ -796,6 +804,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("streamJoined can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullOtherStreamOnLeftJoin() {
         final NullPointerException exception = assertThrows(
@@ -804,6 +813,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("otherStream can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullOtherStreamOnLeftJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -816,6 +826,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("otherStream can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerOnLeftJoin() {
         final NullPointerException exception = assertThrows(
@@ -824,6 +835,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoin() {
         final NullPointerException exception = assertThrows(
@@ -832,6 +844,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerOnLeftJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -844,6 +857,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
shouldNotAllowNullValueJoinerWithKeyOnLeftJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -877,6 +891,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("windows can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullStreamJoinedOnLeftJoin() {
         final NullPointerException exception = assertThrows(
@@ -889,6 +904,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("streamJoined can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullOtherStreamOnOuterJoin() {
         final NullPointerException exception = assertThrows(
@@ -897,6 +913,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("otherStream can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullOtherStreamOnOuterJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -909,6 +926,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("otherStream can't be 
null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerOnOuterJoin() {
         final NullPointerException exception = assertThrows(
@@ -917,6 +935,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerWithKeyOnOuterJoin() {
         final NullPointerException exception = assertThrows(
@@ -925,6 +944,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullValueJoinerOnOuterJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -937,6 +957,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("joiner can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
shouldNotAllowNullValueJoinerWithKeyOnOuterJoinWithStreamJoined() {
         final NullPointerException exception = assertThrows(
@@ -969,6 +990,7 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("windows can't be null"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotAllowNullStreamJoinedOnOuterJoin() {
         final NullPointerException exception = assertThrows(
@@ -1511,6 +1533,7 @@ public class KStreamImplTest {
         assertThat(mockProcessors.get(1).processed(), 
equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0))));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreatedWithRetention()
 {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -1538,6 +1561,7 @@ public class KStreamImplTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated()
 {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 1757584..1d50a37 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -63,6 +63,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+
 @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamKStreamJoinTest {
     private final String topic1 = "topic1";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index eb705f1..bc20312 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -53,6 +53,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+
 @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamKStreamLeftJoinTest {
     private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@@ -97,6 +98,7 @@ public class KStreamKStreamLeftJoinTest {
             false
         );
     }
+
     @Test
     public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
         runLeftJoinWithoutSpuriousResultFix(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index b4c0827..39ed039 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -52,6 +52,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+
 @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamKStreamOuterJoinTest {
     private final String topic1 = "topic1";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
index 04bbda8..0344f46 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
@@ -59,6 +59,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThrows;
 
+@SuppressWarnings("deprecation")
 @RunWith(EasyMockRunner.class)
 public class KStreamRepartitionTest {
     private final String inputTopic = "input-topic";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 3022a36..f4ebfdd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -69,6 +69,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+
 @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 public class KStreamSessionWindowAggregateProcessorTest {
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index cf6efec..798159d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -711,7 +711,7 @@ public class KStreamSlidingWindowAggregateTest {
         builder
                 .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
+                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
                 .aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()));
 
         props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, 
builtInMetricsVersion);
@@ -743,7 +743,7 @@ public class KStreamSlidingWindowAggregateTest {
 
         final KStream<String, String> stream1 = builder.stream(topic, 
Consumed.with(Serdes.String(), Serdes.String()));
         stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(90)))
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -807,7 +807,7 @@ public class KStreamSlidingWindowAggregateTest {
         final KTable<Windowed<String>, String> table = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(10000)))
+            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(10000)))
             // The aggregator needs to sort the strings so the window value is the same for the final windows even when
             // records are processed in a different order. Here, we sort alphabetically.
             .aggregate(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 39a7444..b7759bb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -62,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@SuppressWarnings("deprecation")
 public class KStreamWindowAggregateTest {
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
index 2946417..eee7cc5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
@@ -50,7 +50,7 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
-
+@SuppressWarnings("deprecation")
 public class SessionWindowedCogroupedKStreamImplTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 96d301d..52ff858 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+@SuppressWarnings("deprecation")
 public class SlidingWindowedCogroupedKStreamImplTest {
 
     private static final String TOPIC = "topic";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index e0b7957..7b521ab 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -70,6 +70,7 @@ import static 
org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@SuppressWarnings("deprecation")
 public class SuppressScenarioTest {
     private static final StringDeserializer STRING_DESERIALIZER = new 
StringDeserializer();
     private static final StringSerializer STRING_SERIALIZER = new 
StringSerializer();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
index d775c89..d775796 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
@@ -39,6 +39,7 @@ import static 
org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
+@SuppressWarnings("deprecation")
 public class SuppressTopologyTest {
     private static final Serde<String> STRING_SERDE = Serdes.String();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
index f905d32..c558eab 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+@SuppressWarnings("deprecation")
 public class TimeWindowTest {
 
     private long start = 50;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
index d052429..cd9ca19 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+@SuppressWarnings("deprecation")
 public class TimeWindowedCogroupedKStreamImplTest {
 
     private static final Long WINDOW_SIZE = 500L;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 8926a1e..b912a96 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -53,6 +53,7 @@ import java.util.regex.Pattern;
 import static java.time.Duration.ofMillis;
 import static org.junit.Assert.assertEquals;
 
+@SuppressWarnings("deprecation")
 public class StreamsGraphTest {
 
     private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index 3b7eb78..d1eb5b5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -66,6 +66,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+@SuppressWarnings("deprecation")
 public class RepartitionOptimizingTest {
 
     private final Logger log = 
LoggerFactory.getLogger(RepartitionOptimizingTest.class);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 860ed73..7244bd6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -137,6 +137,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(value = Parameterized.class)
+@SuppressWarnings("deprecation")
 public class StreamsPartitionAssignorTest {
     private static final String CONSUMER_1 = "consumer1";
     private static final String CONSUMER_2 = "consumer2";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 2bd4437..86f7583 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
+@SuppressWarnings("deprecation")
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
index da063c0..714aa11 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -47,6 +47,7 @@ import java.util.regex.Pattern;
 
 import static java.time.Duration.ofMillis;
 
+@SuppressWarnings("deprecation")
 public class StreamsOptimizedTest {
 
     public static void main(final String[] args) throws Exception {
diff --git 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index d9a2afe..7c3af25 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -58,6 +58,7 @@ public class GenericInMemoryKeyValueStore<K extends 
Comparable, V>
         return this.name;
     }
 
+    @SuppressWarnings("deprecation")
     @Deprecated
     @Override
     /* This is a "dummy" store used for testing;
diff --git 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
index 2198d18..114ea06 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
@@ -37,6 +37,7 @@ import java.util.TreeMap;
  * This class is a generic version of the in-memory key-value store that is useful for testing when you
  *  need a basic KeyValueStore for arbitrary types and don't have/want to write a serde
  */
+@SuppressWarnings("deprecation")
 public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
     extends WrappedStateStore<StateStore, K, ValueAndTimestamp<V>>
     implements TimestampedKeyValueStore<K, V> {
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index d3f83e0..d097b85 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -54,6 +54,7 @@ import scala.jdk.CollectionConverters._
  * @param inner The underlying Java abstraction for KStream
  * @see `org.apache.kafka.streams.kstream.KStream`
  */
+//noinspection ScalaDeprecation
 class KStream[K, V](val inner: KStreamJ[K, V]) {
 
   /**
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 9653ddb..92ca5bd 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -51,6 +51,7 @@ import scala.jdk.CollectionConverters._
 /**
  * Test suite that verifies that the topology built by the Java and Scala APIs 
match.
  */
+//noinspection ScalaDeprecation
 class TopologyTest {
 
   private val inputTopic = "input-topic"
@@ -377,14 +378,16 @@ class TopologyTest {
 
       mappedStream
         .filter((k: String, _: String) => k == "A")
-        .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, 
JoinWindows.of(Duration.ofMillis(5000)))(
+        .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString,
+                       
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), 
Duration.ofHours(24)))(
           StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, 
NewSerdes.intSerde)
         )
         .to(JOINED_TOPIC)
 
       mappedStream
         .filter((k: String, _: String) => k == "A")
-        .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, 
JoinWindows.of(Duration.ofMillis(5000)))(
+        .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString,
+                       
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), 
Duration.ofHours(24)))(
           StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, 
NewSerdes.stringSerde)
         )
         .to(JOINED_TOPIC)
@@ -433,18 +436,22 @@ class TopologyTest {
 
       mappedStream
         .filter((key, _) => key == "A")
-        .join[Integer, String](stream2,
-                               valueJoiner2,
-                               JoinWindows.of(Duration.ofMillis(5000)),
-                               StreamJoinedJ.`with`(NewSerdes.stringSerde, 
NewSerdes.stringSerde, SerdesJ.Integer))
+        .join[Integer, String](
+          stream2,
+          valueJoiner2,
+          JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), 
Duration.ofHours(24)),
+          StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, 
SerdesJ.Integer)
+        )
         .to(JOINED_TOPIC)
 
       mappedStream
         .filter((key, _) => key == "A")
-        .join(stream3,
-              valueJoiner3,
-              JoinWindows.of(Duration.ofMillis(5000)),
-              StreamJoinedJ.`with`(NewSerdes.stringSerde, 
NewSerdes.stringSerde, SerdesJ.String))
+        .join(
+          stream3,
+          valueJoiner3,
+          JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), 
Duration.ofHours(24)),
+          StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, 
SerdesJ.String)
+        )
         .to(JOINED_TOPIC)
 
       builder
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 1d8a1f1..0ec7b0e 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -17,9 +17,7 @@
 package org.apache.kafka.streams.scala.kstream
 
 import java.time.Duration.ofSeconds
-import java.time.Instant
-import java.util.regex.Pattern
-
+import java.time.{Duration, Instant}
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{
   JoinWindows,
@@ -192,6 +190,7 @@ class KStreamTest extends TestDriver {
     testDriver.close()
   }
 
+  //noinspection ScalaDeprecation
   @Test
   def testJoinCorrectlyRecords(): Unit = {
     val builder = new StreamsBuilder()
@@ -201,7 +200,9 @@ class KStreamTest extends TestDriver {
 
     val stream1 = builder.stream[String, String](sourceTopic1)
     val stream2 = builder.stream[String, String](sourceTopic2)
-    stream1.join(stream2)((a, b) => s"$a-$b", JoinWindows.of(ofSeconds(1))).to(sinkTopic)
+    stream1
+      .join(stream2)((a, b) => s"$a-$b", JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(1), Duration.ofHours(24)))
+      .to(sinkTopic)
 
     val now = Instant.now()
 
@@ -464,23 +465,4 @@ class KStreamTest extends TestDriver {
     val transformNode = 
builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
     assertEquals("my-name", transformNode.name())
   }
-
-  @Test
-  def testSettingNameOnStream(): Unit = {
-    val builder = new StreamsBuilder()
-    val topicsPattern = "t-[A-Za-z0-9-].suffix"
-    val sinkTopic = "sink"
-
-    builder
-      .stream[String, String](Pattern.compile(topicsPattern))(
-        Consumed.`with`[String, String].withName("my-fancy-name")
-      )
-      .to(sinkTopic)
-
-    import scala.jdk.CollectionConverters._
-
-    val streamNode = 
builder.build().describe().subtopologies().asScala.head.nodes().asScala.head
-    assertEquals("my-fancy-name", streamNode.name())
-  }
-
 }
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index 15e090d..09a3a7d 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -36,6 +36,7 @@ import java.time.Duration.ofMillis
 
 import scala.jdk.CollectionConverters._
 
+//noinspection ScalaDeprecation
 class KTableTest extends TestDriver {
 
   @Test
@@ -166,7 +167,7 @@ class KTableTest extends TestDriver {
     val builder = new StreamsBuilder()
     val sourceTopic = "source"
     val sinkTopic = "sink"
-    val window = TimeWindows.of(Duration.ofSeconds(1L))
+    val window = TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1L), Duration.ofHours(24))
     val suppression = JSuppressed.untilTimeLimit[Windowed[String]](Duration.ofSeconds(2L), BufferConfig.unbounded())
 
     val table: KTable[Windowed[String], Long] = builder
@@ -224,7 +225,7 @@ class KTableTest extends TestDriver {
     val builder = new StreamsBuilder()
     val sourceTopic = "source"
     val sinkTopic = "sink"
-    val window = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(1000L), ofMillis(1000L))
+    val window = SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(1000L), ofMillis(1000L))
     val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
 
     val table: KTable[Windowed[String], Long] = builder
@@ -262,7 +263,7 @@ class KTableTest extends TestDriver {
     val builder = new StreamsBuilder()
     val sourceTopic = "source"
     val sinkTopic = "sink"
-    val window = TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(1L))
+    val window = TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1L), Duration.ofSeconds(1L))
     val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
 
     val table: KTable[Windowed[String], Long] = builder
@@ -321,7 +322,7 @@ class KTableTest extends TestDriver {
     val sourceTopic = "source"
     val sinkTopic = "sink"
     // Very similar to SuppressScenarioTest.shouldSupportFinalResultsForSessionWindows
-    val window = SessionWindows.`with`(Duration.ofMillis(5L)).grace(Duration.ofMillis(10L))
+    val window = SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(5L), Duration.ofMillis(10L))
     val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
 
     val table: KTable[Windowed[String], Long] = builder

Reply via email to