cadonna commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r662830942



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method implicitly sets the grace period to zero which means
+     * that out of order records arriving after the window end will be dropped
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which means
+     * that only out-of-order records arriving more than the grace period after the window end will be dropped.
+     *
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param size The size of the window. Must be larger than zero
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
+     * @return a TimeWindows object with the specified size and the specified grace period
+     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
+            throws IllegalArgumentException {
+
+        final long sizeMs = size.toMillis();
+        final long afterWindowEndMs = afterWindowEnd.toMillis();
+
+        return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     *
+     * @param size The size of the window
+     * @return a new window definition without specifying the grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0. Use {@link #ofSizeWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public static TimeWindows of(final Duration size) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
         final long sizeMs = validateMillisecondDuration(size, msgPrefix);
-        if (sizeMs <= 0) {
-            throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
-        }
-        return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS);
+
+        return new TimeWindows(sizeMs, sizeMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD);

Review comment:
       This is not completely compatible with the behavior of older Streams apps. See #10953 for a fix and more details.
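
       To illustrate why the grace-period default matters for existing apps, here is a minimal, self-contained sketch of how the same out-of-order record could be treated under no grace, a 10-second grace, and the old 24-hour default. It is a simplified model and not the Streams implementation: the class `TumblingWindowSketch`, its method names, and the rule that a window closes once stream time reaches window end plus grace are assumptions made for illustration only.

```java
import java.time.Duration;

// Simplified model of tumbling-window assignment and grace-period handling.
// Illustrative only: these names are hypothetical and not part of Kafka Streams.
final class TumblingWindowSketch {

    // Start of the tumbling window containing the timestamp, i.e. N * size.
    static long windowStart(final long timestampMs, final long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Assumption: a window accepts records until stream time reaches
    // windowEnd + grace; after that, records for the window are dropped.
    static boolean isDropped(final long recordTimestampMs,
                             final long streamTimeMs,
                             final Duration size,
                             final Duration grace) {
        final long sizeMs = size.toMillis();
        final long windowEndMs = windowStart(recordTimestampMs, sizeMs) + sizeMs;
        return windowEndMs + grace.toMillis() <= streamTimeMs;
    }

    public static void main(final String[] args) {
        final Duration size = Duration.ofSeconds(10);
        // A record with timestamp 25s falls into window [20s, 30s).
        // Stream time has already advanced to 35s when it arrives.
        System.out.println(isDropped(25_000L, 35_000L, size, Duration.ZERO));
        System.out.println(isDropped(25_000L, 35_000L, size, Duration.ofSeconds(10)));
        System.out.println(isDropped(25_000L, 35_000L, size, Duration.ofHours(24)));
    }
}
```

       Under these assumptions the record is dropped only in the zero-grace case, which is the kind of behavioral difference an app can see if the effective grace period changes underneath it.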



