showuon commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r659484507



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code 
afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition 
and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end 
will be dropped.

Review comment:
       I expected that the javadoc for `ofTimeDifferenceWithNoGrace` should be 
mostly the same except `grace` parameter. Is there any reason why they are 
different?
   
   Same comment to other places.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -83,6 +83,14 @@ protected JoinWindows(final JoinWindows joinWindows) {
         afterMs = joinWindows.afterMs;
         graceMs = joinWindows.graceMs;
         enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+
+        if (beforeMs + afterMs < 0) {
+            throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");

Review comment:
       Could we make this Constructor to call another overloaded constructor, 
to avoid duplicated codes? i.e.
   ```java
   protected JoinWindows(final JoinWindows joinWindows) {
       this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, 
joinWindows.enableSpuriousResultFix);
   }
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -108,13 +147,12 @@ public static SessionWindows with(final Duration 
inactivityGap) {
      * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, 
Duration)} instead
      */
+    @Deprecated
     public SessionWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEndMs < 0) {
-            throw new IllegalArgumentException("Grace period must not be 
negative.");
-        }

Review comment:
       It's nice you move this check into the constructor. But there is also 
other validation you didn't move to constructor, ex: 
`validateMillisecondDuration(afterWindowEnd, msgPrefix)`. 
   
   We used to have 2 ways to create `SessionWindows`: `grace` and `with`. Now 
we added 2 more: `ofInactivityGapAndGrace`; and `ofInactivityGapAndNoGrace`. We 
should also validate the parameters, either in constructor, or in each method.
   
   Same comment to other places.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -89,18 +132,16 @@ private SlidingWindows(final long timeDifferenceMs, final 
long graceMs) {
      * @param grace the grace period to admit out-of-order events to a window
      * @return a new window definition
      * @throws IllegalArgumentException if the specified window size is &lt; 0 
or grace &lt; 0, or either can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} or {@link 
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
      */
+    @Deprecated
     public static SlidingWindows withTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration grace) throws IllegalArgumentException {
         final String msgPrefixSize = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefixSize);
-        if (timeDifferenceMs < 0) {
-            throw new IllegalArgumentException("Window time difference must 
not be negative.");
-        }
+
         final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, 
"grace");
         final long graceMs = validateMillisecondDuration(grace, 
msgPrefixGrace);

Review comment:
       Same as above mentioned, the validation didn't get handled in new API.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code 
afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition 
and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method 
implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end 
will be dropped.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @return a new JoinWindows object with the window definition and no 
grace period. Note that this means out of order records arriving after the 
window end will be dropped
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return a new JoinWindows object with the window definition with and 
grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead

Review comment:
       Please add a period after `@deprecated since 3.0`. i.e.
   `@deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead`
   
   Same comments to below identical places.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
##########
@@ -40,7 +40,11 @@
 
     // By default grace period is 24 hours for all windows,
     // in other words we allow out-of-order data for up to a day
-    protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+    // This behavior is now deprecated
+    protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 
60 * 1000L;

Review comment:
       This naming is confusing to uses. This will make user think that 24 
hours grace period cannot be used anymore. No I don't think that's what we 
want. 24 hours is still good to use, if user believe that's what they want. 
That means, we don't **deprecate** the 24 hours grace period, just don't set as 
default value, so we should not name it as "deprecated", "old", things.
   
   Correct me if I'm wrong, @izzyacademy @ableegoldman . Thank you




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to