[ 
https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645857#comment-16645857
 ] 

ASF GitHub Bot commented on KAFKA-7277:
---------------------------------------

mjsax closed pull request #5759: KAFKA-7277: default implementation for new 
window store overloads
URL: https://github.com/apache/kafka/pull/5759
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ad74ae1e74d..50ce386f13f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -17,9 +17,12 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
 
+import java.time.Instant;
+
 /**
  * A windowed store interface extending {@link StateStore}.
  *
@@ -87,6 +90,13 @@
      */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 
+    @Override
+    default WindowStoreIterator<V> fetch(final K key, final Instant from, 
final Instant to) {
+        ApiUtils.validateMillisecondInstant(from, "from");
+        ApiUtils.validateMillisecondInstant(to, "to");
+        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+    }
+
     /**
      * Get all the key-value pairs in the given key range and time range from 
all the existing windows.
      * <p>
@@ -102,6 +112,13 @@
      */
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long 
timeTo);
 
+    @Override
+    default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final Instant fromTime, final Instant toTime) {
+        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
+        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+    }
+
     /**
      * Gets all the key-value pairs that belong to the windows within in the 
given time range.
      *
@@ -112,4 +129,11 @@
      * @throws NullPointerException if {@code null} is used for any key
      */
     KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
+
+    @Override
+    default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, 
final Instant to) {
+        ApiUtils.validateMillisecondInstant(from, "from");
+        ApiUtils.validateMillisecondInstant(to, "to");
+        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index f6b62b2b935..b55e5448139 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -16,11 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -205,13 +203,6 @@ public synchronized void put(final Bytes key, final byte[] 
value, final long win
         return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, 
underlyingIterator);
     }
 
-    @Override
-    public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant 
from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, 
final Bytes to, final long timeFrom, final long timeTo) {
         // since this function may not access the underlying inner store, we 
need to validate
@@ -241,16 +232,6 @@ public synchronized void put(final Bytes key, final byte[] 
value, final long win
         );
     }
 
-    @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
-                                                           final Bytes to,
-                                                           final Instant 
fromTime,
-                                                           final Instant 
toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
     private V fetchPrevious(final Bytes key, final long timestamp) {
         final byte[] value = underlying.fetch(key, timestamp);
         if (value != null) {
@@ -294,11 +275,4 @@ private V fetchPrevious(final Bytes key, final long 
timestamp) {
                 cacheFunction
         );
     }
-
-    @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant 
from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index d4e47c6d18f..9808ca967cf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -58,28 +56,11 @@
         return bytesStore.fetch(key, from, to);
     }
 
-    @Override
-    public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant 
from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo, final long from, final long to) {
         return bytesStore.fetch(keyFrom, keyTo, from, to);
     }
 
-    @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
-                                                           final Bytes to,
-                                                           final Instant 
fromTime,
-                                                           final Instant 
toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         return bytesStore.all();
@@ -90,13 +71,6 @@
         return bytesStore.fetchAll(timeFrom, timeTo);
     }
 
-    @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant 
from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
     @Override
     public void put(final Bytes key, final byte[] value) {
         put(key, value, context.timestamp());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e1b6cd1d52e..5162eac8848 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -16,12 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -149,13 +147,6 @@ public V fetch(final K key, final long timestamp) {
                                                 time);
     }
 
-    @Override
-    public WindowStoreIterator<V> fetch(final K key, final Instant from, final 
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
         return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, 
metrics, serdes, time);
@@ -170,13 +161,6 @@ public V fetch(final K key, final long timestamp) {
                                                      time);
     }
 
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final 
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final long timeFrom, final long timeTo) {
         return new 
MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), 
timeFrom, timeTo),
@@ -186,13 +170,6 @@ public V fetch(final K key, final long timestamp) {
                                                      time);
     }
 
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
     @Override
     public void flush() {
         final long startNs = time.nanoseconds();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index e8037bc8163..d7bb523b049 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -93,26 +91,12 @@ public V fetch(final K key, final long timestamp) {
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).valuesIterator();
     }
 
-    @Override
-    public WindowStoreIterator<V> fetch(final K key, final Instant from, final 
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = 
bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), 
Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).keyValueIterator();
     }
 
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, 
final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
@@ -125,13 +109,6 @@ public V fetch(final K key, final long timestamp) {
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, 
windowSize).keyValueIterator();
     }
 
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final 
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
     private void maybeUpdateSeqnumForDups() {
         if (retainDuplicates) {
             seqnum = (seqnum + 1) & 0x7FFFFFFF;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Migrate Streams API to Duration instead of longMs times
> -------------------------------------------------------
>
>                 Key: KAFKA-7277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7277
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: Nikolay Izhikov
>            Priority: Major
>              Labels: kip, newbie
>             Fix For: 2.1.0
>
>
> Right now Streams API unversally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for 
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure 
> the whole Streams API is in terms of Duration.
>  
> Implementation note: Durations potentially worsen memory pressure and gc 
> performance, so internally, we will still use longMs as the representation. 
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to