mjsax commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r661136400
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+ * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
Review comment:
`out-of-order` (with `-`)
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code advance == size}.
+ * Using the method implicitly sets the grace period to zero which means
+ * that out of order records arriving after the window end will be dropped
*
* @param size The size of the window
- * @return a new window definition with default maintain duration of 1 day
+ * @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
+ * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+ */
+ public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+ return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+ }
+
+ /**
+ * Return a window definition with the given window size, and with the advance interval being equal to the window
+ * size.
+ * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+ * <p>
+ * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+ * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+ * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which means
+ * that out of order records arriving after the window end will be dropped.
+ *
+ * <p>
+ * Delay is defined as (stream_time - record_timestamp).
+ *
+ * @param size The size of the window. Must be larger than zero
+ * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
+ * @return a TimeWindows object with the specified size and the specified grace period
+ * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+ */
+ public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
+ throws IllegalArgumentException {
Review comment:
nit: move this to the previous line?
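For reference, the interval formula quoted in the `ofSizeAndGrace` Javadoc above, `[N * size, N * size + size)`, can be sketched as a plain helper that maps a record timestamp to the tumbling window containing it. This is a minimal illustration under stated assumptions, not Kafka Streams code: `TumblingWindowMath`, `windowStartFor` and `windowEndFor` are hypothetical names, and timestamps and sizes are assumed to be in milliseconds.

```java
/** Hypothetical helper illustrating the tumbling-window interval [N * size, N * size + size). */
final class TumblingWindowMath {

    private TumblingWindowMath() { }

    /** Start of the window containing timestampMs, i.e. N * size for the matching N. */
    static long windowStartFor(final long timestampMs, final long sizeMs) {
        if (sizeMs <= 0) {
            // Mirrors the Javadoc requirement that the window size must be larger than zero.
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of that window, i.e. N * size + size. */
    static long windowEndFor(final long timestampMs, final long sizeMs) {
        return windowStartFor(timestampMs, sizeMs) + sizeMs;
    }
}
```

For example, with a 5 ms window, timestamps 10 through 14 all fall into `[10, 15)`.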
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+ * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+ * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
+ *
+ * @param timeDifference join window interval
+ * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+ * @return A new JoinWindows object with the specified window definition and grace period
+ */
public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+ * which means that out of order records arriving after the window end will be dropped.
+ *
+ * @param timeDifference join window interval
+ * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+ * @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
+ */
public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
- return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
+ return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
}
- /**
+ /**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream.
*
- * @param timeDifference join window interval
+ * @param timeDifference
Review comment:
Why was `join window interval` removed?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+ * after the end of its window.
+ * <p>
+ * Delay is defined as (stream_time - record_timestamp).
+ *
+ * @param timeDifference join window interval
+ * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+ * @return A new JoinWindows object with the specified window definition and grace period
+ */
public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+ * which means that out of order records arriving after the window end will be dropped.
+ *
+ * @param timeDifference join window interval
+ * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+ * @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
+ */
public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
- return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
+ return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
}
- /**
+ /**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream.
*
- * @param timeDifference join window interval
+ * @param timeDifference
+ * @return a new JoinWindows object with the window definition with and grace period (uses old default of 24 hours)
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
Review comment:
nit: I think a `.` is missing: `since 3.0[.] Use`
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+ * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+ * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
Review comment:
`window end` -> `window closed`
The window ends once `afterMs` has passed, but we keep it open until `afterMs + grace`, which we call the "close time" of the window.
> The delay is defined as (stream_time - record_timestamp).
I think we can omit this?
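To make the "window end" vs. "close time" distinction concrete, here is a minimal sketch using the definitions from this comment and the quoted Javadoc: close time = window end + grace, and delay = stream_time - record_timestamp. The class and method names are hypothetical, and treating a window as closed once stream time reaches its close time (`>=`) is an assumption about the boundary, not a statement of how Kafka Streams compares these values internally.

```java
/** Hypothetical illustration of window end vs. close time; not Kafka Streams code. */
final class WindowCloseMath {

    private WindowCloseMath() { }

    /** The "close time": the window stays open for graceMs past its end. */
    static long closeTime(final long windowEndMs, final long graceMs) {
        return windowEndMs + graceMs;
    }

    /** Delay as defined in the Javadoc: stream_time - record_timestamp. */
    static long delay(final long streamTimeMs, final long recordTimestampMs) {
        return streamTimeMs - recordTimestampMs;
    }

    /**
     * Assumption: a window counts as closed once stream time reaches its close time,
     * so an out-of-order record for it would be dropped from then on.
     */
    static boolean isClosed(final long streamTimeMs, final long windowEndMs, final long graceMs) {
        return streamTimeMs >= closeTime(windowEndMs, graceMs);
    }
}
```

With a window ending at 100 ms and a 20 ms grace period, the window has ended at stream time 110 but is not closed yet, so an out-of-order record for it would still be admitted; at stream time 120 it is closed. With no grace period, end and close time coincide in this sketch.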
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -108,13 +154,12 @@ public static SessionWindows with(final Duration inactivityGap) {
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
Review comment:
Missing `.` after `since 3.0`
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+ * after the end of its window.
+ * <p>
+ * Delay is defined as (stream_time - record_timestamp).
+ *
+ * @param timeDifference join window interval
+ * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+ * @return A new JoinWindows object with the specified window definition and grace period
+ */
public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+ * which means that out of order records arriving after the window end will be dropped.
Review comment:
`out-of-order`
`window closed`
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
Review comment:
`earlier or later` -> `before or after` (to avoid confusion with the
term "late data")
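The condition the Javadoc describes can be written as a simple predicate: a secondary-stream record joins a primary-stream record of the same key if its timestamp lies at most `beforeMs` before or at most `afterMs` after the primary record's timestamp (the `ofTimeDifferenceXxx()` factories in the diff set both to `timeDifference`). This is a hypothetical sketch (`JoinWindowPredicate` and `isJoinable` are made-up names); it assumes both bounds are inclusive and deliberately ignores grace and window closing.

```java
/** Hypothetical sketch of the JoinWindows time condition; not the Kafka Streams implementation. */
final class JoinWindowPredicate {

    private JoinWindowPredicate() { }

    /**
     * True if the secondary record's timestamp is at most beforeMs before
     * or at most afterMs after the primary record's timestamp (bounds inclusive).
     */
    static boolean isJoinable(final long primaryTs, final long secondaryTs,
                              final long beforeMs, final long afterMs) {
        return secondaryTs >= primaryTs - beforeMs
            && secondaryTs <= primaryTs + afterMs;
    }

    /** Symmetric case: beforeMs == afterMs == timeDifferenceMs. */
    static boolean isJoinable(final long primaryTs, final long secondaryTs, final long timeDifferenceMs) {
        return isJoinable(primaryTs, secondaryTs, timeDifferenceMs, timeDifferenceMs);
    }
}
```

Phrasing the two bounds as "before" and "after" keeps them distinct from whether a record is late relative to stream time, which is what grace handles.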
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -177,7 +204,9 @@ public long size() {
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+ * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
*/
+ @Deprecated
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
Review comment:
I am just wondering about `return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, false);` at the end of this method. Should it really _disable_ the fix by passing a hard-coded `false`? It might be better to replace `false` with `enableSpuriousResultFix`: if the old `of(size)` was called, the flag is already correctly set to `false`, but if one of the new `ofTimeDifferenceXxx()` methods was called, it seems odd to disable the fix when `grace()` is called.
Alternatively, we could check whether the new API was used originally and disallow calling `grace()` in that case:
```
if (enableSpuriousResultFix) {
    // Only the new ofTimeDifferenceXxx() factories set this flag to true.
    throw new IllegalStateException("You can use grace() only if you create the JoinWindows using the of(size) method. "
        + "If you use ofTimeDifferenceAndGrace() or ofTimeDifferenceWithNoGrace(), it is not allowed to change the grace period afterwards.");
}
```
\cc @ableegoldman WDYT?
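The two options discussed above can be compared side by side on a stripped-down stand-in for `JoinWindows`. This is only a sketch: `JoinWindowsSketch`, `graceKeepingFlag()` and `graceRejectingNewApi()` are hypothetical names, the factory methods merely mimic the ones in the diff, and the real class has more state and validation.

```java
import java.time.Duration;

/** Hypothetical, simplified stand-in for JoinWindows to compare the two proposals. */
final class JoinWindowsSketch {
    final long beforeMs;
    final long afterMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    private JoinWindowsSketch(final long beforeMs, final long afterMs,
                              final long graceMs, final boolean enableSpuriousResultFix) {
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    /** Mimics the new API: no grace period, fix enabled. */
    static JoinWindowsSketch ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
        return new JoinWindowsSketch(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
    }

    /** Mimics the old of(size): fix disabled, 24h grace as stated in the deprecated Javadoc. */
    static JoinWindowsSketch of(final Duration timeDifference) {
        return new JoinWindowsSketch(timeDifference.toMillis(), timeDifference.toMillis(),
            Duration.ofHours(24).toMillis(), false);
    }

    /** Option 1: keep whatever flag the instance was created with. */
    JoinWindowsSketch graceKeepingFlag(final Duration afterWindowEnd) {
        return new JoinWindowsSketch(beforeMs, afterMs, afterWindowEnd.toMillis(), enableSpuriousResultFix);
    }

    /** Option 2: only allow grace() on instances created via the old of(size). */
    JoinWindowsSketch graceRejectingNewApi(final Duration afterWindowEnd) {
        if (enableSpuriousResultFix) {
            throw new IllegalStateException("grace() can only be used with windows created via of(size).");
        }
        return new JoinWindowsSketch(beforeMs, afterMs, afterWindowEnd.toMillis(), false);
    }
}
```

Option 1 preserves the fix for users of the new API but keeps `grace()` as a second way to set the grace period; option 2 leaves exactly one way per API, at the cost of a runtime error for misuse.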
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -78,23 +79,68 @@
private SessionWindows(final long gapMs, final long graceMs) {
this.gapMs = gapMs;
this.graceMs = graceMs;
+
+ if (gapMs <= 0) {
+ throw new IllegalArgumentException("Gap time cannot be zero or negative.");
+ }
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
}
/**
- * Create a new window specification with the specified inactivity gap.
+ * Creates a new window specification with the specified inactivity gap.
+ * Using the method implicitly sets the grace period to zero which
+ * means that out of order records arriving after the window end will be dropped
Review comment:
`out-of-order` (this typo may appear in other places, too). Can you fix it everywhere?
`window end` -> `window closed` (same here; please also fix it elsewhere)
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+ * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+ * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
+ *
+ * @param timeDifference join window interval
+ * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
Review comment:
`of` -> `or`
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
+
+ if (graceMs < 0) {
+ throw new IllegalArgumentException("Grace period must not be negative.");
+ }
+
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+ * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+ * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
+ *
+ * @param timeDifference join window interval
+ * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+ * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+ * @return A new JoinWindows object with the specified window definition and grace period
+ */
public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
}
+ /**
+ * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
Review comment:
`before or after`