This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 905f813  MINOR: default implementation for new window store overloads (#5759)
905f813 is described below

commit 905f8135078127e08633400277c5829b10971d42
Author: John Roesler <[email protected]>
AuthorDate: Wed Oct 10 21:08:21 2018 -0500

    MINOR: default implementation for new window store overloads (#5759)
    
    Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>, Nikolay Izhikov <[email protected]>
---
 .../apache/kafka/streams/state/WindowStore.java    | 24 ++++++++++++++++++++
 .../state/internals/CachingWindowStore.java        | 26 ----------------------
 .../internals/ChangeLoggingWindowBytesStore.java   | 26 ----------------------
 .../state/internals/MeteredWindowStore.java        | 23 -------------------
 .../state/internals/RocksDBWindowStore.java        | 23 -------------------
 5 files changed, 24 insertions(+), 98 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ad74ae1..50ce386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -17,9 +17,12 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
 
+import java.time.Instant;
+
 /**
  * A windowed store interface extending {@link StateStore}.
  *
@@ -87,6 +90,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 
+    @Override
+    default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
+        ApiUtils.validateMillisecondInstant(from, "from");
+        ApiUtils.validateMillisecondInstant(to, "to");
+        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+    }
+
     /**
      * Get all the key-value pairs in the given key range and time range from all the existing windows.
      * <p>
@@ -102,6 +112,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      */
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
 
+    @Override
+    default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
+        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
+        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+    }
+
     /**
      * Gets all the key-value pairs that belong to the windows within in the given time range.
      *
@@ -112,4 +129,11 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @throws NullPointerException if {@code null} is used for any key
      */
     KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
+
+    @Override
+    default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) {
+        ApiUtils.validateMillisecondInstant(from, "from");
+        ApiUtils.validateMillisecondInstant(to, "to");
+        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index f6b62b2..b55e544 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -16,11 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -206,13 +204,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
     }
 
     @Override
-    public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) {
         // since this function may not access the underlying inner store, we need to validate
         // if store is open outside as well.
@@ -241,16 +232,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         );
     }
 
-    @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
-                                                           final Bytes to,
-                                                           final Instant fromTime,
-                                                           final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
     private V fetchPrevious(final Bytes key, final long timestamp) {
         final byte[] value = underlying.fetch(key, timestamp);
         if (value != null) {
@@ -294,11 +275,4 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
                 cacheFunction
         );
     }
-
-    @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index d4e47c6..9808ca9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -59,28 +57,11 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
     }
 
     @Override
-    public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
         return bytesStore.fetch(keyFrom, keyTo, from, to);
     }
 
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
-                                                           final Bytes to,
-                                                           final Instant fromTime,
-                                                           final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         return bytesStore.all();
     }
@@ -91,13 +72,6 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
     }
 
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public void put(final Bytes key, final byte[] value) {
         put(key, value, context.timestamp());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e1b6cd1..5162eac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -16,12 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -150,13 +148,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<K>, V> all() {
         return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time);
     }
@@ -171,13 +162,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
                                                      fetchTime,
@@ -187,13 +171,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
-    @Override
     public void flush() {
         final long startNs = time.nanoseconds();
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index e8037bc..d7bb523 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -94,26 +92,12 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<K>, V> all() {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
@@ -125,13 +109,6 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
 
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
     private void maybeUpdateSeqnumForDups() {
         if (retainDuplicates) {
             seqnum = (seqnum + 1) & 0x7FFFFFFF;

Reply via email to