[ https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708277#comment-16708277 ]
ASF GitHub Bot commented on KAFKA-7446: --------------------------------------- guozhangwang closed pull request #5930: KAFKA-7446: Fix the duration and instant validation messages URL: https://github.com/apache/kafka/pull/5930 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 819732ac6d8..fdeb8845611 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -82,6 +82,7 @@ import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; /** * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and @@ -919,7 +920,8 @@ public void run() { * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds} */ public synchronized boolean close(final Duration timeout) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(timeout, "timeout"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); + ApiUtils.validateMillisecondDuration(timeout, msgPrefix); final long timeoutMs = timeout.toMillis(); if (timeoutMs < 0) { diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java index e888d7a120b..dd3b691b10c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java @@ -18,43 +18,57 @@ import java.time.Duration; import java.time.Instant; -import java.util.Objects; + +import static java.lang.String.format; public final class ApiUtils { + + private static final String MILLISECOND_VALIDATION_FAIL_MSG_FRMT = "Invalid value for parameter \"%s\" (value was: %s). "; + private ApiUtils() { } /** * Validates that milliseconds from {@code duration} can be retrieved. * @param duration Duration to check. - * @param name Name of params for an error message. + * @param messagePrefix Prefix text for an error message. * @return Milliseconds from {@code duration}. */ - public static long validateMillisecondDuration(final Duration duration, final String name) { + public static long validateMillisecondDuration(final Duration duration, final String messagePrefix) { try { if (duration == null) - throw new IllegalArgumentException("[" + Objects.toString(name) + "] shouldn't be null."); + throw new IllegalArgumentException(messagePrefix + "It shouldn't be null."); return duration.toMillis(); } catch (final ArithmeticException e) { - throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e); + throw new IllegalArgumentException(messagePrefix + "It can't be converted to milliseconds.", e); } } /** * Validates that milliseconds from {@code instant} can be retrieved. * @param instant Instant to check. - * @param name Name of params for an error message. + * @param messagePrefix Prefix text for an error message. * @return Milliseconds from {@code instant}. */ - public static long validateMillisecondInstant(final Instant instant, final String name) { + public static long validateMillisecondInstant(final Instant instant, final String messagePrefix) { try { if (instant == null) - throw new IllegalArgumentException("[" + name + "] shouldn't be null."); + throw new IllegalArgumentException(messagePrefix + "It shouldn't be null."); return instant.toEpochMilli(); } catch (final ArithmeticException e) { - throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e); + throw new IllegalArgumentException(messagePrefix + "It can't be converted to milliseconds.", e); } } + + /** + * Generates the prefix message for validateMillisecondXXXXXX() utility + * @param value Object to be converted to milliseconds + * @param name Object name + * @return Error message prefix to use in exception + */ + public static String prepareMillisCheckFailMsgPrefix(final Object value, final String name) { + return format(MILLISECOND_VALIDATION_FAIL_MSG_FRMT, name, value); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 62eade4298a..8a05c49608d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS; /** @@ -128,7 +129,8 @@ public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgument * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} */ public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); + ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix); return of(timeDifference.toMillis()); } @@ -161,7 +163,8 @@ public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentExc */ @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); + ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix); return before(timeDifference.toMillis()); } @@ -194,7 +197,8 @@ public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentExce */ @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); + ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix); return after(timeDifference.toMillis()); } @@ -226,7 +230,8 @@ public long size() { */ @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); + ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix); if (afterWindowEnd.toMillis() < 0) { throw new IllegalArgumentException("Grace period must not be negative."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java index a19412d7b13..a0d6e34cec6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -34,6 +34,8 @@ import java.util.Map; import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * Used to describe how a {@link StateStore} should be materialized. * You can either provide a custom {@link StateStore} backend through one of the provided methods accepting a supplier @@ -247,7 +249,9 @@ protected Materialized(final Materialized<K, V, S> materialized) { * @throws IllegalArgumentException if retention is negative or can't be represented as {@code long milliseconds} */ public Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(retention, "retention"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(retention, "retention"); + ApiUtils.validateMillisecondDuration(retention, msgPrefix); + if (retention.toMillis() < 0) { throw new IllegalArgumentException("Retention must not be negative."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java index 02c7cbfe795..cb84eba9a08 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS; @@ -108,7 +109,8 @@ public static SessionWindows with(final long inactivityGapMs) { * @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds} */ public static SessionWindows with(final Duration inactivityGap) { - ApiUtils.validateMillisecondDuration(inactivityGap, "inactivityGap"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap"); + ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix); return with(inactivityGap.toMillis()); } @@ -145,7 +147,9 @@ public SessionWindows until(final long durationMs) throws IllegalArgumentExcepti * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds} */ public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); + ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix); + if (afterWindowEnd.toMillis() < 0) { throw new IllegalArgumentException("Grace period must not be negative."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index 46485b146d4..942b54d614a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS; /** @@ -125,7 +126,8 @@ public static TimeWindows of(final long sizeMs) throws IllegalArgumentException * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds} */ public static TimeWindows of(final Duration size) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(size, "size"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size"); + ApiUtils.validateMillisecondDuration(size, msgPrefix); return of(size.toMillis()); } @@ -138,14 +140,15 @@ public static TimeWindows of(final Duration size) throws IllegalArgumentExceptio * * @param advanceMs The advance interval ("hop") in milliseconds of the window, with the requirement that {@code 0 < advanceMs <= sizeMs}. * @return a new window definition with default maintain duration of 1 day - * @throws IllegalArgumentException if the advance interval is negative, zero, or larger-or-equal the window size + * @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size * @deprecated Use {@link #advanceBy(Duration)} instead */ @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows @Deprecated public TimeWindows advanceBy(final long advanceMs) { if (advanceMs <= 0 || advanceMs > sizeMs) { - throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval (0, %d].", sizeMs)); + throw new IllegalArgumentException(String.format("Window advancement interval should be more than zero " + + "and less than window duration which is %d ms, but given advancement interval is: %d ms", sizeMs, advanceMs)); } return new TimeWindows(sizeMs, advanceMs, grace, maintainDurationMs, segments); } @@ -159,11 +162,12 @@ public TimeWindows advanceBy(final long advanceMs) { * * @param advance The advance interval ("hop") of the window, with the requirement that {@code 0 < advance.toMillis() <= sizeMs}. * @return a new window definition with default maintain duration of 1 day - * @throws IllegalArgumentException if the advance interval is negative, zero, or larger-or-equal the window size + * @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size */ @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows public TimeWindows advanceBy(final Duration advance) { - ApiUtils.validateMillisecondDuration(advance, "advance"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance"); + ApiUtils.validateMillisecondDuration(advance, msgPrefix); return advanceBy(advance.toMillis()); } @@ -196,7 +200,8 @@ public long size() { */ @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); + ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix); if (afterWindowEnd.toMillis() < 0) { throw new IllegalArgumentException("Grace period must not be negative."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 46d7270b332..0a45d817b66 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * The unlimited window specifications used for aggregations. * <p> @@ -82,7 +84,8 @@ public UnlimitedWindows startOn(final long startMs) throws IllegalArgumentExcept * @throws IllegalArgumentException if the start time is negative or can't be represented as {@code long milliseconds} */ public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(start, "start"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start"); + ApiUtils.validateMillisecondInstant(start, msgPrefix); return startOn(start.toEpochMilli()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 21e1c17a898..37e3a2e14a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -31,6 +31,8 @@ import java.util.List; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { private final StreamTask task; @@ -161,7 +163,8 @@ public Cancellable schedule(final long interval, final PunctuationType type, fin public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException { - ApiUtils.validateMillisecondDuration(interval, "interval"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval"); + ApiUtils.validateMillisecondDuration(interval, msgPrefix); return schedule(interval.toMillis(), type, callback); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index f7a182472be..7991b0d4738 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -34,6 +34,8 @@ import java.time.Duration; import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * Factory for creating state stores in Kafka Streams. * <p> @@ -195,8 +197,10 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException { Objects.requireNonNull(name, "name cannot be null"); - ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod"); - ApiUtils.validateMillisecondDuration(windowSize, "windowSize"); + final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); + ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix); + final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); + ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix); final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L); @@ -259,7 +263,8 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name @SuppressWarnings("deprecation") public static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod) { - ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); + ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix); return persistentSessionStore(name, retentionPeriod.toMillis()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index 50ce386f13f..e13e4a98686 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -23,6 +23,8 @@ import java.time.Instant; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * A windowed store interface extending {@link StateStore}. * @@ -92,8 +94,8 @@ @Override default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); + ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")); + ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")); return fetch(key, from.toEpochMilli(), to.toEpochMilli()); } @@ -114,8 +116,8 @@ @Override default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); + ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")); + ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")); return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); } @@ -132,8 +134,8 @@ @Override default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); + ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")); + ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")); return fetchAll(from.toEpochMilli(), to.toEpochMilli()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java index d95b44222e2..17ca8afb7dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link * org.apache.kafka.streams.processor.internals.ProcessorTopology} @@ -89,8 +91,8 @@ public V fetch(final K key, final long time) { @Override public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); + ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")); + ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")); return fetch(key, from.toEpochMilli(), to.toEpochMilli()); } @@ -112,8 +114,8 @@ public V fetch(final K key, final long time) { @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); + ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")); + ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")); return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); } @@ -148,8 +150,8 @@ public V fetch(final K key, final long time) { @Override public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); + ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")); + ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")); return fetchAll(from.toEpochMilli(), to.toEpochMilli()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 99abdc4746b..5303d06bece 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -37,6 +37,8 @@ import java.util.NavigableMap; import java.util.TreeMap; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * A very simple window store stub for testing purposes. */ @@ -77,8 +79,8 @@ public V fetch(final K key, final long time) { @Override public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); + ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")); + ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")); return fetch(key, from.toEpochMilli(), to.toEpochMilli()); } @@ -175,8 +177,8 @@ public void remove() { @Override public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); + ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")); + ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")); return fetchAll(from.toEpochMilli(), to.toEpochMilli()); } @@ -229,8 +231,8 @@ public void remove() { final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); + ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")); + ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")); return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Better error message to explain the upper limit of TimeWindow > ------------------------------------------------------------- > > Key: KAFKA-7446 > URL: https://issues.apache.org/jira/browse/KAFKA-7446 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 2.0.0 > Reporter: Jacek Laskowski > Assignee: Srinivas Reddy > Priority: Trivial > Labels: newbie++ > > The following code throws a {{IllegalArgumentException}}. > {code:java} > import org.apache.kafka.streams.kstream.TimeWindows > import scala.concurrent.duration._ > val timeWindow = TimeWindows > .of(1.minute.toMillis) > .advanceBy(2.minutes.toMillis) > {code} > The exception is as follows and it's not clear why {{60000}} is the upper > limit (not to mention that {{AdvanceMs}} with the uppercase {{A}} did also > confuse me). > {code:java} > java.lang.IllegalArgumentException: AdvanceMs must lie within interval (0, > 60000]. > at > org.apache.kafka.streams.kstream.TimeWindows.advanceBy(TimeWindows.java:100) > ... 44 elided{code} > I think that the message should be more developer-friendly and explain the > boundaries, perhaps with an example (and a link to docs)? -- This message was sent by Atlassian JIRA (v7.6.3#76005)