[ 
https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708277#comment-16708277
 ] 

ASF GitHub Bot commented on KAFKA-7446:
---------------------------------------

guozhangwang closed pull request #5930: KAFKA-7446: Fix the duration and 
instant validation messages
URL: https://github.com/apache/kafka/pull/5930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 819732ac6d8..fdeb8845611 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -82,6 +82,7 @@
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
 /**
  * A Kafka client that allows for performing continuous computation on input 
coming from one or more input topics and
@@ -919,7 +920,8 @@ public void run() {
      * @throws IllegalArgumentException if {@code timeout} can't be 
represented as {@code long milliseconds}
      */
     public synchronized boolean close(final Duration timeout) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(timeout, "timeout");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+        ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
 
         final long timeoutMs = timeout.toMillis();
         if (timeoutMs < 0) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java 
b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
index e888d7a120b..dd3b691b10c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
@@ -18,43 +18,57 @@
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Objects;
+
+import static java.lang.String.format;
 
 public final class ApiUtils {
+
+    private static final String MILLISECOND_VALIDATION_FAIL_MSG_FRMT = 
"Invalid value for parameter \"%s\" (value was: %s). ";
+
     private ApiUtils() {
     }
 
     /**
      * Validates that milliseconds from {@code duration} can be retrieved.
      * @param duration Duration to check.
-     * @param name Name of params for an error message.
+     * @param messagePrefix Prefix text for an error message.
      * @return Milliseconds from {@code duration}.
      */
-    public static long validateMillisecondDuration(final Duration duration, 
final String name) {
+    public static long validateMillisecondDuration(final Duration duration, 
final String messagePrefix) {
         try {
             if (duration == null)
-                throw new IllegalArgumentException("[" + 
Objects.toString(name) + "] shouldn't be null.");
+                throw new IllegalArgumentException(messagePrefix + "It 
shouldn't be null.");
 
             return duration.toMillis();
         } catch (final ArithmeticException e) {
-            throw new IllegalArgumentException("[" + name + "] can't be 
converted to milliseconds. ", e);
+            throw new IllegalArgumentException(messagePrefix + "It can't be 
converted to milliseconds.", e);
         }
     }
 
     /**
      * Validates that milliseconds from {@code instant} can be retrieved.
      * @param instant Instant to check.
-     * @param name Name of params for an error message.
+     * @param messagePrefix Prefix text for an error message.
      * @return Milliseconds from {@code instant}.
      */
-    public static long validateMillisecondInstant(final Instant instant, final 
String name) {
+    public static long validateMillisecondInstant(final Instant instant, final 
String messagePrefix) {
         try {
             if (instant == null)
-                throw new IllegalArgumentException("[" + name + "] shouldn't 
be null.");
+                throw new IllegalArgumentException(messagePrefix + "It 
shouldn't be null.");
 
             return instant.toEpochMilli();
         } catch (final ArithmeticException e) {
-            throw new IllegalArgumentException("[" + name + "] can't be 
converted to milliseconds. ", e);
+            throw new IllegalArgumentException(messagePrefix + "It can't be 
converted to milliseconds.", e);
         }
     }
+
+    /**
+     * Generates the prefix message for validateMillisecondXXXXXX() utility
+     * @param value Object to be converted to milliseconds
+     * @param name Object name
+     * @return Error message prefix to use in exception
+     */
+    public static String prepareMillisCheckFailMsgPrefix(final Object value, 
final String name) {
+        return format(MILLISECOND_VALIDATION_FAIL_MSG_FRMT, name, value);
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 62eade4298a..8a05c49608d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 /**
@@ -128,7 +129,8 @@ public static JoinWindows of(final long timeDifferenceMs) 
throws IllegalArgument
      * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
      */
     public static JoinWindows of(final Duration timeDifference) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
         return of(timeDifference.toMillis());
     }
 
@@ -161,7 +163,8 @@ public JoinWindows before(final long timeDifferenceMs) 
throws IllegalArgumentExc
      */
     @SuppressWarnings({"deprecation"}) // removing segments from Windows will 
fix this
     public JoinWindows before(final Duration timeDifference) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
         return before(timeDifference.toMillis());
     }
 
@@ -194,7 +197,8 @@ public JoinWindows after(final long timeDifferenceMs) 
throws IllegalArgumentExce
      */
     @SuppressWarnings({"deprecation"}) // removing segments from Windows will 
fix this
     public JoinWindows after(final Duration timeDifference) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
         return after(timeDifference.toMillis());
     }
 
@@ -226,7 +230,8 @@ public long size() {
      */
     @SuppressWarnings({"deprecation"}) // removing segments from Windows will 
fix this
     public JoinWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
         if (afterWindowEnd.toMillis() < 0) {
             throw new IllegalArgumentException("Grace period must not be 
negative.");
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index a19412d7b13..a0d6e34cec6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -34,6 +34,8 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * Used to describe how a {@link StateStore} should be materialized.
  * You can either provide a custom {@link StateStore} backend through one of 
the provided methods accepting a supplier
@@ -247,7 +249,9 @@ protected Materialized(final Materialized<K, V, S> 
materialized) {
      * @throws IllegalArgumentException if retention is negative or can't be 
represented as {@code long milliseconds}
      */
     public Materialized<K, V, S> withRetention(final Duration retention) 
throws IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(retention, "retention");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(retention, 
"retention");
+        ApiUtils.validateMillisecondDuration(retention, msgPrefix);
+
         if (retention.toMillis() < 0) {
             throw new IllegalArgumentException("Retention must not be 
negative.");
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 02c7cbfe795..cb84eba9a08 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -23,6 +23,7 @@
 import java.time.Duration;
 import java.util.Objects;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 
@@ -108,7 +109,8 @@ public static SessionWindows with(final long 
inactivityGapMs) {
      * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
      */
     public static SessionWindows with(final Duration inactivityGap) {
-        ApiUtils.validateMillisecondDuration(inactivityGap, "inactivityGap");
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
+        ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix);
         return with(inactivityGap.toMillis());
     }
 
@@ -145,7 +147,9 @@ public SessionWindows until(final long durationMs) throws 
IllegalArgumentExcepti
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
      */
     public SessionWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+
         if (afterWindowEnd.toMillis() < 0) {
             throw new IllegalArgumentException("Grace period must not be 
negative.");
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 46485b146d4..942b54d614a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 /**
@@ -125,7 +126,8 @@ public static TimeWindows of(final long sizeMs) throws 
IllegalArgumentException
      * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
      */
     public static TimeWindows of(final Duration size) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(size, "size");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
+        ApiUtils.validateMillisecondDuration(size, msgPrefix);
         return of(size.toMillis());
     }
 
@@ -138,14 +140,15 @@ public static TimeWindows of(final Duration size) throws 
IllegalArgumentExceptio
      *
      * @param advanceMs The advance interval ("hop") in milliseconds of the 
window, with the requirement that {@code 0 < advanceMs <= sizeMs}.
      * @return a new window definition with default maintain duration of 1 day
-     * @throws IllegalArgumentException if the advance interval is negative, 
zero, or larger-or-equal the window size
+     * @throws IllegalArgumentException if the advance interval is negative, 
zero, or larger than the window size
      * @deprecated Use {@link #advanceBy(Duration)} instead
      */
     @SuppressWarnings("deprecation") // will be fixed when we remove segments 
from Windows
     @Deprecated
     public TimeWindows advanceBy(final long advanceMs) {
         if (advanceMs <= 0 || advanceMs > sizeMs) {
-            throw new IllegalArgumentException(String.format("AdvanceMs must 
lie within interval (0, %d].", sizeMs));
+            throw new IllegalArgumentException(String.format("Window 
advancement interval should be more than zero " +
+                    "and less than window duration which is %d ms, but given 
advancement interval is: %d ms", sizeMs, advanceMs));
         }
         return new TimeWindows(sizeMs, advanceMs, grace, maintainDurationMs, 
segments);
     }
@@ -159,11 +162,12 @@ public TimeWindows advanceBy(final long advanceMs) {
      *
      * @param advance The advance interval ("hop") of the window, with the 
requirement that {@code 0 < advance.toMillis() <= sizeMs}.
      * @return a new window definition with default maintain duration of 1 day
-     * @throws IllegalArgumentException if the advance interval is negative, 
zero, or larger-or-equal the window size
+     * @throws IllegalArgumentException if the advance interval is negative, 
zero, or larger than the window size
      */
     @SuppressWarnings("deprecation") // will be fixed when we remove segments 
from Windows
     public TimeWindows advanceBy(final Duration advance) {
-        ApiUtils.validateMillisecondDuration(advance, "advance");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, 
"advance");
+        ApiUtils.validateMillisecondDuration(advance, msgPrefix);
         return advanceBy(advance.toMillis());
     }
 
@@ -196,7 +200,8 @@ public long size() {
      */
     @SuppressWarnings("deprecation") // will be fixed when we remove segments 
from Windows
     public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
         if (afterWindowEnd.toMillis() < 0) {
             throw new IllegalArgumentException("Grace period must not be 
negative.");
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 46d7270b332..0a45d817b66 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -25,6 +25,8 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * The unlimited window specifications used for aggregations.
  * <p>
@@ -82,7 +84,8 @@ public UnlimitedWindows startOn(final long startMs) throws 
IllegalArgumentExcept
      * @throws IllegalArgumentException if the start time is negative or can't 
be represented as {@code long milliseconds}
      */
     public UnlimitedWindows startOn(final Instant start) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(start, "start");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, 
"start");
+        ApiUtils.validateMillisecondInstant(start, msgPrefix);
         return startOn(start.toEpochMilli());
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 21e1c17a898..37e3a2e14a3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -31,6 +31,8 @@
 
 import java.util.List;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 public class ProcessorContextImpl extends AbstractProcessorContext implements 
RecordCollector.Supplier {
 
     private final StreamTask task;
@@ -161,7 +163,8 @@ public Cancellable schedule(final long interval, final 
PunctuationType type, fin
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws 
IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(interval, "interval");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, 
"interval");
+        ApiUtils.validateMillisecondDuration(interval, msgPrefix);
         return schedule(interval.toMillis(), type, callback);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index f7a182472be..7991b0d4738 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -34,6 +34,8 @@
 import java.time.Duration;
 import java.util.Objects;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * Factory for creating state stores in Kafka Streams.
  * <p>
@@ -195,8 +197,10 @@ public static WindowBytesStoreSupplier 
persistentWindowStore(final String name,
                                                                  final 
Duration windowSize,
                                                                  final boolean 
retainDuplicates) throws IllegalArgumentException {
         Objects.requireNonNull(name, "name cannot be null");
-        ApiUtils.validateMillisecondDuration(retentionPeriod, 
"retentionPeriod");
-        ApiUtils.validateMillisecondDuration(windowSize, "windowSize");
+        final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+        ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
 
         final long defaultSegmentInterval = 
Math.max(retentionPeriod.toMillis() / 2, 60_000L);
 
@@ -259,7 +263,8 @@ public static SessionBytesStoreSupplier 
persistentSessionStore(final String name
     @SuppressWarnings("deprecation")
     public static SessionBytesStoreSupplier persistentSessionStore(final 
String name,
                                                                    final 
Duration retentionPeriod) {
-        ApiUtils.validateMillisecondDuration(retentionPeriod, 
"retentionPeriod");
+        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
         return persistentSessionStore(name, retentionPeriod.toMillis());
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 50ce386f13f..e13e4a98686 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -23,6 +23,8 @@
 
 import java.time.Instant;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * A windowed store interface extending {@link StateStore}.
  *
@@ -92,8 +94,8 @@
 
     @Override
     default WindowStoreIterator<V> fetch(final K key, final Instant from, 
final Instant to) {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetch(key, from.toEpochMilli(), to.toEpochMilli());
     }
 
@@ -114,8 +116,8 @@
 
     @Override
     default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final Instant fromTime, final Instant toTime) {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        ApiUtils.validateMillisecondInstant(fromTime, 
prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
+        ApiUtils.validateMillisecondInstant(toTime, 
prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
         return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
     }
 
@@ -132,8 +134,8 @@
 
     @Override
     default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, 
final Instant to) {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetchAll(from.toEpochMilli(), to.toEpochMilli());
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index d95b44222e2..17ca8afb7dd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -28,6 +28,8 @@
 import java.util.List;
 import java.util.Objects;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
  * org.apache.kafka.streams.processor.internals.ProcessorTopology}
@@ -89,8 +91,8 @@ public V fetch(final K key, final long time) {
 
     @Override
     public WindowStoreIterator<V> fetch(final K key, final Instant from, final 
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetch(key, from.toEpochMilli(), to.toEpochMilli());
     }
 
@@ -112,8 +114,8 @@ public V fetch(final K key, final long time) {
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        ApiUtils.validateMillisecondInstant(fromTime, 
prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
+        ApiUtils.validateMillisecondInstant(toTime, 
prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
         return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
     }
 
@@ -148,8 +150,8 @@ public V fetch(final K key, final long time) {
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final 
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetchAll(from.toEpochMilli(), to.toEpochMilli());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index 99abdc4746b..5303d06bece 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -37,6 +37,8 @@
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * A very simple window store stub for testing purposes.
  */
@@ -77,8 +79,8 @@ public V fetch(final K key, final long time) {
 
     @Override
     public WindowStoreIterator<V> fetch(final K key, final Instant from, final 
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetch(key, from.toEpochMilli(), to.toEpochMilli());
     }
 
@@ -175,8 +177,8 @@ public void remove() {
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final 
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetchAll(from.toEpochMilli(), to.toEpochMilli());
     }
 
@@ -229,8 +231,8 @@ public void remove() {
                                                             final K to,
                                                             final Instant 
fromTime,
                                                             final Instant 
toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        ApiUtils.validateMillisecondInstant(fromTime, 
prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
+        ApiUtils.validateMillisecondInstant(toTime, 
prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
         return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Better error message to explain the upper limit of TimeWindow
> -------------------------------------------------------------
>
>                 Key: KAFKA-7446
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7446
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Jacek Laskowski
>            Assignee: Srinivas Reddy
>            Priority: Trivial
>              Labels: newbie++
>
> The following code throws a {{IllegalArgumentException}}.
> {code:java}
> import org.apache.kafka.streams.kstream.TimeWindows
> import scala.concurrent.duration._
> val timeWindow = TimeWindows
> .of(1.minute.toMillis)
> .advanceBy(2.minutes.toMillis)
> {code}
> The exception is as follows and it's not clear why {{60000}} is the upper 
> limit (not to mention that {{AdvanceMs}} with the uppercase {{A}} did also 
> confuse me).
> {code:java}
> java.lang.IllegalArgumentException: AdvanceMs must lie within interval (0, 
> 60000].
> at 
> org.apache.kafka.streams.kstream.TimeWindows.advanceBy(TimeWindows.java:100)
> ... 44 elided{code}
> I think that the message should be more developer-friendly and explain the 
> boundaries, perhaps with an example (and a link to docs)?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to