http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
new file mode 100644
index 0000000..8c3b437
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final Serdes serdes;
+    private final Time time;
+
+    public RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+    }
+
+}
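
For orientation, a minimal sketch of how a StateStoreSupplier like the one above is typically consumed. This is editorial and not part of the patch; Serdes.withBuiltinTypes(...) is assumed to exist on this branch's org.apache.kafka.streams.state.Serdes, and the store name is illustrative.

import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;

public class SupplierUsageSketch {
    public static void main(String[] args) {
        // Assumed helper for built-in key/value types; topic name is illustrative.
        Serdes<String, Long> serdes = Serdes.withBuiltinTypes("my-store", String.class, Long.class);
        StateStoreSupplier supplier = new RocksDBKeyValueStoreSupplier<>("my-store", serdes, new SystemTime());

        StateStore store = supplier.get();  // wraps a RocksDBStore in a MeteredKeyValueStore
        System.out.println(store.name());   // "my-store"; store.init(context) must run before any reads/writes
    }
}
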
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
new file mode 100644
index 0000000..8a600f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+    private static final int TTL_NOT_USED = -1;
+
+    // TODO: these values should be configurable
+    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+    private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+    private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+    private static final long BLOCK_SIZE = 4096L;
+    private static final int TTL_SECONDS = TTL_NOT_USED;
+    private static final int MAX_WRITE_BUFFERS = 3;
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private final String name;
+
+    private final Options options;
+    private final WriteOptions wOptions;
+    private final FlushOptions fOptions;
+
+    private Serdes<K, V> serdes;
+    private ProcessorContext context;
+    protected File dbDir;
+    private RocksDB db;
+
+    public RocksDBStore(String name, Serdes<K, V> serdes) {
+        this.name = name;
+        this.serdes = serdes;
+
+        // initialize the rocksdb options
+        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+        tableConfig.setBlockSize(BLOCK_SIZE);
+
+        options = new Options();
+        options.setTableFormatConfig(tableConfig);
+        options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+        options.setCompressionType(COMPRESSION_TYPE);
+        options.setCompactionStyle(COMPACTION_STYLE);
+        options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+        options.setCreateIfMissing(true);
+        options.setErrorIfExists(false);
+
+        wOptions = new WriteOptions();
+        wOptions.setDisableWAL(true);
+
+        fOptions = new FlushOptions();
+        fOptions.setWaitForFlush(true);
+    }
+
+    public void init(ProcessorContext context) {
+        serdes.init(context);
+
+        this.context = context;
+        this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
+        this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+    }
+
+    private RocksDB openDB(File dir, Options options, int ttl) {
+        try {
+            if (ttl == TTL_NOT_USED) {
+                dir.getParentFile().mkdirs();
+                return RocksDB.open(options, dir.toString());
+            } else {
+                throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
+                // TODO: support TTL with change log?
+                // return TtlDB.open(options, dir.toString(), ttl, false);
+            }
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public V get(K key) {
+        try {
+            return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        try {
+            if (value == null) {
+                db.remove(wOptions, serdes.rawKey(key));
+            } else {
+                db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+            }
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
+        }
+    }
+
+    @Override
+    public void putAll(List<Entry<K, V>> entries) {
+        for (Entry<K, V> entry : entries)
+            put(entry.key(), entry.value());
+    }
+
+    @Override
+    public V delete(K key) {
+        V value = get(key);
+        put(key, null);
+        return value;
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        RocksIterator innerIter = db.newIterator();
+        innerIter.seekToFirst();
+        return new RocksDbIterator<K, V>(innerIter, serdes);
+    }
+
+    @Override
+    public void flush() {
+        try {
+            db.flush(fOptions);
+        } catch (RocksDBException e) {
+            // TODO: this needs to be handled more accurately
+            throw new KafkaException("Error while executing flush from store " + this.name, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        flush();
+        db.close();
+    }
+
+    private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+        private final RocksIterator iter;
+        private final Serdes<K, V> serdes;
+
+        public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+            this.iter = iter;
+            this.serdes = serdes;
+        }
+
+        protected byte[] peekRawKey() {
+            return iter.key();
+        }
+
+        protected Entry<K, V> getEntry() {
+            return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.isValid();
+        }
+
+        @Override
+        public Entry<K, V> next() {
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            Entry<K, V> entry = this.getEntry();
+            iter.next();
+            return entry;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+        }
+
+        @Override
+        public void close() {
+            iter.dispose();
+        }
+
+    }
+
+    private static class LexicographicComparator implements Comparator<byte[]> {
+
+        @Override
+        public int compare(byte[] left, byte[] right) {
+            for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+                int leftByte = left[i] & 0xff;
+                int rightByte = right[j] & 0xff;
+                if (leftByte != rightByte) {
+                    return leftByte - rightByte;
+                }
+            }
+            return left.length - right.length;
+        }
+    }
+
+    private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = new LexicographicComparator();
+        byte[] rawToKey;
+
+        public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+                                    K from, K to) {
+            super(iter, serdes);
+            iter.seek(serdes.rawKey(from));
+            this.rawToKey = serdes.rawKey(to);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+        }
+    }
+
+}
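
A note on the range iterator above: RocksDBRangeIterator relies on RocksDB's default lexicographic unsigned-byte key ordering, which LexicographicComparator mirrors on the Java side. A self-contained check of that ordering (editorial sketch, not part of the patch):

import java.util.Comparator;

public class LexOrderCheck {
    // Same unsigned-byte comparison as RocksDBStore.LexicographicComparator.
    static final Comparator<byte[]> LEX = new Comparator<byte[]>() {
        public int compare(byte[] left, byte[] right) {
            for (int i = 0; i < left.length && i < right.length; i++) {
                int cmp = (left[i] & 0xff) - (right[i] & 0xff);
                if (cmp != 0)
                    return cmp;
            }
            return left.length - right.length;
        }
    };

    public static void main(String[] args) {
        // 0x80 sorts after 0x7f because bytes compare unsigned, not signed.
        System.out.println(LEX.compare(new byte[]{(byte) 0x80}, new byte[]{0x7f}) > 0); // true
        // A strict prefix sorts before any extension of it.
        System.out.println(LEX.compare(new byte[]{1}, new byte[]{1, 0}) < 0);           // true
    }
}
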
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
new file mode 100644
index 0000000..933ed91
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.WindowStoreUtil;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SimpleTimeZone;
+
+public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
+
+    public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
+
+    private static final long USE_CURRENT_TIMESTAMP = -1L;
+
+    private static class Segment extends RocksDBStore<byte[], byte[]> {
+        public final long id;
+
+        Segment(String name, long id) {
+            super(name, WindowStoreUtil.INNER_SERDES);
+            this.id = id;
+        }
+
+        public void destroy() {
+            Utils.delete(dbDir);
+        }
+    }
+
+    private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
+        private final Serdes<?, V> serdes;
+        private final KeyValueIterator<byte[], byte[]>[] iterators;
+        private int index = 0;
+
+        RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
+            this(serdes, WindowStoreUtil.NO_ITERATORS);
+        }
+
+        RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+            this.serdes = serdes;
+            this.iterators = iterators;
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (index < iterators.length) {
+                if (iterators[index].hasNext())
+                    return true;
+
+                index++;
+            }
+            return false;
+        }
+
+        @Override
+        public KeyValue<Long, V> next() {
+            if (index >= iterators.length)
+                throw new NoSuchElementException();
+
+            Entry<byte[], byte[]> entry = iterators[index].next();
+
+            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
+                                  serdes.valueFrom(entry.value()));
+        }
+
+        @Override
+        public void remove() {
+            if (index < iterators.length)
+                iterators[index].remove();
+        }
+
+        @Override
+        public void close() {
+            for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    private final String name;
+    private final long segmentInterval;
+    private final boolean retainDuplicates;
+    private final Segment[] segments;
+    private final Serdes<K, V> serdes;
+    private final SimpleDateFormat formatter;
+
+    private ProcessorContext context;
+    private long currentSegmentId = -1L;
+    private int seqnum = 0;
+
+    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
+        this.name = name;
+
+        // The segment interval must be at least MIN_SEGMENT_INTERVAL
+        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+
+        this.segments = new Segment[numSegments];
+        this.serdes = serdes;
+
+        this.retainDuplicates = retainDuplicates;
+
+        // Create a date formatter. Formatted timestamps are used as segment name suffixes
+        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
+        this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public void flush() {
+        for (Segment segment : segments) {
+            if (segment != null)
+                segment.flush();
+        }
+    }
+
+    @Override
+    public void close() {
+        for (Segment segment : segments) {
+            if (segment != null)
+                segment.close();
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
+    }
+
+    @Override
+    public void put(K key, V value, long timestamp) {
+        putAndReturnInternalKey(key, value, timestamp);
+    }
+
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value, long t) {
+        long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
+
+        long segmentId = segmentId(timestamp);
+
+        if (segmentId > currentSegmentId) {
+            // A new segment will be created. Clean up old segments first.
+            currentSegmentId = segmentId;
+            cleanup();
+        }
+
+        // If the record is within the retention period, put it in the store.
+        if (segmentId > currentSegmentId - segments.length) {
+            if (retainDuplicates)
+                seqnum = (seqnum + 1) & 0x7FFFFFFF;
+            byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
+            getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
+            return binaryKey;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+
+        if (segmentId > currentSegmentId) {
+            // A new segment will be created. Clean up old segments first.
+            currentSegmentId = segmentId;
+            cleanup();
+        }
+
+        // If the record is within the retention period, put it in the store.
+        if (segmentId > currentSegmentId - segments.length)
+            getSegment(segmentId).put(binaryKey, binaryValue);
+    }
+
+    @Override
+    public byte[] getInternal(byte[] binaryKey) {
+        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+
+        Segment segment = segments[(int) (segmentId % segments.length)];
+
+        if (segment != null && segment.id == segmentId) {
+            return segment.get(binaryKey);
+        } else {
+            return null;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+        long segFrom = segmentId(timeFrom);
+        long segTo = segmentId(Math.max(0L, timeTo));
+
+        byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
+        byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
+
+        ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
+
+        for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
+            Segment segment = segments[(int) (segmentId % segments.length)];
+
+            if (segment != null && segment.id == segmentId)
+                iterators.add(segment.range(binaryFrom, binaryUntil));
+        }
+
+        if (iterators.size() > 0) {
+            return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
+        } else {
+            return new RocksDBWindowStoreIterator<>(serdes);
+        }
+    }
+
+    private Segment getSegment(long segmentId) {
+        int index = (int) (segmentId % segments.length);
+
+        if (segments[index] == null) {
+            segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
+            segments[index].init(context);
+        }
+
+        return segments[index];
+    }
+
+    private void cleanup() {
+        for (int i = 0; i < segments.length; i++) {
+            if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
+                segments[i].close();
+                segments[i].destroy();
+                segments[i] = null;
+            }
+        }
+    }
+
+    public long segmentId(long timestamp) {
+        return timestamp / segmentInterval;
+    }
+
+    public String directorySuffix(long segmentId) {
+        return formatter.format(new Date(segmentId * segmentInterval));
+    }
+
+    // this method is used by a test
+    public Set<Long> segmentIds() {
+        HashSet<Long> segmentIds = new HashSet<>();
+
+        for (Segment segment : segments) {
+            if (segment != null)
+                segmentIds.add(segment.id);
+        }
+
+        return segmentIds;
+    }
+
+}
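
The segment arithmetic in putAndReturnInternalKey and getSegment is easier to follow with concrete numbers. An editorial sketch of the same formulas (constants chosen only for illustration, not taken from the patch):

public class SegmentMathSketch {
    public static void main(String[] args) {
        long retentionPeriod = 60 * 60 * 1000L; // 1 hour, illustrative
        int numSegments = 3;
        long minSegmentInterval = 60 * 1000L;   // RocksDBWindowStore.MIN_SEGMENT_INTERVAL

        // Same formula as the constructor: spread retention across numSegments - 1 intervals,
        // but never go below the minimum interval.
        long segmentInterval = Math.max(retentionPeriod / (numSegments - 1), minSegmentInterval);
        System.out.println(segmentInterval); // 1800000 (30 minutes)

        // segmentId() is integer division; ids map onto a ring of numSegments slots,
        // so a new id reuses (and first evicts) the slot of an expired old segment.
        long timestamp = 5400000L; // 90 minutes
        long segmentId = timestamp / segmentInterval; // 3
        int slot = (int) (segmentId % numSegments);   // 0, the slot previously held by id 0
        System.out.println(segmentId + " -> slot " + slot);
    }
}
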
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
new file mode 100644
index 0000000..fa85ce9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A {@link org.apache.kafka.streams.state.WindowStore} that stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final long retentionPeriod;
+    private final boolean retainDuplicates;
+    private final int numSegments;
+    private final Serdes serdes;
+    private final Time time;
+
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.retentionPeriod = retentionPeriod;
+        this.retainDuplicates = retainDuplicates;
+        this.numSegments = numSegments;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
new file mode 100644
index 0000000..da5544c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class StoreChangeLogger<K, V> {
+
+    public interface ValueGetter<K, V> {
+        V get(K key);
+    }
+
+    protected final Serdes<K, V> serialization;
+
+    private final Set<K> dirty;
+    private final Set<K> removed;
+    private final int maxDirty;
+    private final int maxRemoved;
+
+    private final String topic;
+    private int partition;
+    private ProcessorContext context;
+
+    // always wrap the logged store with the metered store
+    public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+        this.topic = topic;
+        this.serialization = serialization;
+        this.context = context;
+        this.partition = context.id().partition;
+
+        this.dirty = new HashSet<>();
+        this.removed = new HashSet<>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
+    }
+
+    public void add(K key) {
+        this.dirty.add(key);
+        this.removed.remove(key);
+    }
+
+    public void delete(K key) {
+        this.dirty.remove(key);
+        this.removed.add(key);
+    }
+
+    public void maybeLogChange(ValueGetter<K, V> getter) {
+        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+            logChange(getter);
+    }
+
+    public void logChange(ValueGetter<K, V> getter) {
+        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+        if (collector != null) {
+            Serializer<K> keySerializer = serialization.keySerializer();
+            Serializer<V> valueSerializer = serialization.valueSerializer();
+
+            for (K k : this.removed) {
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+            }
+            for (K k : this.dirty) {
+                V v = getter.get(k);
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+            }
+            this.removed.clear();
+            this.dirty.clear();
+        }
+    }
+
+}
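
The logger's contract is: add() marks a key dirty, delete() marks it removed, and logChange() drains both sets to the changelog topic, sending null values as tombstones for removals. An editorial sketch of the intended call pattern from a store implementation (the class name and the in-memory inner map are illustrative assumptions):

import org.apache.kafka.streams.state.internals.StoreChangeLogger;

import java.util.HashMap;
import java.util.Map;

public class LoggedStoreSketch<K, V> {
    private final Map<K, V> inner = new HashMap<>();
    private final StoreChangeLogger<K, V> changeLogger;
    private final StoreChangeLogger.ValueGetter<K, V> getter;

    public LoggedStoreSketch(StoreChangeLogger<K, V> changeLogger) {
        this.changeLogger = changeLogger;
        // Reads back the current value when dirty keys are drained to the changelog.
        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
            public V get(K key) {
                return inner.get(key);
            }
        };
    }

    public void put(K key, V value) {
        inner.put(key, value);
        changeLogger.add(key);               // mark dirty; batched until maxDirty is exceeded
        changeLogger.maybeLogChange(getter);
    }

    public void delete(K key) {
        inner.remove(key);
        changeLogger.delete(key);            // drained later as a null-value tombstone
        changeLogger.maybeLogChange(getter);
    }

    public void flush() {
        changeLogger.logChange(getter);      // drain unconditionally on flush
    }
}
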
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
index ba596a9..ecc303d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
 import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -43,34 +42,21 @@ public class KStreamAggregateTest {
     private final Serializer<String> strSerializer = new StringSerializer();
     private final Deserializer<String> strDeserializer = new StringDeserializer();
 
-    private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+    private class StringCanonizer implements Aggregator<String, String, String> {
 
-        private class StringCanonizer implements Aggregator<String, String, String> {
-
-            @Override
-            public String initialValue() {
-                return "0";
-            }
-
-            @Override
-            public String add(String aggKey, String value, String aggregate) {
-                return aggregate + "+" + value;
-            }
-
-            @Override
-            public String remove(String aggKey, String value, String aggregate) {
-                return aggregate + "-" + value;
-            }
+        @Override
+        public String initialValue(String aggKey) {
+            return "0";
+        }
 
-            @Override
-            public String merge(String aggr1, String aggr2) {
-                return "(" + aggr1 + ") + (" + aggr2 + ")";
-            }
+        @Override
+        public String add(String aggKey, String value, String aggregate) {
+            return aggregate + "+" + value;
         }
 
         @Override
-        public Aggregator<String, String, String> get() {
-            return new StringCanonizer();
+        public String remove(String aggKey, String value, String aggregate) {
+            return aggregate + "-" + value;
         }
     }
@@ -83,7 +69,7 @@ public class KStreamAggregateTest {
         String topic1 = "topic1";
 
         KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
-        KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizeSupplier(),
+        KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizer(),
                 HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
                 strSerializer,
                 strSerializer,
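
Context for these test changes: the patch replaces AggregatorSupplier with a directly-supplied Aggregator whose initialValue now receives the aggregation key, and drops merge(). A standalone analogue of the new shape (the interface is re-declared here purely for illustration; the real one is org.apache.kafka.streams.kstream.Aggregator):

public class AggregatorSketch {
    // Shape of the new interface, as reflected in the test diff: no supplier,
    // no merge(), and initialValue() takes the aggregation key.
    interface Aggregator<K, V, T> {
        T initialValue(K aggKey);
        T add(K aggKey, V value, T aggregate);
        T remove(K aggKey, V value, T aggregate);
    }

    public static void main(String[] args) {
        Aggregator<String, String, String> canonizer = new Aggregator<String, String, String>() {
            public String initialValue(String aggKey) {
                return "0";
            }

            public String add(String aggKey, String value, String aggregate) {
                return aggregate + "+" + value;
            }

            public String remove(String aggKey, String value, String aggregate) {
                return aggregate + "-" + value;
            }
        };

        String agg = canonizer.initialValue("k");
        agg = canonizer.add("k", "a", agg);
        agg = canonizer.add("k", "b", agg);
        agg = canonizer.remove("k", "a", agg);
        System.out.println(agg); // 0+a+b-a
    }
}
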
return "(" + aggr1 + ") + (" + aggr2 + ")"; - } + @Override + public String add(String aggKey, String value, String aggregate) { + return aggregate + "+" + value; } @Override - public Aggregator<String, String, String> get() { - return new StringCanonizer(); + public String remove(String aggKey, String value, String aggregate) { + return aggregate + "-" + value; } } @@ -81,7 +67,7 @@ public class KTableAggregateTest { String topic1 = "topic1"; KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1); - KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizeSupplier(), + KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizer(), new NoOpKeyValueMapper<String, String>(), strSerializer, strSerializer, http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 5e336cc..bc6f71b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.state.OffsetCheckpoint; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockStateStoreSupplier; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index b2f45fd..85a8a15 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamingConfig; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.state.OffsetCheckpoint; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockStateStoreSupplier; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java deleted file mode 100644 index d40f308..0000000 --- 

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
deleted file mode 100644
index d40f308..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.junit.Test;
-
-public abstract class AbstractKeyValueStoreTest {
-
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
-                                                                      Class<K> keyClass, Class<V> valueClass,
-                                                                      boolean useContextSerdes);
-
-    @Test
-    public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
-                while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
-            }
-
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
-                while (iter.hasNext()) {
-                    Entry<Integer, String> entry = iter.next();
-                    if (entry.key().equals(2))
-                        assertEquals("two", entry.value());
-                    else if (entry.key().equals(4))
-                        assertEquals("four", entry.value());
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
-            }
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testRestore() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(0, "zero");
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
-    }
-
-    @Test
-    public void testRestoreWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(0, "zero");
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
deleted file mode 100644
index 2b90d0a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            ProcessorContext context,
-            Class<K> keyClass, Class<V> valueClass,
-            boolean useContextSerdes) {
-
-        StateStoreSupplier supplier;
-        if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
-        } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
-        }
-
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
-        return store;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
deleted file mode 100644
index 81adfad..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.junit.Test;
-
-public class InMemoryLRUCacheStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                            .withIntegerKeys().withStringValues()
-                                            .inMemory().maxEntries(3)
-                                            .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
-        store.put(0, "zero");
-        store.put(1, "one");
-        store.put(2, "two");
-        store.put(3, "three");
-        store.put(4, "four");
-        store.put(5, "five");
-
-        // It should only keep the last 4 added ...
-        assertEquals(3, driver.sizeOf(store));
-        assertNull(store.get(0));
-        assertNull(store.get(1));
-        assertNull(store.get(2));
-        assertEquals("three", store.get(3));
-        assertEquals("four", store.get(4));
-        assertEquals("five", store.get(5));
-        store.delete(5);
-
-        // Flush the store and verify all current entries were properly flushed ...
-        store.flush();
-        assertNull(driver.flushedEntryStored(0));
-        assertNull(driver.flushedEntryStored(1));
-        assertNull(driver.flushedEntryStored(2));
-        assertEquals("three", driver.flushedEntryStored(3));
-        assertEquals("four", driver.flushedEntryStored(4));
-        assertNull(driver.flushedEntryStored(5));
-
-        assertEquals(true, driver.flushedEntryRemoved(0));
-        assertEquals(true, driver.flushedEntryRemoved(1));
-        assertEquals(true, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(3));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-
-        Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
-        Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
-        Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
-        Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                            .withKeys(keySer, keyDeser)
-                                            .withValues(valSer, valDeser)
-                                            .inMemory().maxEntries(3)
-                                            .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
-        store.put(0, "zero");
-        store.put(1, "one");
-        store.put(2, "two");
-        store.put(3, "three");
-        store.put(4, "four");
-        store.put(5, "five");
-
-        // It should only keep the last 4 added ...
-        assertEquals(3, driver.sizeOf(store));
-        assertNull(store.get(0));
-        assertNull(store.get(1));
-        assertNull(store.get(2));
-        assertEquals("three", store.get(3));
-        assertEquals("four", store.get(4));
-        assertEquals("five", store.get(5));
-        store.delete(5);
-
-        // Flush the store and verify all current entries were properly flushed ...
-        store.flush();
-        assertNull(driver.flushedEntryStored(0));
-        assertNull(driver.flushedEntryStored(1));
-        assertNull(driver.flushedEntryStored(2));
-        assertEquals("three", driver.flushedEntryStored(3));
-        assertEquals("four", driver.flushedEntryStored(4));
-        assertNull(driver.flushedEntryStored(5));
-
-        assertEquals(true, driver.flushedEntryRemoved(0));
-        assertEquals(true, driver.flushedEntryRemoved(1));
-        assertEquals(true, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(3));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
-    }
-
-    @Test
-    public void testRestore() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                            .withIntegerKeys().withStringValues()
-                                            .inMemory().maxEntries(3)
-                                            .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store's contents were properly restored ...
-        assertEquals(0, driver.checkForRestoredEntries(store));
-
-        // and there are no other entries ...
-        assertEquals(3, driver.sizeOf(store));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 108797a..b0c9bd7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -48,7 +48,7 @@ import java.util.Set;
 * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
 * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
 * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
- * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
+ * {@link org.apache.kafka.streams.state.internals.MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
 * <p>
 * <h2>Basic usage</h2>
 * This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
@@ -93,7 +93,7 @@ import java.util.Set;
 * <p>
 * <h2>Restoring a store</h2>
 * This component can be used to test whether a {@link KeyValueStore} implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
+ * {@link ProcessorContext#register(StateStore, boolean, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
 * the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
 * <p>
 * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
deleted file mode 100644
index 20e92ef..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            ProcessorContext context,
-            Class<K> keyClass,
-            Class<V> valueClass,
-            boolean useContextSerdes) {
-
-        StateStoreSupplier supplier;
-        if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
-        } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
-        }
-
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
-        return store;
-
-    }
-}
