This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 905f813 MINOR: default implementation for new window store overloads (#5759)
905f813 is described below
commit 905f8135078127e08633400277c5829b10971d42
Author: John Roesler <[email protected]>
AuthorDate: Wed Oct 10 21:08:21 2018 -0500
MINOR: default implementation for new window store overloads (#5759)
Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>, Nikolay Izhikov <[email protected]>
---
.../apache/kafka/streams/state/WindowStore.java | 24 ++++++++++++++++++++
.../state/internals/CachingWindowStore.java | 26 ----------------------
.../internals/ChangeLoggingWindowBytesStore.java | 26 ----------------------
.../state/internals/MeteredWindowStore.java | 23 -------------------
.../state/internals/RocksDBWindowStore.java | 23 -------------------
5 files changed, 24 insertions(+), 98 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ad74ae1..50ce386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -17,9 +17,12 @@
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
+import java.time.Instant;
+
/**
* A windowed store interface extending {@link StateStore}.
*
@@ -87,6 +90,13 @@ public interface WindowStore<K, V> extends StateStore,
ReadOnlyWindowStore<K, V>
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
+ @Override
+ default WindowStoreIterator<V> fetch(final K key, final Instant from,
final Instant to) {
+ ApiUtils.validateMillisecondInstant(from, "from");
+ ApiUtils.validateMillisecondInstant(to, "to");
+ return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+ }
+
/**
* Get all the key-value pairs in the given key range and time range from
all the existing windows.
* <p>
@@ -102,6 +112,13 @@ public interface WindowStore<K, V> extends StateStore,
ReadOnlyWindowStore<K, V>
*/
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long
timeTo);
+ @Override
+ default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final Instant fromTime, final Instant toTime) {
+ ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
+ ApiUtils.validateMillisecondInstant(toTime, "toTime");
+ return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+ }
+
/**
* Gets all the key-value pairs that belong to the windows within in the
given time range.
*
@@ -112,4 +129,11 @@ public interface WindowStore<K, V> extends StateStore,
ReadOnlyWindowStore<K, V>
* @throws NullPointerException if {@code null} is used for any key
*/
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
+
+ @Override
+ default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from,
final Instant to) {
+ ApiUtils.validateMillisecondInstant(from, "from");
+ ApiUtils.validateMillisecondInstant(to, "to");
+ return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index f6b62b2..b55e544 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -16,11 +16,9 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -206,13 +204,6 @@ class CachingWindowStore<K, V> extends
WrappedStateStore.AbstractStateStore impl
}
@Override
- public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant
from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to, final long timeFrom, final long timeTo) {
// since this function may not access the underlying inner store, we
need to validate
// if store is open outside as well.
@@ -241,16 +232,6 @@ class CachingWindowStore<K, V> extends
WrappedStateStore.AbstractStateStore impl
);
}
- @Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
- final Bytes to,
- final Instant
fromTime,
- final Instant
toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
- }
-
private V fetchPrevious(final Bytes key, final long timestamp) {
final byte[] value = underlying.fetch(key, timestamp);
if (value != null) {
@@ -294,11 +275,4 @@ class CachingWindowStore<K, V> extends
WrappedStateStore.AbstractStateStore impl
cacheFunction
);
}
-
- @Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant
from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
- }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index d4e47c6..9808ca9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -59,28 +57,11 @@ class ChangeLoggingWindowBytesStore extends
WrappedStateStore.AbstractStateStore
}
@Override
- public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant
from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes
keyFrom, final Bytes keyTo, final long from, final long to) {
return bytesStore.fetch(keyFrom, keyTo, from, to);
}
@Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
- final Bytes to,
- final Instant
fromTime,
- final Instant
toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
return bytesStore.all();
}
@@ -91,13 +72,6 @@ class ChangeLoggingWindowBytesStore extends
WrappedStateStore.AbstractStateStore
}
@Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant
from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public void put(final Bytes key, final byte[] value) {
put(key, value, context.timestamp());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e1b6cd1..5162eac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -16,12 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -150,13 +148,6 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
@Override
- public WindowStoreIterator<V> fetch(final K key, final Instant from, final
Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<K>, V> all() {
return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime,
metrics, serdes, time);
}
@@ -171,13 +162,6 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final
Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final long timeFrom, final long timeTo) {
return new
MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to),
timeFrom, timeTo),
fetchTime,
@@ -187,13 +171,6 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
- }
-
- @Override
public void flush() {
final long startNs = time.nanoseconds();
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index e8037bc..d7bb523 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -94,26 +92,12 @@ public class RocksDBWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
@Override
- public WindowStoreIterator<V> fetch(final K key, final Instant from, final
Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator =
bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)),
Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes,
windowSize).keyValueIterator();
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<K>, V> all() {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes,
windowSize).keyValueIterator();
@@ -125,13 +109,6 @@ public class RocksDBWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes,
windowSize).keyValueIterator();
}
- @Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final
Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
- }
-
private void maybeUpdateSeqnumForDups() {
if (retainDuplicates) {
seqnum = (seqnum + 1) & 0x7FFFFFFF;