[ https://issues.apache.org/jira/browse/KAFKA-7222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580061#comment-16580061 ]

ASF GitHub Bot commented on KAFKA-7222:
---------------------------------------

guozhangwang closed pull request #5369: KAFKA-7222: Add Windows grace period
URL: https://github.com/apache/kafka/pull/5369

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 863ae9509ba..bd3117548c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -133,12 +133,20 @@ public long size() {
         return beforeMs + afterMs;
     }
 
+    @Override
+    public JoinWindows grace(final long millisAfterWindowEnd) {
+        super.grace(millisAfterWindowEnd);
+        return this;
+    }
+
     /**
      * @param durationMs the window retention time in milliseconds
      * @return itself
     * @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
+     * @deprecated since 2.1. Use {@link JoinWindows#grace(long)} instead.
      */
     @Override
+    @Deprecated
     public JoinWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < size()) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
@@ -153,30 +161,11 @@ public JoinWindows until(final long durationMs) throws IllegalArgumentException
      * For {@link TimeWindows} the maintain duration is at least as small as the window size.
      *
      * @return the window maintain duration
+     * @deprecated since 2.1. Use {@link JoinWindows#gracePeriodMs()} instead.
      */
     @Override
+    @Deprecated
     public long maintainMs() {
         return Math.max(super.maintainMs(), size());
     }
-
-    @Override
-    public final boolean equals(final Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof JoinWindows)) {
-            return false;
-        }
-
-        final JoinWindows other = (JoinWindows) o;
-        return beforeMs == other.beforeMs && afterMs == other.afterMs;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = (int) (beforeMs ^ (beforeMs >>> 32));
-        result = 31 * result + (int) (afterMs ^ (afterMs >>> 32));
-        return result;
-    }
-
 }
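
For readers following along, a minimal usage sketch of the API change above (window sizes are illustrative; JoinWindows.of() is the pre-existing factory method):

import org.apache.kafka.streams.kstream.JoinWindows;

// 1-minute symmetric join window that admits events up to 30 seconds late.
// grace() is the new call; the deprecated equivalent was .until(retentionMs).
final JoinWindows joinWindows = JoinWindows.of(60_000L).grace(30_000L);
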
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index 89603fa7d3a..15ec6ce8772 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -60,6 +61,7 @@
     protected boolean loggingEnabled = true;
     protected boolean cachingEnabled = true;
     protected Map<String, String> topicConfig = new HashMap<>();
+    protected Duration retention;
 
     private Materialized(final StoreSupplier<S> storeSupplier) {
         this.storeSupplier = storeSupplier;
@@ -81,6 +83,7 @@ protected Materialized(final Materialized<K, V, S> materialized) {
         this.loggingEnabled = materialized.loggingEnabled;
         this.cachingEnabled = materialized.cachingEnabled;
         this.topicConfig = materialized.topicConfig;
+        this.retention = materialized.retention;
     }
 
     /**
@@ -101,6 +104,10 @@ protected Materialized(final Materialized<K, V, S> materialized) {
     /**
      * Materialize a {@link WindowStore} using the provided {@link WindowBytesStoreSupplier}.
      *
+     * Important: Custom subclasses are allowed here, but they should respect the retention contract:
+     * Window stores are required to retain windows at least as long as (window size + window grace period).
+     * Stores constructed via {@link org.apache.kafka.streams.state.Stores} already satisfy this contract.
+     *
      * @param supplier the {@link WindowBytesStoreSupplier} used to materialize the store
      * @param <K>      key type of the store
      * @param <V>      value type of the store
@@ -114,6 +121,10 @@ protected Materialized(final Materialized<K, V, S> materialized) {
     /**
      * Materialize a {@link SessionStore} using the provided {@link SessionBytesStoreSupplier}.
      *
+     * Important: Custom subclasses are allowed here, but they should respect the retention contract:
+     * Session stores are required to retain windows at least as long as (session inactivity gap + session grace period).
+     * Stores constructed via {@link org.apache.kafka.streams.state.Stores} already satisfy this contract.
+     *
      * @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store
      * @param <K>      key type of the store
      * @param <V>      value type of the store
@@ -222,4 +233,22 @@ protected Materialized(final Materialized<K, V, S> materialized) {
         return this;
     }
 
+    /**
+     * Configure retention period for window and session stores. Ignored for key/value stores.
+     *
+     * Overridden by pre-configured store suppliers
+     * ({@link Materialized#as(SessionBytesStoreSupplier)} or {@link Materialized#as(WindowBytesStoreSupplier)}).
+     *
+     * Note that the retention period must be at least long enough to contain the windowed data's entire life cycle,
+     * from window-start through window-end, and for the entire grace period.
+     *
+     * @return itself
+     */
+    public Materialized<K, V, S> withRetention(final long retentionMs) {
+        if (retentionMs < 0) {
+            throw new IllegalArgumentException("Retention must not be negative.");
+        }
+        retention = Duration.ofMillis(retentionMs);
+        return this;
+    }
 }
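
A short sketch of the new withRetention() in use (store name, types, and durations are illustrative):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.WindowStore;

// Retention must cover the whole window life cycle: window size + grace period.
final Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized =
    Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
                .withRetention(24 * 60 * 60 * 1000L); // one day, in milliseconds
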
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 36e7823648a..c5c44c6ff98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -26,6 +26,8 @@
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.SessionStore;
 
+import java.time.Duration;
+
 /**
  * {@code SessionWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
  * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
@@ -36,7 +38,7 @@
  * They have no fixed time boundaries, rather the size of the window is determined by the records.
  * Please see {@link SessionWindows} for more details.
  * <p>
- * {@link SessionWindows} are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+ * New events are added to {@link SessionWindows} until their grace period ends (see {@link SessionWindows#grace(long)}).
  *
  * Furthermore, updates are sent downstream into a windowed {@link KTable} changelog stream, where
  * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index fc1fb9f5d13..96aea0a141e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+
+import java.time.Duration;
 
-import java.util.Objects;
 
 /**
  * A session based window specification used for aggregating events into sessions.
@@ -67,10 +69,13 @@
 
     private final long gapMs;
     private final long maintainDurationMs;
+    private final Duration grace;
+
 
-    private SessionWindows(final long gapMs, final long maintainDurationMs) {
+    private SessionWindows(final long gapMs, final long maintainDurationMs, final Duration grace) {
         this.gapMs = gapMs;
         this.maintainDurationMs = maintainDurationMs;
+        this.grace = grace;
     }
 
     /**
@@ -86,7 +91,7 @@ public static SessionWindows with(final long inactivityGapMs) {
             throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
         }
         final long oneDayMs = 24 * 60 * 60_000L;
-        return new SessionWindows(inactivityGapMs, oneDayMs);
+        return new SessionWindows(inactivityGapMs, oneDayMs, null);
     }
 
     /**
@@ -95,13 +100,50 @@ public static SessionWindows with(final long inactivityGapMs) {
      *
      * @return itself
      * @throws IllegalArgumentException if {@code durationMs} is smaller than window gap
+     *
+     * @deprecated since 2.1. Use {@link Materialized#retention}
+     *             or directly configure the retention in a store supplier and use
+     *             {@link Materialized#as(SessionBytesStoreSupplier)}.
      */
+    @Deprecated
     public SessionWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < gapMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
         }
 
-        return new SessionWindows(gapMs, durationMs);
+        return new SessionWindows(gapMs, durationMs, null);
+    }
+
+    /**
+     * Reject late events that arrive more than {@code millisAfterWindowEnd}
+     * after the end of their window.
+     *
+     * Note that new events may change the boundaries of session windows, so aggressive
+     * close times can lead to surprising results in which a too-late event is rejected and then
+     * a subsequent event moves the window boundary forward.
+     *
+     * @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
+     * @return this updated builder
+     */
+    public SessionWindows grace(final long millisAfterWindowEnd) {
+        if (millisAfterWindowEnd < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
+        return new SessionWindows(
+            gapMs,
+            maintainDurationMs,
+            Duration.ofMillis(millisAfterWindowEnd)
+        );
+    }
+
+    @SuppressWarnings("deprecation") // continuing to support SessionWindows#maintainMs in fallback mode
+    public long gracePeriodMs() {
+
+        // NOTE: in the future, when we remove maintainMs,
+        // we should default the grace period to 24h to maintain the default behavior,
+        // or we can default to (24h - gapMs) if you want to be super accurate.
+        return grace != null ? grace.toMillis() : maintainMs() - inactivityGap();
     }
 
     /**
@@ -119,22 +161,11 @@ public long inactivityGap() {
      * For {@code SessionWindows} the maintain duration is at least as small as the window gap.
      *
      * @return the window maintain duration
+     * @deprecated since 2.1. Use {@link Materialized#retention} instead.
      */
+    @Deprecated
     public long maintainMs() {
         return Math.max(maintainDurationMs, gapMs);
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        final SessionWindows that = (SessionWindows) o;
-        return gapMs == that.gapMs &&
-                maintainDurationMs == that.maintainDurationMs;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(gapMs, maintainDurationMs);
-    }
 }
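
A sketch of the resulting SessionWindows API (gap and grace values are illustrative):

import org.apache.kafka.streams.kstream.SessionWindows;

// 5-minute inactivity gap; tolerate events up to 1 minute late.
final SessionWindows sessions = SessionWindows.with(5 * 60 * 1000L).grace(60_000L);

// With grace() set, gracePeriodMs() returns it directly; without it, the
// fallback is maintainMs() - inactivityGap(), i.e. 24h - 5min by default.
final long graceMs = sessions.gracePeriodMs(); // 60_000
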
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 03bc986bc66..d6f40825925 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -25,6 +25,8 @@
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.WindowStore;
 
+import java.time.Duration;
+
 /**
  * {@code TimeWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
  * It is an intermediate representation of a {@link KStream} in order to apply a windowed aggregation operation on the original
@@ -38,7 +40,9 @@
  * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
  * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
  * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
- * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ *
+ * New events are added to windows until their grace period ends (see {@link Windows#grace(long)}).
+ *
  * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
  * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index c2b910df5b1..808b00691c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -119,12 +120,22 @@ public long size() {
         return sizeMs;
     }
 
+    @Override
+    public TimeWindows grace(final long millisAfterWindowEnd) {
+        super.grace(millisAfterWindowEnd);
+        return this;
+    }
+
     /**
      * @param durationMs the window retention time
      * @return itself
      * @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
+     *
+     * @deprecated since 2.1. Use {@link Materialized#retention} or directly configure the retention in a store supplier
+     *             and use {@link Materialized#as(WindowBytesStoreSupplier)}.
      */
     @Override
+    @Deprecated
     public TimeWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < sizeMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
@@ -139,29 +150,11 @@ public TimeWindows until(final long durationMs) throws IllegalArgumentException
      * For {@code TimeWindows} the maintain duration is at least as small as the window size.
      *
      * @return the window maintain duration
+     * @deprecated since 2.1. Use {@link Materialized#retention} instead.
      */
     @Override
+    @Deprecated
     public long maintainMs() {
         return Math.max(super.maintainMs(), sizeMs);
     }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof TimeWindows)) {
-            return false;
-        }
-        final TimeWindows other = (TimeWindows) o;
-        return sizeMs == other.sizeMs && advanceMs == other.advanceMs;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = (int) (sizeMs ^ (sizeMs >>> 32));
-        result = 31 * result + (int) (advanceMs ^ (advanceMs >>> 32));
-        return result;
-    }
-
 }
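
The TimeWindows change mirrors JoinWindows; a brief sketch (sizes illustrative):

import org.apache.kafka.streams.kstream.TimeWindows;

// Tumbling 1-minute windows that stay open to late events for 10 seconds.
// Note the covariant override: grace() returns TimeWindows, so the fluent
// chain keeps its concrete type.
final TimeWindows windows = TimeWindows.of(60_000L).grace(10_000L);
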
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index a3b93381952..e795a2ce320 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -96,9 +96,11 @@ public long size() {
      * Throws an {@link IllegalArgumentException} because the retention time for unlimited windows is always infinite
      * and cannot be changed.
      *
-     * @throws IllegalArgumentException on every invocation
+     * @throws IllegalArgumentException on every invocation.
+     * @deprecated since 2.1.
      */
     @Override
+    @Deprecated
     public UnlimitedWindows until(final long durationMs) {
         throw new IllegalArgumentException("Window retention time (durationMs) cannot be set for UnlimitedWindows.");
     }
@@ -108,29 +110,23 @@ public UnlimitedWindows until(final long durationMs) {
      * The retention time for unlimited windows is infinite and thus represented as {@link Long#MAX_VALUE}.
      *
      * @return the window retention time that is {@link Long#MAX_VALUE}
+     * @deprecated since 2.1. Use {@link Materialized#retention} instead.
      */
     @Override
+    @Deprecated
     public long maintainMs() {
         return Long.MAX_VALUE;
     }
 
+    /**
+     * Throws an {@link IllegalArgumentException} because the window never ends and the
+     * grace period is therefore meaningless.
+     *
+     * @throws IllegalArgumentException on every invocation
+     */
     @Override
-    public boolean equals(final Object o) {
-        if (o == this) {
-            return true;
-        }
-
-        if (!(o instanceof UnlimitedWindows)) {
-            return false;
-        }
-
-        final UnlimitedWindows other = (UnlimitedWindows) o;
-        return startMs == other.startMs;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) (startMs ^ (startMs >>> 32));
+    public UnlimitedWindows grace(final long millisAfterWindowEnd) {
+        throw new IllegalArgumentException("Grace period cannot be set for UnlimitedWindows.");
     }
 
 }
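
A sketch of the fail-fast behavior for landmark windows (illustrative):

import org.apache.kafka.streams.kstream.UnlimitedWindows;

try {
    UnlimitedWindows.of().grace(1_000L);
} catch (final IllegalArgumentException expected) {
    // Unlimited windows never end, so a grace period is meaningless;
    // grace() throws on every invocation, just like the deprecated until().
}
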
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 53ead1e9b73..adfc88a3493 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -17,15 +17,15 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
+import java.time.Duration;
 import java.util.Map;
 
 /**
- * The window specification interface for fixed size windows that is used to define window boundaries and window
- * maintain duration.
- * <p>
- * If not explicitly specified, the default maintain duration is 1 day.
- * For time semantics, see {@link TimestampExtractor}.
+ * The window specification interface for fixed size windows that is used to define window boundaries and grace period.
+ *
+ * Grace period defines how long to wait on late events, where lateness is defined as (stream_time - record_timestamp).
  *
  * @param <W> type of the window instance
  * @see TimeWindows
@@ -39,8 +39,43 @@
     private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day
     @Deprecated public int segments = 3;
 
+    private Duration grace;
+
     protected Windows() {}
 
+    /**
+     * Reject late events that arrive more than {@code millisAfterWindowEnd}
+     * after the end of their window.
+     *
+     * Lateness is defined as (stream_time - record_timestamp).
+     *
+     * @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
+     * @return this updated builder
+     */
+    public Windows<W> grace(final long millisAfterWindowEnd) {
+        if (millisAfterWindowEnd < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
+        grace = Duration.ofMillis(millisAfterWindowEnd);
+
+        return this;
+    }
+
+    /**
+     * Return the window grace period (the time to admit
+     * late-arriving events after the end of the window).
+     *
+     * Lateness is defined as (stream_time - record_timestamp).
+     */
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    public long gracePeriodMs() {
+        // NOTE: in the future, when we remove maintainMs,
+        // we should default the grace period to 24h to maintain the default behavior,
+        // or we can default to (24h - size) if you want to be super accurate.
+        return grace != null ? grace.toMillis() : maintainMs() - size();
+    }
+
     /**
      * Set the window maintain duration (retention time) in milliseconds.
      * This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
@@ -48,8 +83,10 @@ protected Windows() {}
      * @param durationMs the window retention time in milliseconds
      * @return itself
      * @throws IllegalArgumentException if {@code durationMs} is negative
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(long)}
+     *             or directly configure the retention in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
      */
-    // This should always get overridden to provide the correct return type and thus to avoid a cast
+    @Deprecated
     public Windows<W> until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < 0) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be negative.");
@@ -63,7 +100,9 @@ protected Windows() {}
      * Return the window maintain duration (retention time) in milliseconds.
      *
      * @return the window maintain duration
+     * @deprecated since 2.1. Use {@link Materialized#retention} instead.
      */
+    @Deprecated
     public long maintainMs() {
         return maintainDurationMs;
     }
@@ -72,8 +111,9 @@ public long maintainMs() {
      * Return the segment interval in milliseconds.
      *
      * @return the segment interval
+     * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
      */
-    @SuppressWarnings("deprecation") // The deprecation is on the public visibility of segments. We intend to make the field private later.
+    @Deprecated
     public long segmentInterval() {
         // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
         final long minimumSegmentInterval = 60_000L;
@@ -81,6 +121,7 @@ public long segmentInterval() {
         return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
     }
 
+
     /**
      * Set the number of segments to be used for rolling the window store.
      * This function is not exposed to users but can be called by developers that extend this class.
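
The fallback in gracePeriodMs() preserves the old retention-based behavior; a worked sketch with the defaults above (the window size is illustrative):

// With no explicit grace(), gracePeriodMs() = maintainMs() - size().
final long maintainMs = 24 * 60 * 60 * 1000L; // default maintain duration: one day
final long sizeMs     = 5 * 60 * 1000L;       // hypothetical window size: 5 minutes
final long graceMs    = maintainMs - sizeMs;  // 86_100_000 ms, i.e. 23h 55m
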
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index becb03db24c..803faf66e5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -194,7 +194,7 @@
 
         final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name,
                                                                                                    processorParameters,
-                                                                                     true);
+                                                                                                   true);
 
         mapProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
@@ -216,7 +216,7 @@
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
 
 
-        final ProcessorGraphNode<? super  K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name,
+        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name,
                                                                                                           processorParameters,
                                                                                                           repartitionRequired);
         builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
@@ -235,7 +235,7 @@ public void print(final Printed<K, V> printed) {
 
         final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name,
                                                                                             processorParameters,
-                                                                            false);
+                                                                                            false);
         builder.addGraphNode(this.streamsGraphNode, printNode);
     }
 
@@ -250,7 +250,7 @@ public void print(final Printed<K, V> printed) {
 
         final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name,
                                                                                               processorParameters,
-                                                                                true);
+                                                                                              true);
         flatMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, flatMapNode);
@@ -375,8 +375,8 @@ public void foreach(final ForeachAction<? super K, ? super V> action) {
 
 
         final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name,
-                                                                              processorParameters,
-                                                                              repartitionRequired);
+                                                                                              processorParameters,
+                                                                                              repartitionRequired);
         builder.addGraphNode(this.streamsGraphNode, foreachNode);
     }
 
@@ -595,7 +595,7 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final String repartitionedSourceName = createRepartitionedSource(builder,
                                                                          keySerde,
                                                                          valSerde,
-                                                                          null,
+                                                                         null,
                                                                          name,
                                                                          optimizableRepartitionNodeBuilder);
 
@@ -716,9 +716,9 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
 
         final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(name,
-                                                                                  processorParameters,
-                                                                                  new String[]{},
-                                                                                  null);
+                                                                                        processorParameters,
+                                                                                        new String[] {},
+                                                                                        null);
         builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, false, streamTableJoinNode);
@@ -819,17 +819,17 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
 
     }
 
-    private static <K, V> StoreBuilder<WindowStore<K, V>> createWindowedStateStore(final JoinWindows windows,
-                                                                                   final Serde<K> keySerde,
-                                                                                   final Serde<V> valueSerde,
-                                                                                   final String storeName) {
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(final String joinName,
+                                                                                 final JoinWindows windows,
+                                                                                 final Serde<K> keySerde,
+                                                                                 final Serde<V> valueSerde) {
         return Stores.windowStoreBuilder(
             Stores.persistentWindowStore(
-                storeName,
-                windows.maintainMs(),
+                joinName + "-store",
+                windows.size() + windows.gracePeriodMs(),
                 windows.size(),
-                true,
-                windows.segmentInterval()
+                true
             ),
             keySerde,
             valueSerde
@@ -865,11 +865,11 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
             final StreamsGraphNode thisStreamsGraphNode = ((AbstractStream) lhs).streamsGraphNode;
             final StreamsGraphNode otherStreamsGraphNode = ((AbstractStream) other).streamsGraphNode;
 
-            final StoreBuilder<WindowStore<K1, V1>> thisWindowStore =
-                createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), joinThisName + "-store");
 
+            final StoreBuilder<WindowStore<K1, V1>> thisWindowStore =
+                joinWindowStoreBuilder(joinThisName, windows, joined.keySerde(), joined.valueSerde());
             final StoreBuilder<WindowStore<K1, V2>> otherWindowStore =
-                createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), joinOtherName + "-store");
+                joinWindowStoreBuilder(joinOtherName, windows, joined.keySerde(), joined.otherValueSerde());
 
             final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name());
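
The renamed joinWindowStoreBuilder derives store retention from the window spec rather than the deprecated maintainMs(); the equivalent arithmetic, sketched (values illustrative):

import org.apache.kafka.streams.kstream.JoinWindows;

final JoinWindows windows = JoinWindows.of(60_000L).grace(30_000L);
// size() = before + after = 120_000; retention covers size plus grace:
final long storeRetention = windows.size() + windows.gracePeriodMs(); // 150_000 ms
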
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 35dd7a640b0..5a3c897f781 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -23,9 +24,11 @@
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -73,12 +76,17 @@ public void enableSendingOldValues() {
         private SessionStore<K, Agg> store;
         private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
             metrics = (StreamsMetricsImpl) context.metrics();
+            lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
+
             store = (SessionStore<K, Agg>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
@@ -96,6 +104,8 @@ public void process(final K key, final V value) {
                 return;
             }
 
+            final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
+
             final long timestamp = context().timestamp();
             final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
@@ -117,16 +127,25 @@ public void process(final K key, final V value) {
                 }
             }
 
-            agg = aggregator.apply(key, value, agg);
-            final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
-            if (!mergedWindow.equals(newSessionWindow)) {
-                for (final KeyValue<Windowed<K>, Agg> session : merged) {
-                    store.remove(session.key);
-                    tupleForwarder.maybeForward(session.key, null, session.value);
+            if (mergedWindow.end() > closeTime) {
+                if (!mergedWindow.equals(newSessionWindow)) {
+                    for (final KeyValue<Windowed<K>, Agg> session : merged) {
+                        store.remove(session.key);
+                        tupleForwarder.maybeForward(session.key, null, session.value);
+                    }
                 }
+
+                agg = aggregator.apply(key, value, agg);
+                final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
+                store.put(sessionKey, agg);
+                tupleForwarder.maybeForward(sessionKey, agg, null);
+            } else {
+                LOG.debug(
+                    "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
+                    key, context().topic(), context().partition(), context().offset(), context().timestamp(), mergedWindow.start(), mergedWindow.end(), closeTime
+                );
+                lateRecordDropSensor.record();
             }
-            store.put(sessionKey, agg);
-            tupleForwarder.maybeForward(sessionKey, agg, null);
         }
 
     }
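
The session aggregation now drops a record only after its merged window has closed; the decision distills to one predicate on stream time (a sketch, names follow the code above):

// A merged session is still open iff its end falls after
//   closeTime = streamTime - gracePeriodMs.
// Unlike fixed windows, a later event can merge sessions and move the
// window end forward, which is why aggressive grace periods can reject an
// event whose session a subsequent event would have re-opened.
static boolean sessionStillOpen(final long streamTime, final long gracePeriodMs, final long sessionEnd) {
    return sessionEnd > streamTime - gracePeriodMs;
}
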
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 40702f0fc8c..57542847b96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -68,23 +70,24 @@ public void enableSendingOldValues() {
         private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            this.internalProcessorContext = (InternalProcessorContext) context;
+            internalProcessorContext = (InternalProcessorContext) context;
 
             metrics = (StreamsMetricsImpl) context.metrics();
 
+            lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
+
             windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
         }
 
         @Override
         public void process(final K key, final V value) {
-            // if the key is null, we do not need proceed aggregating the record
-            // the record with the table
             if (key == null) {
                 log.warn(
                     "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
@@ -96,14 +99,15 @@ public void process(final K key, final V value) {
 
             // first get the matching windows
             final long timestamp = context().timestamp();
-            final long expiryTime = internalProcessorContext.streamTime() - windows.maintainMs();
+            final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
 
             final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
 
             // try update the window, and create the new window for the rest of unmatched window that do not exist yet
             for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
                 final Long windowStart = entry.getKey();
-                if (windowStart > expiryTime) {
+                final long windowEnd = entry.getValue().end();
+                if (windowEnd > closeTime) {
                     Agg oldAgg = windowStore.fetch(key, windowStart);
 
                     if (oldAgg == null) {
@@ -116,11 +120,11 @@ public void process(final K key, final V value) {
                     windowStore.put(key, newAgg, windowStart);
                     tupleForwarder.maybeForward(new Windowed<>(key, 
entry.getValue()), newAgg, oldAgg);
                 } else {
-                    log.warn(
-                        "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{}] expiration=[{}]",
-                        key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, expiryTime
+                    log.debug(
+                        "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
+                        key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, windowEnd, closeTime
                     );
-                    metrics.skippedRecordsSensor().record();
+                    lateRecordDropSensor.record();
                 }
             }
         }
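
A worked example of the new per-window expiry check (numbers illustrative):

// streamTime = 100_000 and grace = 10_000 give closeTime = 90_000.
// A window [30_000, 90_000) has windowEnd = 90_000, which is not strictly
// greater than closeTime: the record is dropped and late-record-drop fires.
// A window [60_000, 120_000) has windowEnd = 120_000 > 90_000: still open.
final long closeTime = 100_000L - 10_000L;
final boolean open = 120_000L > closeTime; // true, while 90_000L > closeTime is false
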
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 2bc7326cd62..1955dea86a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StoreSupplier;
 
+import java.time.Duration;
 import java.util.Map;
 
 public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
@@ -73,4 +74,8 @@ public boolean cachingEnabled() {
     public boolean isQueryable() {
         return queriable;
     }
+
+    Duration retention() {
+        return retention;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 54611f7a814..98076e068ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -174,12 +174,26 @@
         );
     }
 
+    @SuppressWarnings("deprecation") // continuing to support SessionWindows#maintainMs in fallback mode
     private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materialized) {
         SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
+            // NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
+            // to be (windows.inactivityGap() + windows.grace()). This will yield the same default behavior.
+            final long retentionPeriod = materialized.retention() != null ? materialized.retention().toMillis() : windows.maintainMs();
+
+            if ((windows.inactivityGap() + windows.gracePeriodMs()) > retentionPeriod) {
+                throw new IllegalArgumentException("The retention period of the session store "
+                                                       + materialized.storeName()
+                                                       + " must be no smaller than the session inactivity gap plus the"
+                                                       + " grace period."
+                                                       + " Got gap=[" + windows.inactivityGap() + "],"
+                                                       + " grace=[" + windows.gracePeriodMs() + "],"
+                                                       + " retention=[" + retentionPeriod + "]");
+            }
             supplier = Stores.persistentSessionStore(
                 materialized.storeName(),
-                windows.maintainMs()
+                retentionPeriod
             );
         }
         final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 0daaf0d9b98..5c5cfb2bed7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -56,10 +56,9 @@
                             final boolean repartitionRequired,
                             final StreamsGraphNode streamsGraphNode) {
         super(builder, name, sourceNodes, streamsGraphNode);
-        Objects.requireNonNull(windows, "windows can't be null");
         this.valSerde = valSerde;
         this.keySerde = keySerde;
-        this.windows = windows;
+        this.windows = Objects.requireNonNull(windows, "windows can't be null");
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode);
     }
 
@@ -164,16 +163,51 @@
         );
     }
 
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
         WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            supplier = Stores.persistentWindowStore(
-                materialized.storeName(),
-                windows.maintainMs(),
-                windows.size(),
-                false,
-                windows.segmentInterval()
-            );
+            if (materialized.retention() != null) {
+                // new style retention: use Materialized retention and default segmentInterval
+                final long retentionPeriod = materialized.retention().toMillis();
+
+                if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
+                    throw new IllegalArgumentException("The retention period of the window store "
+                                                           + name + " must be no smaller than its window size plus the grace period."
+                                                           + " Got size=[" + windows.size() + "],"
+                                                           + " grace=[" + windows.gracePeriodMs() + "],"
+                                                           + " retention=[" + retentionPeriod + "]");
+                }
+
+                supplier = Stores.persistentWindowStore(
+                    materialized.storeName(),
+                    retentionPeriod,
+                    windows.size(),
+                    false
+                );
+
+            } else {
+                // old style retention: use deprecated Windows retention/segmentInterval.
+
+                // NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
+                // to be (windows.size() + windows.grace()). This will yield the same default behavior.
+
+                if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
+                    throw new IllegalArgumentException("The retention period of the window store "
+                                                           + name + " must be no smaller than its window size plus the grace period."
+                                                           + " Got size=[" + windows.size() + "],"
+                                                           + " grace=[" + windows.gracePeriodMs() + "],"
+                                                           + " retention=[" + windows.maintainMs() + "]");
+                }
+
+                supplier = Stores.persistentWindowStore(
+                    materialized.storeName(),
+                    windows.maintainMs(),
+                    windows.size(),
+                    false,
+                    windows.segmentInterval()
+                );
+            }
         }
         final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(
             supplier,
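
The new validation can be exercised at topology build time; a hedged sketch of a configuration the check rejects (names illustrative):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

// size (60s) + grace (30s) = 90s exceeds the 60s retention below, so
// materialize() throws IllegalArgumentException when the topology is built.
final TimeWindows windows = TimeWindows.of(60_000L).grace(30_000L);
final Materialized<String, Long, WindowStore<Bytes, byte[]>> tooShort =
    Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("agg-store")
                .withRetention(60_000L);
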
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
new file mode 100644
index 00000000000..04c7150b9f7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.metrics;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+public class Sensors {
+    private Sensors() {}
+
+    public static Sensor lateRecordDropSensor(final InternalProcessorContext context) {
+        final StreamsMetricsImpl metrics = context.metrics();
+        final Sensor sensor = metrics.nodeLevelSensor(
+            context.taskId().toString(),
+            context.currentNode().name(),
+            "late-record-drop",
+            Sensor.RecordingLevel.INFO
+        );
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            sensor,
+            "stream-processor-node-metrics",
+            metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", context.currentNode().name()),
+            "late-record-drop"
+        );
+        return sensor;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 3bda28d6572..6e965fb27fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -142,7 +142,10 @@ public String metricsScope() {
     /**
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative).
+     *                              Note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period.
      * @param numSegments           number of db segments (cannot be zero or negative)
      * @param windowSize            size of the windows that are stored (cannot be negative). Note: the window size
      *                              is not stored with the records, so this value is used to compute the keys that
@@ -177,6 +180,9 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     *                              Note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period.
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
@@ -194,6 +200,9 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     *                              Note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period.
      * @param segmentInterval       size of segments in ms (cannot be negative)
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
@@ -227,6 +236,9 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
      * Create a persistent {@link SessionBytesStoreSupplier}.
      * @param name              name of the store (cannot be {@code null})
      * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
+     *                          Note that the retention period must be at least long enough to contain the
+     *                          windowed data's entire life cycle, from window-start through window-end,
+     *                          and for the entire grace period.
      * @return an instance of a {@link SessionBytesStoreSupplier}
      */
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
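
A sketch of a supplier that honors the documented contract (store name and durations are illustrative):

import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

final long windowSize = 60_000L;
final long grace = 30_000L;
// retentionPeriod >= windowSize + grace, as the javadoc now requires:
final WindowBytesStoreSupplier supplier = Stores.persistentWindowStore(
    "my-window-store",
    windowSize + grace, // retention covers the window's entire life cycle
    windowSize,
    false               // do not retain duplicates
);
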
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 6a9284cbf6c..fccb6c1cb59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -25,6 +26,7 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
@@ -38,20 +40,26 @@
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+
 class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     private static final Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
     private final String name;
     private final Segments segments;
+    private final String metricScope;
     private final KeySchema keySchema;
     private InternalProcessorContext context;
     private volatile boolean open;
     private Set<Segment> bulkLoadSegments;
+    private Sensor expiredRecordSensor;
 
     RocksDBSegmentedBytesStore(final String name,
+                               final String metricScope,
                                final long retention,
                                final long segmentInterval,
                                final KeySchema keySchema) {
         this.name = name;
+        this.metricScope = metricScope;
         this.keySchema = keySchema;
         this.segments = new Segments(name, retention, segmentInterval);
     }
@@ -79,26 +87,26 @@
                                    keySchema.hasNextCondition(keyFrom, keyTo, 
from, to),
                                    binaryFrom, binaryTo);
     }
-    
+
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
-        
+
         final List<Segment> searchSpace = segments.allSegments();
-        
+
         return new SegmentIterator(searchSpace.iterator(),
                                    keySchema.hasNextCondition(null, null, 0, 
Long.MAX_VALUE),
                                    null, null);
     }
-    
+
     @Override
     public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom, final 
long timeTo) {
         final List<Segment> searchSpace = segments.segments(timeFrom, timeTo);
-        
+
         return new SegmentIterator(searchSpace.iterator(),
                                    keySchema.hasNextCondition(null, null, 
timeFrom, timeTo),
                                    null, null);
     }
-    
+
     @Override
     public void remove(final Bytes key) {
         final Segment segment = 
segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
@@ -110,9 +118,11 @@ public void remove(final Bytes key) {
 
     @Override
     public void put(final Bytes key, final byte[] value) {
-        final long segmentId = 
segments.segmentId(keySchema.segmentTimestamp(key));
+        final long timestamp = keySchema.segmentTimestamp(key);
+        final long segmentId = segments.segmentId(timestamp);
         final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, 
context);
         if (segment == null) {
+            expiredRecordSensor.record();
             LOG.debug("Skipping record for expired segment.");
         } else {
             segment.put(key, value);
@@ -137,6 +147,23 @@ public String name() {
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = (InternalProcessorContext) context;
 
+        final StreamsMetricsImpl metrics = this.context.metrics();
+
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = metrics.storeLevelSensor(
+            taskName,
+            name(),
+            "expired-window-record-drop",
+            Sensor.RecordingLevel.INFO
+        );
+        addInvocationRateAndCount(
+            expiredRecordSensor,
+            "stream-" + metricScope + "-metrics",
+            metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
+            "expired-window-record-drop"
+        );
+
         
keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
 root.name()));
 
         segments.openExisting(this.context);
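
The sensor wired up in init() above surfaces as a store-level metric. A hedged sketch of reading it from a running application, assuming addInvocationRateAndCount registers the usual "-rate"/"-total" variants of the base name:

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public class ExpiredDropMetric {
        // returns the first matching total, or null if no store has dropped anything yet
        public static Object expiredDropTotal(final KafkaStreams streams) {
            for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                if (entry.getKey().name().equals("expired-window-record-drop-total")) {
                    return entry.getValue().metricValue();
                }
            }
            return null;
        }
    }
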
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 5610fb2b619..e88755b2fa1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -40,6 +40,7 @@ public String name() {
     public SessionStore<Bytes, byte[]> get() {
         final RocksDBSegmentedBytesStore segmented = new 
RocksDBSegmentedBytesStore(
             name,
+            metricsScope(),
             retentionPeriod,
             segmentIntervalMs(),
             new SessionKeySchema());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 5c7b09980a9..b9b72792d8b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -49,6 +49,7 @@ public String name() {
     public WindowStore<Bytes, byte[]> get() {
         final RocksDBSegmentedBytesStore segmentedBytesStore = new 
RocksDBSegmentedBytesStore(
                 name,
+                metricsScope(),
                 retentionPeriod,
                 segmentInterval,
                 new WindowKeySchema()
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 8e6c524c555..cf858b094c4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -72,7 +72,8 @@ Segment getSegmentForTimestamp(final long timestamp) {
     }
 
     Segment getOrCreateSegmentIfLive(final long segmentId, final 
InternalProcessorContext context) {
-        final long minLiveSegment = segmentId(context.streamTime() - 
retentionPeriod);
+        final long minLiveTimestamp = context.streamTime() - retentionPeriod;
+        final long minLiveSegment = segmentId(minLiveTimestamp);
 
         final Segment toReturn;
         if (segmentId >= minLiveSegment) {
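
The liveness check above is plain arithmetic once segment ids are derived from timestamps. A sketch, assuming segmentId(t) == t / segmentInterval (the real mapping lives in Segments#segmentId):

    public class SegmentLiveness {
        static boolean isLive(final long segmentId,
                              final long streamTime,
                              final long retentionPeriod,
                              final long segmentInterval) {
            // e.g. streamTime=12_345, retention=5_000, interval=1_000
            // => minLiveTimestamp=7_345, minLiveSegment=7; ids below 7 are expired
            final long minLiveTimestamp = streamTime - retentionPeriod;
            final long minLiveSegment = minLiveTimestamp / segmentInterval;
            return segmentId >= minLiveSegment;
        }
    }
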
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index ece157cd02e..49292b49acb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -187,7 +187,7 @@ public void shouldNotAllowToAddProcessorWithEmptyParents() {
     public void shouldNotAllowToAddProcessorWithNullParents() {
         topology.addSource("source", "topic-1");
         try {
-            topology.addProcessor("processor", new MockProcessorSupplier(), 
null);
+            topology.addProcessor("processor", new MockProcessorSupplier(), 
(String) null);
             fail("Should throw NullPointerException for processor when null 
parent names are provided");
         } catch (final NullPointerException expected) { }
     }
@@ -227,7 +227,7 @@ public void shouldNotAllowToAddSinkWithNullParents() {
         topology.addSource("source", "topic-1");
         topology.addProcessor("processor", new MockProcessorSupplier(), 
"source");
         try {
-            topology.addSink("sink", "topic-2", null);
+            topology.addSink("sink", "topic-2", (String) null);
             fail("Should throw NullPointerException for sink when null parent 
names are provided");
         } catch (final NullPointerException expected) { }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index a099ceab43f..fe7ee266657 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -197,9 +197,13 @@ public void 
shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Except
                 return 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
             }
         })
-                .groupBy(MockMapper.<String, String>selectValueMapper())
-                .windowedBy(TimeWindows.of(1000).until(2000))
-                .count(Materialized.<String, Long, 
WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows"));
+            .groupBy(MockMapper.<String, String>selectValueMapper())
+            .windowedBy(TimeWindows.of(1000).grace(0L))
+            .count(
+                Materialized
+                    .<String, Long, 
WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows")
+                    .withRetention(2_000L)
+            );
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsProp);
         streams.start();
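
The change above is the basic migration pattern for this patch: the lateness bound moves to grace() on the window, and store sizing moves to Materialized#withRetention. A condensed, self-contained version of the new form (class and topic names hypothetical):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.WindowStore;

    public class GraceMigration {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("words")
                   .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                   // before: .windowedBy(TimeWindows.of(1000).until(2000))
                   .windowedBy(TimeWindows.of(1_000L).grace(0L))
                   .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountWindows")
                                      .withRetention(2_000L));
            builder.build();
        }
    }
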
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index fecb8eaf7c5..c16950e02d7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -455,7 +455,6 @@ public void shouldGroupByKey() throws Exception {
     @Test
     public void shouldCountSessionWindows() throws Exception {
         final long sessionGap = 5 * 60 * 1000L;
-        final long maintainMillis = sessionGap * 3;
 
         final long t1 = mockTime.milliseconds() - 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
         final List<KeyValue<String, String>> t1Messages = Arrays.asList(new 
KeyValue<>("bob", "start"),
@@ -518,7 +517,7 @@ public void shouldCountSessionWindows() throws Exception {
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-                
.windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
+                .windowedBy(SessionWindows.with(sessionGap))
                 .count()
                 .toStream()
                 .transform(() -> new Transformer<Windowed<String>, Long, 
KeyValue<Object, Object>>() {
@@ -554,7 +553,6 @@ public void close() {}
     @Test
     public void shouldReduceSessionWindows() throws Exception {
         final long sessionGap = 1000L; // something to do with time
-        final long maintainMillis = sessionGap * 3;
 
         final long t1 = mockTime.milliseconds();
         final List<KeyValue<String, String>> t1Messages = Arrays.asList(new 
KeyValue<>("bob", "start"),
@@ -617,7 +615,7 @@ public void shouldReduceSessionWindows() throws Exception {
         final String userSessionsStore = "UserSessionsStore";
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-                
.windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
+                .windowedBy(SessionWindows.with(sessionGap))
                 .reduce((value1, value2) -> value1 + ":" + value2, 
Materialized.as(userSessionsStore))
                 .toStream()
                 .foreach((key, value) -> {
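
For session windows the same pattern applies: until() simply disappears and the default retention takes over. Where the default does not fit, retention can presumably be set on the store instead, consistent with the TimeWindows change above; a sketch under that assumption (names hypothetical):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.SessionWindows;
    import org.apache.kafka.streams.state.SessionStore;

    public class SessionRetentionSketch {
        public static void main(final String[] args) {
            final long sessionGap = 5 * 60 * 1000L;
            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("user-sessions")
                   .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                   .windowedBy(SessionWindows.with(sessionGap))  // no .until(...)
                   .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessions")
                                      .withRetention(3L * sessionGap)); // only if the default does not fit
            builder.build();
        }
    }
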
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index ab973e8d291..5576b93becf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -19,63 +19,27 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 
 public class JoinWindowsTest {
 
-    private static long anySize = 123L;
-    private static long anyOtherSize = 456L; // should be larger than anySize
-
-    @Test
-    public void shouldHaveSaneEqualsAndHashCode() {
-        final JoinWindows w1 = JoinWindows.of(anySize);
-        final JoinWindows w2 = JoinWindows.of(anySize);
-
-        // Reflexive
-        assertEquals(w1, w1);
-        assertEquals(w1.hashCode(), w1.hashCode());
-
-        // Symmetric
-        assertEquals(w1, w2);
-        assertEquals(w2, w1);
-        assertEquals(w1.hashCode(), w2.hashCode());
-
-        final JoinWindows w3 = JoinWindows.of(w2.afterMs).before(anyOtherSize);
-        final JoinWindows w4 = JoinWindows.of(anyOtherSize).after(w2.afterMs);
-        assertEquals(w3, w4);
-        assertEquals(w4, w3);
-        assertEquals(w3.hashCode(), w4.hashCode());
-
-        // Inequality scenarios
-        assertNotEquals("must be false for null", null, w1);
-        assertNotEquals("must be false for different window types", 
UnlimitedWindows.of(), w1);
-        assertNotEquals("must be false for different types", new Object(), w1);
-
-        final JoinWindows differentWindowSize = JoinWindows.of(w1.afterMs + 1);
-        assertNotEquals("must be false when window sizes are different", 
differentWindowSize, w1);
-
-        final JoinWindows differentWindowSize2 = 
JoinWindows.of(w1.afterMs).after(w1.afterMs + 1);
-        assertNotEquals("must be false when window sizes are different", 
differentWindowSize2, w1);
-
-        final JoinWindows differentWindowSize3 = 
JoinWindows.of(w1.afterMs).before(w1.beforeMs + 1);
-        assertNotEquals("must be false when window sizes are different", 
differentWindowSize3, w1);
-    }
+    private static final long ANY_SIZE = 123L;
+    private static final long ANY_OTHER_SIZE = 456L; // should be larger than ANY_SIZE
 
     @Test
     public void validWindows() {
-        JoinWindows.of(anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
-            .before(anySize)                    // [ -anySize ; anyOtherSize ]
-            .before(0)                          // [ 0 ; anyOtherSize ]
-            .before(-anySize)                   // [ anySize ; anyOtherSize ]
-            .before(-anyOtherSize);             // [ anyOtherSize ; 
anyOtherSize ]
-
-        JoinWindows.of(anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
-            .after(anySize)                     // [ -anyOtherSize ; anySize ]
-            .after(0)                           // [ -anyOtherSize ; 0 ]
-            .after(-anySize)                    // [ -anyOtherSize ; -anySize ]
-            .after(-anyOtherSize);              // [ -anyOtherSize ; 
-anyOtherSize ]
+        JoinWindows.of(ANY_OTHER_SIZE)   // [ -anyOtherSize ; anyOtherSize ]
+                   .before(ANY_SIZE)                    // [ -anySize ; 
anyOtherSize ]
+                   .before(0)                          // [ 0 ; anyOtherSize ]
+                   .before(-ANY_SIZE)                   // [ anySize ; 
anyOtherSize ]
+                   .before(-ANY_OTHER_SIZE);             // [ anyOtherSize ; 
anyOtherSize ]
+
+        JoinWindows.of(ANY_OTHER_SIZE)   // [ -anyOtherSize ; anyOtherSize ]
+                   .after(ANY_SIZE)                     // [ -anyOtherSize ; 
anySize ]
+                   .after(0)                           // [ -anyOtherSize ; 0 ]
+                   .after(-ANY_SIZE)                    // [ -anyOtherSize ; 
-anySize ]
+                   .after(-ANY_OTHER_SIZE);              // [ -anyOtherSize ; 
-anyOtherSize ]
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -85,9 +49,9 @@ public void timeDifferenceMustNotBeNegative() {
 
     @Test
     public void endTimeShouldNotBeBeforeStart() {
-        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final JoinWindows windowSpec = JoinWindows.of(ANY_SIZE);
         try {
-            windowSpec.after(-anySize - 1);
+            windowSpec.after(-ANY_SIZE - 1);
             fail("window end time should not be before window start time");
         } catch (final IllegalArgumentException e) {
             // expected
@@ -96,25 +60,27 @@ public void endTimeShouldNotBeBeforeStart() {
 
     @Test
     public void startTimeShouldNotBeAfterEnd() {
-        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final JoinWindows windowSpec = JoinWindows.of(ANY_SIZE);
         try {
-            windowSpec.before(-anySize - 1);
+            windowSpec.before(-ANY_SIZE - 1);
             fail("window start time should not be after window end time");
         } catch (final IllegalArgumentException e) {
             // expected
         }
     }
 
+    @Deprecated
     @Test
     public void untilShouldSetMaintainDuration() {
-        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final JoinWindows windowSpec = JoinWindows.of(ANY_SIZE);
         final long windowSize = windowSpec.size();
         assertEquals(windowSize, windowSpec.until(windowSize).maintainMs());
     }
 
+    @Deprecated
     @Test
     public void retentionTimeMustNoBeSmallerThanWindowSize() {
-        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final JoinWindows windowSpec = JoinWindows.of(ANY_SIZE);
         final long windowSize = windowSpec.size();
         try {
             windowSpec.until(windowSize - 1);
@@ -124,4 +90,16 @@ public void retentionTimeMustNoBeSmallerThanWindowSize() {
         }
     }
 
+    @Test
+    public void gracePeriodShouldEnforceBoundaries() {
+        JoinWindows.of(3L).grace(0L);
+
+        try {
+            JoinWindows.of(3L).grace(-1L);
+            fail("should not accept negatives");
+        } catch (final IllegalArgumentException e) {
+            //expected
+        }
+    }
+
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index d0e5996a481..c464c75d5b4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -18,9 +18,6 @@
 
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -32,12 +29,26 @@ public void shouldSetWindowGap() {
         assertEquals(anyGap, SessionWindows.with(anyGap).inactivityGap());
     }
 
+    @Deprecated
     @Test
     public void shouldSetWindowRetentionTime() {
         final long anyRetentionTime = 42L;
         assertEquals(anyRetentionTime, 
SessionWindows.with(1).until(anyRetentionTime).maintainMs());
     }
 
+
+    @Test
+    public void gracePeriodShouldEnforceBoundaries() {
+        SessionWindows.with(3L).grace(0L);
+
+        try {
+            SessionWindows.with(3L).grace(-1L);
+            fail("should not accept negatives");
+        } catch (final IllegalArgumentException e) {
+            //expected
+        }
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void windowSizeMustNotBeNegative() {
         SessionWindows.with(-1);
@@ -48,12 +59,14 @@ public void windowSizeMustNotBeZero() {
         SessionWindows.with(0);
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated apis
     @Test
     public void 
retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() {
         final long windowGap = 2 * SessionWindows.with(1).maintainMs();
         assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs());
     }
 
+    @Deprecated
     @Test
     public void retentionTimeMustNotBeNegative() {
         final SessionWindows windowSpec = SessionWindows.with(42);
@@ -64,19 +77,4 @@ public void retentionTimeMustNotBeNegative() {
             // expected
         }
     }
-
-    @Test
-    public void shouldBeEqualWhenGapAndMaintainMsAreTheSame() {
-        assertThat(SessionWindows.with(5), equalTo(SessionWindows.with(5)));
-    }
-
-    @Test
-    public void shouldNotBeEqualWhenMaintainMsDifferent() {
-        assertThat(SessionWindows.with(5), 
not(equalTo(SessionWindows.with(5).until(10))));
-    }
-
-    @Test
-    public void shouldNotBeEqualWhenGapIsDifferent() {
-        assertThat(SessionWindows.with(5), 
not(equalTo(SessionWindows.with(10))));
-    }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index a90c86fbe7e..b8d3bfd0cc2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class TimeWindowsTest {
@@ -40,49 +39,19 @@ public void shouldSetWindowAdvance() {
         assertEquals(anyAdvance, 
TimeWindows.of(ANY_SIZE).advanceBy(anyAdvance).advanceMs);
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
     public void shouldSetWindowRetentionTime() {
         assertEquals(ANY_SIZE, 
TimeWindows.of(ANY_SIZE).until(ANY_SIZE).maintainMs());
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
     public void 
shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime()
 {
         final long windowSize = 2 * TimeWindows.of(1).maintainMs();
         assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs());
     }
 
-    @Test
-    public void shouldHaveSaneEqualsAndHashCode() {
-        final TimeWindows w1 = TimeWindows.of(ANY_SIZE);
-        final TimeWindows w2 = TimeWindows.of(w1.sizeMs);
-
-        // Reflexive
-        assertEquals(w1, w1);
-        assertEquals(w1.hashCode(), w1.hashCode());
-
-        // Symmetric
-        assertEquals(w1, w2);
-        assertEquals(w2, w1);
-        assertEquals(w1.hashCode(), w2.hashCode());
-
-        // Transitive
-        final TimeWindows w3 = TimeWindows.of(w2.sizeMs);
-        assertEquals(w2, w3);
-        assertEquals(w1, w3);
-        assertEquals(w1.hashCode(), w3.hashCode());
-
-        // Inequality scenarios
-        assertNotEquals("must be false for null", null, w1);
-        assertNotEquals("must be false for different window types", 
UnlimitedWindows.of(), w1);
-        assertNotEquals("must be false for different types", new Object(), w1);
-
-        final TimeWindows differentWindowSize = TimeWindows.of(w1.sizeMs + 1);
-        assertNotEquals("must be false when window sizes are different", 
differentWindowSize, w1);
-
-        final TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advanceMs 
- 1);
-        assertNotEquals("must be false when advance intervals are different", 
differentAdvanceInterval, w1);
-    }
-
     @Test(expected = IllegalArgumentException.class)
     public void windowSizeMustNotBeZero() {
         TimeWindows.of(0);
@@ -115,6 +84,7 @@ public void advanceIntervalMustNotBeNegative() {
         }
     }
 
+    @Deprecated
     @Test
     public void advanceIntervalMustNotBeLargerThanWindowSize() {
         final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE);
@@ -126,6 +96,7 @@ public void advanceIntervalMustNotBeLargerThanWindowSize() {
         }
     }
 
+    @Deprecated
     @Test
     public void retentionTimeMustNoBeSmallerThanWindowSize() {
         final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE);
@@ -137,6 +108,18 @@ public void retentionTimeMustNoBeSmallerThanWindowSize() {
         }
     }
 
+    @Test
+    public void gracePeriodShouldEnforceBoundaries() {
+        TimeWindows.of(3L).grace(0L);
+
+        try {
+            TimeWindows.of(3L).grace(-1L);
+            fail("should not accept negatives");
+        } catch (final IllegalArgumentException e) {
+            //expected
+        }
+    }
+
     @Test
     public void shouldComputeWindowsForHoppingWindows() {
         final TimeWindows windows = TimeWindows.of(12L).advanceBy(5L);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
index c5e1ce58924..9798a8128cb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
@@ -50,6 +50,16 @@ public void shouldThrowOnUntil() {
         }
     }
 
+    @Test
+    public void gracePeriodShouldNotBeSettable() {
+        try {
+            UnlimitedWindows.of().grace(0L);
+            fail("should not be able to set grace period");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
+    }
+
     @Test
     public void shouldIncludeRecordsThatHappenedOnWindowStart() {
         final UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime);
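
Taken together, the boundary tests above pin down the grace() contract: zero is the minimum for the bounded window types, negatives are rejected, and unlimited windows, which never close, reject grace() outright. A compact sketch of all three cases:

    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.UnlimitedWindows;

    public class GraceContract {
        public static void main(final String[] args) {
            TimeWindows.of(3L).grace(0L);          // ok: the window closes exactly at window end
            JoinWindows.of(3L).grace(0L);          // ok for join windows too
            try {
                TimeWindows.of(3L).grace(-1L);     // negative grace is rejected
            } catch (final IllegalArgumentException expected) { }
            try {
                UnlimitedWindows.of().grace(0L);   // unlimited windows never close
            } catch (final IllegalArgumentException expected) { }
        }
    }
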
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
index fc097ca4c03..12ff16612c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class WindowsTest {
 
@@ -37,6 +38,7 @@ public long size() {
         }
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
     public void shouldSetNumberOfSegments() {
         final int anySegmentSizeLargerThanOne = 5;
@@ -49,17 +51,33 @@ public void shouldSetNumberOfSegments() {
         );
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
     public void shouldSetWindowRetentionTime() {
         final int anyNotNegativeRetentionTime = 42;
         assertEquals(anyNotNegativeRetentionTime, new 
TestWindows().until(anyNotNegativeRetentionTime).maintainMs());
     }
 
+
+    @Test
+    public void gracePeriodShouldEnforceBoundaries() {
+        new TestWindows().grace(0L);
+
+        try {
+            new TestWindows().grace(-1L);
+            fail("should not accept negatives");
+        } catch (final IllegalArgumentException e) {
+            //expected
+        }
+    }
+
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test(expected = IllegalArgumentException.class)
     public void numberOfSegmentsMustBeAtLeastTwo() {
         new TestWindows().segments(1);
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test(expected = IllegalArgumentException.class)
     public void retentionTimeMustNotBeNegative() {
         new TestWindows().until(-1);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index c41b19b700f..354fa0a147d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -19,11 +19,11 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
@@ -60,6 +60,7 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -243,34 +244,69 @@ public void shouldSendDataToDynamicTopics() {
         assertThat(mockProcessors.get(1).processed, 
equalTo(Collections.singletonList("b:v1")));
     }
 
+    @SuppressWarnings("deprecation") // specifically testing the deprecated 
variant
     @Test
-    public void 
shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated()
 {
+    public void 
shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreatedWithRetention()
 {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> kStream = builder.stream("topic-1", 
stringConsumed);
         final ValueJoiner<String, String, String> valueJoiner = 
MockValueJoiner.instance(":");
         final long windowSize = TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.DAYS);
         final KStream<String, String> stream = kStream
-                        .map(new KeyValueMapper<String, String, KeyValue<? 
extends String, ? extends String>>() {
-                            @Override
-                            public KeyValue<? extends String, ? extends 
String> apply(final String key, final String value) {
-                                return KeyValue.pair(value, value);
-                            }
-                        });
+            .map(new KeyValueMapper<String, String, KeyValue<? extends String, 
? extends String>>() {
+                @Override
+                public KeyValue<? extends String, ? extends String> 
apply(final String key, final String value) {
+                    return KeyValue.pair(value, value);
+                }
+            });
         stream.join(kStream,
                     valueJoiner,
                     JoinWindows.of(windowSize).until(3 * windowSize),
                     Joined.with(Serdes.String(),
                                 Serdes.String(),
                                 Serdes.String()))
-                .to("output-topic", Produced.with(Serdes.String(), 
Serdes.String()));
+              .to("output-topic", Produced.with(Serdes.String(), 
Serdes.String()));
+
+        final ProcessorTopology topology = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
+
+        final SourceNode originalSourceNode = topology.source("topic-1");
+
+        for (final SourceNode sourceNode : topology.sources()) {
+            if (sourceNode.name().equals(originalSourceNode.name())) {
+                assertNull(sourceNode.getTimestampExtractor());
+            } else {
+                assertThat(sourceNode.getTimestampExtractor(), 
instanceOf(FailOnInvalidTimestamp.class));
+            }
+        }
+    }
+
+    @Test
+    public void 
shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated()
 {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> kStream = builder.stream("topic-1", 
stringConsumed);
+        final ValueJoiner<String, String, String> valueJoiner = 
MockValueJoiner.instance(":");
+        final long windowSize = TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.DAYS);
+        final KStream<String, String> stream = kStream
+            .map(new KeyValueMapper<String, String, KeyValue<? extends String, 
? extends String>>() {
+                @Override
+                public KeyValue<? extends String, ? extends String> 
apply(final String key, final String value) {
+                    return KeyValue.pair(value, value);
+                }
+            });
+        stream.join(
+            kStream,
+            valueJoiner,
+            JoinWindows.of(windowSize).grace(3L * windowSize),
+            Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
+        )
+              .to("output-topic", Produced.with(Serdes.String(), 
Serdes.String()));
 
         final ProcessorTopology topology = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
 
         final SourceNode originalSourceNode = topology.source("topic-1");
 
-        for (final SourceNode sourceNode: topology.sources()) {
+        for (final SourceNode sourceNode : topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
-                assertEquals(sourceNode.getTimestampExtractor(), null);
+                assertNull(sourceNode.getTimestampExtractor());
             } else {
                 assertThat(sourceNode.getTimestampExtractor(), 
instanceOf(FailOnInvalidTimestamp.class));
             }
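
Stream-stream joins pick up the same API: the bound that until() used to imply now comes from grace() on the JoinWindows, as the new test variant above exercises. A condensed sketch of the new join form (topic names hypothetical):

    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.Joined;
    import org.apache.kafka.streams.kstream.KStream;

    public class JoinGraceSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> left = builder.<String, String>stream("left");
            final KStream<String, String> right = builder.<String, String>stream("right");
            final long windowSize = TimeUnit.DAYS.toMillis(1);
            left.join(right,
                      (l, r) -> l + ":" + r,
                      // before: JoinWindows.of(windowSize).until(3 * windowSize)
                      JoinWindows.of(windowSize).grace(3L * windowSize),
                      Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
                .to("joined");
            builder.build();
        }
    }
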
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 6b5e5773b20..74cd7bdb4eb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
@@ -36,6 +39,7 @@
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -46,6 +50,8 @@
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -78,7 +84,7 @@ public Long apply(final String aggKey, final Long aggOne, 
final Long aggTwo) {
     };
     private final KStreamSessionWindowAggregate<String, String, Long> 
sessionAggregator =
         new KStreamSessionWindowAggregate<>(
-            SessionWindows.with(GAP_MS).until(3 * GAP_MS),
+            SessionWindows.with(GAP_MS),
             STORE_NAME,
             initializer,
             aggregator,
@@ -88,13 +94,23 @@ public Long apply(final String aggKey, final Long aggOne, 
final Long aggTwo) {
     private final Processor<String, String> processor = 
sessionAggregator.get();
     private SessionStore<String, Long> sessionStore;
     private InternalMockProcessorContext context;
+    private Metrics metrics;
 
 
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(stateDir,
-            Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new 
ThreadCache(new LogContext("testCache "), 100000, new MockStreamsMetrics(new 
Metrics()))) {
+        metrics = new Metrics();
+        final MockStreamsMetrics metrics = new 
MockStreamsMetrics(KStreamSessionWindowAggregateProcessorTest.this.metrics);
+        context = new InternalMockProcessorContext(
+            stateDir,
+            Serdes.String(),
+            Serdes.String(),
+            metrics,
+            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 100000, metrics)
+        ) {
             @Override
             public <K, V> void forward(final K key, final V value) {
                 results.add(KeyValue.pair(key, value));
@@ -107,9 +123,9 @@ public void initializeStore() {
 
     private void initStore(final boolean enableCaching) {
         final StoreBuilder<SessionStore<String, Long>> storeBuilder = 
Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, GAP_MS * 
3),
-                Serdes.String(),
-                Serdes.Long())
-                .withLoggingDisabled();
+                                                                               
                  Serdes.String(),
+                                                                               
                  Serdes.Long())
+            .withLoggingDisabled();
 
         if (enableCaching) {
             storeBuilder.withCachingEnabled();
@@ -316,4 +332,37 @@ public void shouldLogAndMeterWhenSkippingNullKey() {
         assertEquals(1.0, getMetricByName(context.metrics().metrics(), 
"skipped-records-total", "stream-metrics").metricValue());
         assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
     }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingLateRecord() {
+        
LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
+        final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
+        final Processor<String, String> processor = new 
KStreamSessionWindowAggregate<>(
+            SessionWindows.with(10L).grace(10L),
+            STORE_NAME,
+            initializer,
+            aggregator,
+            sessionMerger
+        ).get();
+
+        initStore(false);
+        processor.init(context);
+        context.setStreamTime(20);
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, 
"topic", null));
+        processor.process("A", "1");
+        LogCaptureAppender.unregister(appender);
+
+        final Metric dropMetric = metrics.metrics().get(new MetricName(
+            "late-record-drop-total",
+            "stream-processor-node-metrics",
+            "The total number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        ));
+        assertEquals(1.0, dropMetric.metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record for 
expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] 
window=[0,0) expiration=[10]"));
+    }
 }
\ No newline at end of file
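
The expiration value asserted above follows from the observed stream time and the configured grace, not from the record itself. A worked reading of the numbers (an interpretation inferred from the logged values, not from the processor source):

    public class ExpirationMath {
        public static void main(final String[] args) {
            final long streamTime = 20L;                // context.setStreamTime(20)
            final long grace = 10L;                     // SessionWindows.with(10L).grace(10L)
            final long expiration = streamTime - grace; // 10, matching expiration=[10] in the log
            final long windowEnd = 0L;                  // the record lands in session window [0,0)
            System.out.println(windowEnd < expiration); // true: the record is dropped as late
        }
    }
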
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index dd2cf050bac..af7cff6d17a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,6 +47,8 @@
 import java.util.List;
 import java.util.Properties;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -246,6 +249,7 @@ public void shouldLogAndMeterWhenSkippingNullKey() {
         }
     }
 
+    @Deprecated // testing deprecated functionality (behavior of until)
     @Test
     public void shouldLogAndMeterWhenSkippingExpiredWindow() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -263,6 +267,7 @@ public void shouldLogAndMeterWhenSkippingExpiredWindow() {
             .map((key, value) -> new KeyValue<>(key.toString(), value))
             .to("output");
 
+        LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
         final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
             driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
@@ -275,15 +280,84 @@ public void shouldLogAndMeterWhenSkippingExpiredWindow() {
             driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
             LogCaptureAppender.unregister(appender);
 
-            assertThat(getMetricByName(driver.metrics(), 
"skipped-records-total", "stream-metrics").metricValue(), equalTo(7.0));
+            final MetricName metricName = new MetricName(
+                "late-record-drop-total",
+                "stream-processor-node-metrics",
+                "The total number of occurrence of late-record-drop 
operations.",
+                mkMap(
+                    mkEntry("client-id", 
"topology-test-driver-virtual-thread"),
+                    mkEntry("task-id", "0_0"),
+                    mkEntry("processor-node-id", 
"KSTREAM-AGGREGATE-0000000001")
+                )
+            );
+            assertThat(driver.metrics().get(metricName).metricValue(), 
equalTo(7.0));
+            assertThat(appender.getMessages(), hasItems(
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
+            ));
+
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), 
"[k@95/105]", "+100", 100);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), 
"[k@100/110]", "+100", 100);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), 
"[k@5/15]", "+5", 5);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), 
"[k@5/15]", "+5+6", 6);
+            assertThat(driver.readOutput("output"), nullValue());
+        }
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KStream<String, String> stream1 = builder.stream(topic, 
Consumed.with(Serdes.String(), Serdes.String()));
+        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .windowedBy(TimeWindows.of(10).advanceBy(5).grace(90L))
+            .aggregate(
+                () -> "",
+                MockAggregator.toStringInstance("+"),
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+            )
+            .toStream()
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to("output");
+
+        LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
+        final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
+            driver.pipeInput(recordFactory.create(topic, "k", "0", 0L));
+            driver.pipeInput(recordFactory.create(topic, "k", "1", 1L));
+            driver.pipeInput(recordFactory.create(topic, "k", "2", 2L));
+            driver.pipeInput(recordFactory.create(topic, "k", "3", 3L));
+            driver.pipeInput(recordFactory.create(topic, "k", "4", 4L));
+            driver.pipeInput(recordFactory.create(topic, "k", "5", 5L));
+            driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
+            LogCaptureAppender.unregister(appender);
+
+            final MetricName metricName = new MetricName(
+                "late-record-drop-total",
+                "stream-processor-node-metrics",
+                "The total number of occurrence of late-record-drop 
operations.",
+                mkMap(
+                    mkEntry("client-id", 
"topology-test-driver-virtual-thread"),
+                    mkEntry("task-id", "0_0"),
+                    mkEntry("processor-node-id", 
"KSTREAM-AGGREGATE-0000000001")
+                )
+            );
+            assertThat(driver.metrics().get(metricName).metricValue(), 
equalTo(7.0));
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[1] timestamp=[0] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[2] timestamp=[1] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[3] timestamp=[2] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[4] timestamp=[3] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[5] timestamp=[4] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[6] timestamp=[5] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[7] timestamp=[6] window=[0] expiration=[0]"
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), 
"[k@95/105]", "+100", 100);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index 4cf9324caa0..bc8ca95187a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -34,6 +36,8 @@
 
 import java.util.Properties;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -68,6 +72,7 @@ public void shouldLogAndMeterOnNullKey() {
         }
     }
 
+    @Deprecated // testing deprecated functionality (behavior of until)
     @Test
     public void shouldLogAndMeterOnExpiredEvent() {
 
@@ -83,6 +88,7 @@ public void shouldLogAndMeterOnExpiredEvent() {
 
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            
LogCaptureAppender.setClassLoggerToDebug(KStreamWindowReduce.class);
             final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
             driver.pipeInput(recordFactory.create("TOPIC", "k", "100", 100L));
             driver.pipeInput(recordFactory.create("TOPIC", "k", "0", 0L));
@@ -93,13 +99,24 @@ public void shouldLogAndMeterOnExpiredEvent() {
             driver.pipeInput(recordFactory.create("TOPIC", "k", "5", 5L));
             LogCaptureAppender.unregister(appender);
 
-            assertThat(getMetricByName(driver.metrics(), 
"skipped-records-total", "stream-metrics").metricValue(), equalTo(5.0));
+            final Metric dropMetric = driver.metrics().get(new MetricName(
+                "late-record-drop-total",
+                "stream-processor-node-metrics",
+                "The total number of occurrence of late-record-drop 
operations.",
+                mkMap(
+                    mkEntry("client-id", 
"topology-test-driver-virtual-thread"),
+                    mkEntry("task-id", "0_0"),
+                    mkEntry("processor-node-id", "KSTREAM-REDUCE-0000000002")
+                )
+            ));
+
+            assertThat(dropMetric.metricValue(), equalTo(5.0));
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[1] timestamp=[0] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[2] timestamp=[1] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[3] timestamp=[2] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[4] timestamp=[3] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[5] timestamp=[4] window=[0] expiration=[0]"
+                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] 
partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), 
"[k@100/105]", "100", 100);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b27e16f15fb..c822fc345d8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -229,10 +229,12 @@ public void shouldRestoreToWindowedStores() throws 
IOException {
         final InternalTopologyBuilder internalTopologyBuilder = new 
InternalTopologyBuilder().setApplicationId(applicationId);
 
         final InternalStreamsBuilder builder = new 
InternalStreamsBuilder(internalTopologyBuilder);
-        builder.stream(Collections.singleton("topic"), new 
ConsumedInternal<>())
+
+        builder
+            .stream(Collections.singleton("topic"), new ConsumedInternal<>())
             .groupByKey()
-            .windowedBy(TimeWindows.of(60_000).until(120_000))
-            .count(Materialized.as(storeName));
+            .windowedBy(TimeWindows.of(60_000).grace(0L))
+            .count(Materialized.<Object, Long, WindowStore<Bytes, 
byte[]>>as(storeName).withRetention(120_000L));
 
         builder.buildAndOptimizeTopology();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
index b6f5769149d..ffb8799ad12 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
@@ -18,6 +18,7 @@
 
 
 import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 
@@ -33,6 +34,10 @@ public static LogCaptureAppender createAndRegister() {
         return logCaptureAppender;
     }
 
+    public static void setClassLoggerToDebug(final Class<?> clazz) {
+        Logger.getLogger(clazz).setLevel(Level.DEBUG);
+    }
+
     public static void unregister(final LogCaptureAppender logCaptureAppender) 
{
         Logger.getRootLogger().removeAppender(logCaptureAppender);
     }
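
The drop messages above are logged at DEBUG, so tests now raise the class logger before capturing; that is what this new helper is for. A small sketch of the capture pattern the call sites above follow, assuming getMessages() returns the captured lines as a List<String>:

    import java.util.List;

    import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;

    public class CaptureSketch {
        public static List<String> captureDebugLogs(final Class<?> clazz, final Runnable body) {
            LogCaptureAppender.setClassLoggerToDebug(clazz);   // ensure DEBUG lines reach the appender
            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
            try {
                body.run();                                    // exercise code that logs
            } finally {
                LogCaptureAppender.unregister(appender);
            }
            return appender.getMessages();
        }
    }
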
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 47e79c96f8e..f06e129c41a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -68,7 +68,7 @@
     public void setUp() {
         final SessionKeySchema schema = new SessionKeySchema();
         schema.init("topic");
-        underlying = new RocksDBSegmentedBytesStore("test", 0L, 
SEGMENT_INTERVAL, schema);
+        underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 
0L, SEGMENT_INTERVAL, schema);
         final RocksDBSessionStore<Bytes, byte[]> sessionStore = new 
RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
         cachingStore = new CachingSessionStore<>(sessionStore, 
Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), 
MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 0e7f88ad63c..1c8dd7b8a5a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -79,7 +79,7 @@
     @Before
     public void setUp() {
         keySchema = new WindowKeySchema();
-        underlying = new RocksDBSegmentedBytesStore("test", 0, 
SEGMENT_INTERVAL, keySchema);
+        underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 
0, SEGMENT_INTERVAL, keySchema);
         final RocksDBWindowStore<Bytes, byte[]> windowStore = new 
RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, 
WINDOW_SIZE);
         cacheListener = new 
CachingKeyValueStoreTest.CacheFlushListenerStub<>();
         cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), 
Serdes.String(), WINDOW_SIZE, SEGMENT_INTERVAL);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index cffd73f05a2..93e3452ad0c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -27,6 +29,7 @@
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -55,11 +58,15 @@
 import java.util.Set;
 import java.util.SimpleTimeZone;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -88,22 +95,22 @@ public void before() {
         schema.init("topic");
 
         if (schema instanceof SessionKeySchema) {
-            windows[0] = new SessionWindow(10, 10);
-            windows[1] = new SessionWindow(500, 1000);
-            windows[2] = new SessionWindow(1000, 1500);
+            windows[0] = new SessionWindow(10L, 10L);
+            windows[1] = new SessionWindow(500L, 1000L);
+            windows[2] = new SessionWindow(1_000L, 1_500L);
             windows[3] = new SessionWindow(30_000L, 60_000L);
         }
         if (schema instanceof WindowKeySchema) {
-
-            windows[0] = timeWindowForSize(10, windowSizeForTimeWindow);
-            windows[1] = timeWindowForSize(500, windowSizeForTimeWindow);
-            windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow);
+            windows[0] = timeWindowForSize(10L, windowSizeForTimeWindow);
+            windows[1] = timeWindowForSize(500L, windowSizeForTimeWindow);
+            windows[2] = timeWindowForSize(1_000L, windowSizeForTimeWindow);
             windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow);
         }
 
 
         bytesStore = new RocksDBSegmentedBytesStore(
             storeName,
+            "metrics-scope",
             retention,
             segmentInterval,
             schema
@@ -276,6 +283,7 @@ public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
 
         bytesStore = new RocksDBSegmentedBytesStore(
             storeName,
+            "metrics-scope",
             retention,
             segmentInterval,
             schema
@@ -312,6 +320,7 @@ public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
 
         bytesStore = new RocksDBSegmentedBytesStore(
             storeName,
+            "metrics-scope",
             retention,
             segmentInterval,
             schema
@@ -402,6 +411,46 @@ public void shouldRespectBulkLoadOptionsDuringInit() {
         }
     }
 
+    @Test
+    public void shouldLogAndMeasureExpiredRecords() {
+        LogCaptureAppender.setClassLoggerToDebug(RocksDBSegmentedBytesStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        context.setStreamTime(Math.max(retention, segmentInterval) * 2);
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(5));
+
+        LogCaptureAppender.unregister(appender);
+
+        final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+
+        final Metric dropTotal = metrics.get(new MetricName(
+            "expired-window-record-drop-total",
+            "stream-metrics-scope-metrics",
+            "The total number of occurrence of expired-window-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "mock"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("metrics-scope-id", "bytes-store")
+            )
+        ));
+
+        final Metric dropRate = metrics.get(new MetricName(
+            "expired-window-record-drop-rate",
+            "stream-metrics-scope-metrics",
+            "The average number of occurrence of expired-window-record-drop operation per second.",
+            mkMap(
+                mkEntry("client-id", "mock"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("metrics-scope-id", "bytes-store")
+            )
+        ));
+
+        assertEquals(1.0, dropTotal.metricValue());
+        assertNotEquals(0.0, dropRate.metricValue());
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Skipping record for expired segment."));
+    }
+
     private Set<String> segmentDirs() {
         final File windowDir = new File(stateDir, storeName);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index cf4b90d3a58..a80e28bf14f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -53,7 +53,7 @@ public void before() {
         schema.init("topic");
 
         final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", 10_000L, 60_000L, schema);
+                new RocksDBSegmentedBytesStore("session-store", "metrics-scope", 10_000L, 60_000L, schema);
 
         sessionStore = new RocksDBSessionStore<>(bytesStore,
                                                  Serdes.String(),
@@ -154,7 +154,7 @@ public void shouldFindSessionsToMerge() {
     @Test
     public void shouldFetchExactKeys() {
         final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", 0x7a00000000000000L, 0x7a00000000000000L, new SessionKeySchema());
+                new RocksDBSegmentedBytesStore("session-store", "metrics-scope", 0x7a00000000000000L, 0x7a00000000000000L, new SessionKeySchema());
 
         sessionStore = new RocksDBSessionStore<>(bytesStore,
                                                  Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index b0057e5495f..b49a5c00d17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -828,7 +828,7 @@ public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
     @Test
     public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
         windowStore = new RocksDBWindowStore<>(
-            new RocksDBSegmentedBytesStore(windowName, retentionPeriod, segmentInterval, new WindowKeySchema()),
+            new RocksDBSegmentedBytesStore(windowName, "metrics-scope", retentionPeriod, segmentInterval, new WindowKeySchema()),
             Serdes.Integer(),
             new SerdeThatDoesntHandleNull(),
             false,
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index e3a9ce7c644..ec8d3280a4d 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -136,6 +136,7 @@ public InternalMockProcessorContext(final File stateDir,
             metrics,
             null,
             cache);
+        super.setCurrentNode(new ProcessorNode("TESTING_NODE"));
         this.stateDir = stateDir;
         this.keySerde = keySerde;
         this.valSerde = valSerde;
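
For readers skimming the diff, here is a minimal sketch of how the LogCaptureAppender helper added above might be used in a test. Only setClassLoggerToDebug, createAndRegister, unregister, and getMessages come from the diff itself; the test class and the SomeStore class under test are hypothetical placeholders:

    import java.util.List;

    import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
    import org.junit.Test;

    import static org.hamcrest.CoreMatchers.hasItem;
    import static org.hamcrest.MatcherAssert.assertThat;

    public class SomeStoreTest {  // hypothetical test class

        @Test
        public void shouldCaptureDebugLogs() {
            // Raise the logger of the (hypothetical) class under test to DEBUG
            // so the capturing appender sees debug-level messages.
            LogCaptureAppender.setClassLoggerToDebug(SomeStore.class);
            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
            try {
                // ... exercise code that is expected to log ...
            } finally {
                // Detach the appender so it does not leak into other tests.
                LogCaptureAppender.unregister(appender);
            }
            final List<String> messages = appender.getMessages();
            assertThat(messages, hasItem("Skipping record for expired segment."));
        }
    }

This mirrors the shouldLogAndMeasureExpiredRecords test in the diff: register, exercise, unregister, then assert on the captured messages.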


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> KIP-328: Add Window Grace Period (and deprecate Window Retention)
> -----------------------------------------------------------------
>
>                 Key: KAFKA-7222
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7222
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]
>  
> This ticket only covers the grace period portion of the work.
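
For context, a minimal sketch of how the grace period this ticket adds is meant to be used, assuming the long-millisecond grace(millisAfterWindowEnd) setter that this PR adds to the window classes; the topic name and the window/grace sizes are illustrative only:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class GraceExample {  // hypothetical example class
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                   .groupByKey()
                   // 1-minute windows that keep accepting out-of-order records
                   // for 10 seconds past window end, instead of tying result
                   // finality to the deprecated until(...) retention setting.
                   .windowedBy(TimeWindows.of(60_000L).grace(10_000L))
                   .count();
        }
    }

The design point, per the KIP linked above, is to separate "how long the window accepts late data" (grace) from "how long the store retains data" (retention), which is why until() and maintainMs() are deprecated in this diff.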



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
