ableegoldman commented on a change in pull request #10740:
URL: https://github.com/apache/kafka/pull/10740#discussion_r636513301



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -67,6 +68,7 @@
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
  * @see TimestampExtractor
  */
+@SuppressWarnings("deprecation")

Review comment:
       Why do we need this?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -168,6 +172,50 @@ public JoinWindows grace(final Duration afterWindowEnd) 
throws IllegalArgumentEx
         return new JoinWindows(beforeMs, afterMs, afterWindowEndMs);
     }
 
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @since 3.0
+     */
+    public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
+        return ofTimeDifferenceAndGrace(timeDifference, 
ofMillis(DEFAULT_GRACE_PERIOD_MS));

Review comment:
       I think it would be more clear to specify this as follows. The whole 
point of this KIP is to get rid of the concept of a "default grace period" 
   ```suggestion
           return ofTimeDifferenceAndGrace(timeDifference, ofMillis(0));
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -69,6 +70,7 @@
  * @see KGroupedStream#windowedBy(SessionWindows)
  * @see TimestampExtractor
  */
+@SuppressWarnings("deprecation")

Review comment:
       Same here, I don't think we should need to suppress any deprecation 
warnings in this class as we aren't actually using any of the deprecated 
methods here, right? (And if we are, it's better to suppress at the smallest 
scope possible, ie only to specific individual methods rather than the whole 
class)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -168,6 +172,50 @@ public JoinWindows grace(final Duration afterWindowEnd) 
throws IllegalArgumentEx
         return new JoinWindows(beforeMs, afterMs, afterWindowEndMs);
     }
 
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @since 3.0

Review comment:
       nit: delete this line (here and elsewhere in the PR), we don't need this 
for new APIs in Kafka (only for deprecated APIs we do the `@deprecated since 
x.y` thing)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -84,17 +86,14 @@ private SessionWindows(final long gapMs, final long 
graceMs) {
      * Create a new window specification with the specified inactivity gap.
      *
      * @param inactivityGap the gap of inactivity between sessions
-     * @return a new window specification with default maintain duration of 1 
day
+     * @return a new window specification with default without any grace period

Review comment:
       I might have forgotten to specify this in the KIP, but we should make 
sure not to change the behavior of these old deprecated constructors. Maybe we 
can phrase this as something like:
   ```suggestion
        * @return a new window specification without specifying a grace period 
(uses the old default grace period of 24hr)
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -119,6 +120,50 @@ public SessionWindows grace(final Duration afterWindowEnd) 
throws IllegalArgumen
         return new SessionWindows(gapMs, afterWindowEndMs);
     }
 
+    /**
+     * Create a new window specification with the specified inactivity gap.
+     *
+     * @param inactivityGap the gap of inactivity between sessions
+     * @return a new window specification without any grace period
+     *
+     * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
+     * @since 3.0
+     */
+    public static SessionWindows ofInactivityGapWithNoGrace(final Duration 
inactivityGap) {
+        return ofInactivityGapAndGrace(inactivityGap, 
ofMillis(DEFAULT_GRACE_PERIOD_MS));
+    }
+
+    /**
+     * Reject out-of-order events that arrive more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Note that new events may change the boundaries of session windows, so 
aggressive
+     * close times can lead to surprising results in which an out-of-order 
event is rejected and then
+     * a subsequent event moves the window boundary forward.
+     *
+     * @param inactivityGap the gap of inactivity between sessions
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @return A SessionWindows object with the specified inactivity gap and 
grace period
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @since 3.0
+     */
+    public static SessionWindows ofInactivityGapAndGrace(final Duration 
inactivityGap, final Duration afterWindowEnd) {
+
+        final String msgPrefixInactivityGapMs = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
+        final long inactivityGapMs = 
validateMillisecondDuration(inactivityGap, msgPrefixInactivityGapMs);
+        if (inactivityGapMs <= 0) {
+            throw new IllegalArgumentException("Gap time (inactivityGapMs) 
cannot be zero or negative.");
+        }
+
+        final String msgPrefixAfterWindowEnd = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefixAfterWindowEnd);
+        if (afterWindowEndMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }

Review comment:
       I think it might be cleaner to move all checks into the constructor, and 
then each static constructor method just has to invoke that, rather than having 
one of them implement everything and the others rely on that. Thoughts?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -168,6 +172,50 @@ public JoinWindows grace(final Duration afterWindowEnd) 
throws IllegalArgumentEx
         return new JoinWindows(beforeMs, afterMs, afterWindowEndMs);
     }
 
+
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     * @since 3.0
+     */
+    public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
+        return ofTimeDifferenceAndGrace(timeDifference, 
ofMillis(DEFAULT_GRACE_PERIOD_MS));
+    }
+
+    /**
+     * Reject out-of-order events that are delayed more than {@code 
afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @return this updated builder
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     * @since 3.0
+     */
+    public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {

Review comment:
       super nit: can you move these two new APIs up higher in this class, to 
above the deprecated `of()`? It's just easier for users to find and understand 
the entry point of this class that way, and not be greeted immediately by the 
deprecated versions
   
   Same for the other Windows classes, let's keep the preferred APIs at the top 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to