mjsax commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r661136400



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out 
of order records arriving

Review comment:
       `out-of-order` (with `-`)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long 
advanceMs, final long graceMs)
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, 
gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
+     * Using the method implicitly sets the grace period to zero which means
+     * that out of order records arriving after the window end will be dropped
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period. Note that 
this means out of order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws 
IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the 
advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, 
N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, 
gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
+     * Using the method explicitly sets the grace period to the duration 
specified by {@code afterWindowEnd} which means
+     * that out of order records arriving after the window end will be dropped.
+     *
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param size The size of the window. Must be larger than zero
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window. Must be non-negative.
+     * @return a TimeWindows object with the specified size and the specified 
grace period
+     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative 
or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeAndGrace(final Duration size, final 
Duration afterWindowEnd)
+            throws IllegalArgumentException {

Review comment:
       nit: move this to the previous line?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out 
of order records arriving
+     * after the window end will be dropped. The delay is defined as 
(stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition 
and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end 
will be dropped.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @return a new JoinWindows object with the window definition and no 
grace period. Note that this means out of order records arriving after the 
window end will be dropped
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference

Review comment:
       Why is `join window interval` removed?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code 
afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition 
and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end 
will be dropped.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @return a new JoinWindows object with the window definition and no 
grace period. Note that this means out of order records arriving after the 
window end will be dropped
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return a new JoinWindows object with the window definition with and 
grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead

Review comment:
       nit: I think a `.` is missing: `since 3.0[.] Use`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out 
of order records arriving
+     * after the window end will be dropped. The delay is defined as 
(stream_time - record_timestamp).

Review comment:
       `window end` -> `window closed`
   
   The window ends when `afterMs` has passed, but we keep the window open until 
`afterMs + grace`, which we call the "close time" of the window.
   
   > The delay is defined as (stream_time - record_timestamp).
   I think we can omit this?
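
The distinction between window end and window close can be sketched as follows. This is a minimal, self-contained illustration and not the Kafka Streams implementation: the class `WindowCloseSketch` and its helpers `closeTimeMs` and `isOpen` are hypothetical names, and the exact boundary comparison (strictly less than the close time) is an assumption made for the example.

```java
import java.time.Duration;

public class WindowCloseSketch {

    // Hypothetical helper: the close time is the window end plus the grace period.
    public static long closeTimeMs(final long windowEndMs, final Duration grace) {
        return windowEndMs + grace.toMillis();
    }

    // Hypothetical helper: a window still admits out-of-order records while
    // stream time has not yet reached its close time (assumed boundary).
    public static boolean isOpen(final long windowEndMs, final Duration grace, final long streamTimeMs) {
        return streamTimeMs < closeTimeMs(windowEndMs, grace);
    }

    public static void main(final String[] args) {
        final long windowEndMs = 10_000L;
        final long streamTimeMs = 12_000L; // stream time is already past the window end

        // With a 5 second grace period the window has ended but is not yet closed.
        System.out.println(isOpen(windowEndMs, Duration.ofSeconds(5), streamTimeMs)); // true

        // With no grace period, window end and window close coincide.
        System.out.println(isOpen(windowEndMs, Duration.ZERO, streamTimeMs)); // false
    }
}
```

With zero grace the two terms describe the same instant, which is why the no-grace Javadoc can still say records are dropped once the window is closed.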

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -108,13 +154,12 @@ public static SessionWindows with(final Duration 
inactivityGap) {
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, 
Duration)} instead

Review comment:
       Missing `.` after `since 3.0`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code 
afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition 
and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end 
will be dropped.

Review comment:
       `out-of-order`
   `window closed`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than

Review comment:
       `earlier or later` -> `before or after` (to avoid confusion with the 
term "late data")

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -177,7 +204,9 @@ public long size() {
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {

Review comment:
       I am just wondering about `return new JoinWindows(beforeMs, afterMs, 
afterWindowEndMs, false);` at the end of this method. Should it really 
_disable_ the fix by passing a hard-coded `false`? It might be better to replace 
`false` with `enableSpuriousResultFix`. If the old `of(size)` was called, the 
flag is already correctly set to `false`, but if the new `ofTimeDifferenceXxx()` 
was called, it would be weird to disable the fix when `grace()` is called.
   
   Or we check whether the new API was used originally, and disallow calling 
`grace()` in that case?
   ```
   if (enableSpuriousResultFix) {
       // grace() stays valid only for windows created via the old of(size) method
       throw new IllegalStateException(
           "You can use grace() only if you create the JoinWindows using the of(size) method. "
               + "If you use ofTimeDifferenceAndGrace() or ofTimeDifferenceWithNoGrace(), "
               + "it is not allowed to change the grace period afterwards.");
   }
   ```
   
   \cc @ableegoldman WDYT?
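
To make the two options concrete, here is a rough sketch on a simplified stand-in class. `JoinWindowsSketch`, `graceKeepingFlag`, and `graceOrReject` are hypothetical names used only for illustration, the 24 hour default grace in `of()` is a simplification taken from the Javadoc wording in this PR, and none of this is the actual `JoinWindows` code.

```java
import java.time.Duration;

public class JoinWindowsSketch {
    public final long beforeMs;
    public final long afterMs;
    public final long graceMs;
    public final boolean enableSpuriousResultFix;

    private JoinWindowsSketch(final long beforeMs, final long afterMs,
                              final long graceMs, final boolean enableSpuriousResultFix) {
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Stand-in for the old of(size): fix disabled, assumed 24 hour default grace.
    public static JoinWindowsSketch of(final Duration timeDifference) {
        final long ms = timeDifference.toMillis();
        return new JoinWindowsSketch(ms, ms, Duration.ofHours(24).toMillis(), false);
    }

    // Stand-in for the new factory: fix enabled, explicit grace.
    public static JoinWindowsSketch ofTimeDifferenceAndGrace(final Duration timeDifference,
                                                             final Duration afterWindowEnd) {
        final long ms = timeDifference.toMillis();
        return new JoinWindowsSketch(ms, ms, afterWindowEnd.toMillis(), true);
    }

    // Option 1: carry the existing flag over instead of hard-coding false.
    public JoinWindowsSketch graceKeepingFlag(final Duration afterWindowEnd) {
        return new JoinWindowsSketch(beforeMs, afterMs, afterWindowEnd.toMillis(), enableSpuriousResultFix);
    }

    // Option 2: refuse to change the grace period if the new API created the window.
    public JoinWindowsSketch graceOrReject(final Duration afterWindowEnd) {
        if (enableSpuriousResultFix) {
            throw new IllegalStateException("grace() is only allowed after of(size).");
        }
        return new JoinWindowsSketch(beforeMs, afterMs, afterWindowEnd.toMillis(), false);
    }

    public static void main(final String[] args) {
        final JoinWindowsSketch viaNewApi =
            ofTimeDifferenceAndGrace(Duration.ofSeconds(10), Duration.ofSeconds(1));
        // Option 1 keeps the fix enabled after changing the grace period.
        System.out.println(viaNewApi.graceKeepingFlag(Duration.ofSeconds(2)).enableSpuriousResultFix); // true
        // The old factory still allows changing the grace period under option 2.
        System.out.println(of(Duration.ofSeconds(10)).graceOrReject(Duration.ofSeconds(2)).graceMs); // 2000
    }
}
```

Option 1 is the smaller change and keeps `grace()` usable everywhere; option 2 is stricter and makes mixing the old and new APIs an explicit error.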

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -78,23 +79,68 @@
     private SessionWindows(final long gapMs, final long graceMs) {
         this.gapMs = gapMs;
         this.graceMs = graceMs;
+
+        if (gapMs <= 0) {
+            throw new IllegalArgumentException("Gap time cannot be zero or 
negative.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
     }
 
     /**
-     * Create a new window specification with the specified inactivity gap.
+     * Creates a new window specification with the specified inactivity gap.
+     * Using the method implicitly sets the grace period to zero which
+     * means that out of order records arriving after the window end will be 
dropped

Review comment:
       `out-of-order` (this may be a typo in other places, too). Can you fix 
it everywhere?
   
   `window end` -> `window closed` (same -- please also fix elsewhere)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out 
of order records arriving
+     * after the window end will be dropped. The delay is defined as 
(stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}

Review comment:
       `of` -> `or`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out 
of order records arriving
+     * after the window end will be dropped. The delay is defined as 
(stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition 
and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than

Review comment:
       `before or after`





