http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java deleted file mode 100644 index 743a110..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.streams.StreamingMetrics; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.utils.Time; - -import java.util.List; - -public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { - - protected final KeyValueStore<K, V> inner; - protected final StoreChangeLogger.ValueGetter getter; - protected final Serdes<K, V> serialization; - protected final String metricScope; - protected final Time time; - - private Sensor putTime; - private Sensor getTime; - private Sensor deleteTime; - private Sensor putAllTime; - private Sensor allTime; - private Sensor rangeTime; - private Sensor flushTime; - private Sensor restoreTime; - private StreamingMetrics metrics; - - private boolean loggingEnabled = true; - private StoreChangeLogger<K, V> changeLogger = null; - - // always wrap the store with the metered store - public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) { - this.inner = inner; - this.getter = new StoreChangeLogger.ValueGetter<K, V>() { - public V get(K key) { - return inner.get(key); - } - }; - this.serialization = serialization; - this.metricScope = metricScope; - this.time = time != null ? 
time : new SystemTime(); - } - - public MeteredKeyValueStore<K, V> disableLogging() { - loggingEnabled = false; - return this; - } - - @Override - public String name() { - return inner.name(); - } - - @Override - public void init(ProcessorContext context) { - final String name = name(); - this.metrics = context.metrics(); - this.putTime = this.metrics.addLatencySensor(metricScope, name, "put"); - this.getTime = this.metrics.addLatencySensor(metricScope, name, "get"); - this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete"); - this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all"); - this.allTime = this.metrics.addLatencySensor(metricScope, name, "all"); - this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range"); - this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush"); - this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore"); - - serialization.init(context); - this.changeLogger = this.loggingEnabled ? 
new StoreChangeLogger<>(name, context, serialization) : null; - - // register and possibly restore the state from the logs - long startNs = time.nanoseconds(); - inner.init(context); - try { - final Deserializer<K> keyDeserializer = serialization.keyDeserializer(); - final Deserializer<V> valDeserializer = serialization.valueDeserializer(); - - context.register(this, loggingEnabled, new StateRestoreCallback() { - @Override - public void restore(byte[] key, byte[] value) { - inner.put(keyDeserializer.deserialize(name, key), - valDeserializer.deserialize(name, value)); - } - }); - } finally { - this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds()); - } - } - - @Override - public boolean persistent() { - return inner.persistent(); - } - - @Override - public V get(K key) { - long startNs = time.nanoseconds(); - try { - return this.inner.get(key); - } finally { - this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds()); - } - } - - @Override - public void put(K key, V value) { - long startNs = time.nanoseconds(); - try { - this.inner.put(key, value); - - if (loggingEnabled) { - changeLogger.add(key); - changeLogger.maybeLogChange(this.getter); - } - } finally { - this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds()); - } - } - - @Override - public void putAll(List<Entry<K, V>> entries) { - long startNs = time.nanoseconds(); - try { - this.inner.putAll(entries); - - if (loggingEnabled) { - for (Entry<K, V> entry : entries) { - K key = entry.key(); - changeLogger.add(key); - } - changeLogger.maybeLogChange(this.getter); - } - } finally { - this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds()); - } - } - - @Override - public V delete(K key) { - long startNs = time.nanoseconds(); - try { - V value = this.inner.delete(key); - - removed(key); - - return value; - } finally { - this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds()); - } - } - - /** - * Called when the underlying {@link #inner} 
{@link KeyValueStore} removes an entry in response to a call from this - * store. - * - * @param key the key for the entry that the inner store removed - */ - protected void removed(K key) { - if (loggingEnabled) { - changeLogger.delete(key); - changeLogger.maybeLogChange(this.getter); - } - } - - @Override - public KeyValueIterator<K, V> range(K from, K to) { - return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime); - } - - @Override - public KeyValueIterator<K, V> all() { - return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime); - } - - @Override - public void close() { - inner.close(); - } - - @Override - public void flush() { - long startNs = time.nanoseconds(); - try { - this.inner.flush(); - - if (loggingEnabled) - changeLogger.logChange(this.getter); - } finally { - this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds()); - } - } - - private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> { - - private final KeyValueIterator<K1, V1> iter; - private final Sensor sensor; - private final long startNs; - - public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) { - this.iter = iter; - this.sensor = sensor; - this.startNs = time.nanoseconds(); - } - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public Entry<K1, V1> next() { - return iter.next(); - } - - @Override - public void remove() { - iter.remove(); - } - - @Override - public void close() { - try { - iter.close(); - } finally { - metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds()); - } - } - - } - -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java deleted file mode 100644 index cfcfb00..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.StreamingMetrics; -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateRestoreCallback; - -public class MeteredWindowStore<K, V> implements WindowStore<K, V> { - - protected final WindowStore<K, V> inner; - protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter; - protected final String metricScope; - protected final Time time; - - private Sensor putTime; - private Sensor getTime; - private Sensor rangeTime; - private Sensor flushTime; - private Sensor restoreTime; - private StreamingMetrics metrics; - - private boolean loggingEnabled = true; - private StoreChangeLogger<byte[], byte[]> changeLogger = null; - - // always wrap the store with the metered store - public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) { - this.inner = inner; - this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() { - public byte[] get(byte[] key) { - return inner.getInternal(key); - } - }; - this.metricScope = metricScope; - this.time = time != null ? 
time : new SystemTime(); - } - - public MeteredWindowStore<K, V> disableLogging() { - loggingEnabled = false; - return this; - } - - @Override - public String name() { - return inner.name(); - } - - @Override - public void init(ProcessorContext context) { - final String name = name(); - this.metrics = context.metrics(); - this.putTime = this.metrics.addLatencySensor(metricScope, name, "put"); - this.getTime = this.metrics.addLatencySensor(metricScope, name, "get"); - this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range"); - this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush"); - this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore"); - - this.changeLogger = this.loggingEnabled ? - new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null; - - // register and possibly restore the state from the logs - long startNs = time.nanoseconds(); - inner.init(context); - try { - context.register(this, loggingEnabled, new StateRestoreCallback() { - @Override - public void restore(byte[] key, byte[] value) { - inner.putInternal(key, value); - } - }); - } finally { - this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds()); - } - } - - @Override - public boolean persistent() { - return inner.persistent(); - } - - @Override - public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) { - return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime); - } - - @Override - public void put(K key, V value) { - putAndReturnInternalKey(key, value, -1L); - } - - @Override - public void put(K key, V value, long timestamp) { - putAndReturnInternalKey(key, value, timestamp); - } - - @Override - public byte[] putAndReturnInternalKey(K key, V value, long timestamp) { - long startNs = time.nanoseconds(); - try { - byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp); - - if (loggingEnabled) { - 
changeLogger.add(binKey); - changeLogger.maybeLogChange(this.getter); - } - - return binKey; - } finally { - this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds()); - } - } - - @Override - public void putInternal(byte[] binaryKey, byte[] binaryValue) { - inner.putInternal(binaryKey, binaryValue); - } - - @Override - public byte[] getInternal(byte[] binaryKey) { - long startNs = time.nanoseconds(); - try { - return this.inner.getInternal(binaryKey); - } finally { - this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds()); - } - } - - @Override - public void close() { - inner.close(); - } - - @Override - public void flush() { - long startNs = time.nanoseconds(); - try { - this.inner.flush(); - - if (loggingEnabled) - changeLogger.logChange(this.getter); - } finally { - this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds()); - } - } - - private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> { - - private final WindowStoreIterator<E> iter; - private final Sensor sensor; - private final long startNs; - - public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) { - this.iter = iter; - this.sensor = sensor; - this.startNs = time.nanoseconds(); - } - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public KeyValue<Long, E> next() { - return iter.next(); - } - - @Override - public void remove() { - iter.remove(); - } - - @Override - public void close() { - try { - iter.close(); - } finally { - metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds()); - } - } - - } - - WindowStore<K, V> inner() { - return inner; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java 
b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java deleted file mode 100644 index d748aac..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Utils; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.EOFException; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * This class saves out a map of topic/partition=>offsets to a file. The format of the file is UTF-8 text containing the following: - * <pre> - * <version> - * <n> - * <topic_name_1> <partition_1> <offset_1> - * . - * . - * . 
- * <topic_name_n> <partition_n> <offset_n> - * </pre> - * The first line contains a number designating the format version (currently 0), the get line contains - * a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple - * separated by spaces. - */ -public class OffsetCheckpoint { - - private static final int VERSION = 0; - - private final File file; - private final Object lock; - - public OffsetCheckpoint(File file) throws IOException { - this.file = file; - this.lock = new Object(); - } - - public void write(Map<TopicPartition, Long> offsets) throws IOException { - synchronized (lock) { - // write to temp file and then swap with the existing file - File temp = new File(file.getAbsolutePath() + ".tmp"); - - FileOutputStream fileOutputStream = new FileOutputStream(temp); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream)); - try { - writeIntLine(writer, VERSION); - writeIntLine(writer, offsets.size()); - - for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) - writeEntry(writer, entry.getKey(), entry.getValue()); - - writer.flush(); - fileOutputStream.getFD().sync(); - } finally { - writer.close(); - } - - Utils.atomicMoveWithFallback(temp.toPath(), file.toPath()); - } - } - - private void writeIntLine(BufferedWriter writer, int number) throws IOException { - writer.write(Integer.toString(number)); - writer.newLine(); - } - - private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException { - writer.write(part.topic()); - writer.write(' '); - writer.write(Integer.toString(part.partition())); - writer.write(' '); - writer.write(Long.toString(offset)); - writer.newLine(); - } - - public Map<TopicPartition, Long> read() throws IOException { - synchronized (lock) { - BufferedReader reader = null; - try { - reader = new BufferedReader(new FileReader(file)); - } catch (FileNotFoundException e) { - return Collections.emptyMap(); - } - - try { 
- int version = readInt(reader); - switch (version) { - case 0: - int expectedSize = readInt(reader); - Map<TopicPartition, Long> offsets = new HashMap<>(); - String line = reader.readLine(); - while (line != null) { - String[] pieces = line.split("\\s+"); - if (pieces.length != 3) - throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.", - line)); - - String topic = pieces[0]; - int partition = Integer.parseInt(pieces[1]); - long offset = Long.parseLong(pieces[2]); - offsets.put(new TopicPartition(topic, partition), offset); - line = reader.readLine(); - } - if (offsets.size() != expectedSize) - throw new IOException(String.format("Expected %d entries but found only %d", - expectedSize, - offsets.size())); - return offsets; - - default: - throw new IllegalArgumentException("Unknown offset checkpoint version: " + version); - } - } finally { - if (reader != null) - reader.close(); - } - } - } - - private int readInt(BufferedReader reader) throws IOException { - String line = reader.readLine(); - if (line == null) - throw new EOFException("File ended prematurely."); - int val = Integer.parseInt(line); - return val; - } - - public void delete() throws IOException { - file.delete(); - } - - @Override - public String toString() { - return this.file.getAbsolutePath(); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java deleted file mode 100644 index 41314b9..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor 
license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; - -/** - * A {@link KeyValueStore} that stores all entries in a local RocksDB database. 
- * - * @param <K> the type of keys - * @param <V> the type of values - * - * @see Stores#create(String) - */ -public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier { - - private final String name; - private final Serdes serdes; - private final Time time; - - protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) { - this.name = name; - this.serdes = serdes; - this.time = time; - } - - public String name() { - return name; - } - - public StateStore get() { - return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java deleted file mode 100644 index 62b9f2c..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.rocksdb.BlockBasedTableConfig; -import org.rocksdb.CompactionStyle; -import org.rocksdb.CompressionType; -import org.rocksdb.FlushOptions; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; -import org.rocksdb.WriteOptions; - -import java.io.File; -import java.util.Comparator; -import java.util.List; -import java.util.NoSuchElementException; - -public class RocksDBStore<K, V> implements KeyValueStore<K, V> { - - private static final int TTL_NOT_USED = -1; - - // TODO: these values should be configurable - private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION; - private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL; - private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L; - private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L; - private static final long BLOCK_SIZE = 4096L; - private static final int TTL_SECONDS = TTL_NOT_USED; - private static final int MAX_WRITE_BUFFERS = 3; - private static final String DB_FILE_DIR = "rocksdb"; - - private final String name; - - private final Options options; - private final WriteOptions wOptions; - private final FlushOptions fOptions; - - private Serdes<K, V> serdes; - private ProcessorContext context; - protected File dbDir; - private RocksDB db; - - public RocksDBStore(String name, Serdes<K, V> serdes) { - this.name = name; - this.serdes = serdes; - - // initialize the rocksdb options - BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); - tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); - tableConfig.setBlockSize(BLOCK_SIZE); - - options = new Options(); - options.setTableFormatConfig(tableConfig); - options.setWriteBufferSize(WRITE_BUFFER_SIZE); - 
options.setCompressionType(COMPRESSION_TYPE); - options.setCompactionStyle(COMPACTION_STYLE); - options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS); - options.setCreateIfMissing(true); - options.setErrorIfExists(false); - - wOptions = new WriteOptions(); - wOptions.setDisableWAL(true); - - fOptions = new FlushOptions(); - fOptions.setWaitForFlush(true); - } - - public void init(ProcessorContext context) { - serdes.init(context); - - this.context = context; - this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name); - this.db = openDB(this.dbDir, this.options, TTL_SECONDS); - } - - private RocksDB openDB(File dir, Options options, int ttl) { - try { - if (ttl == TTL_NOT_USED) { - dir.getParentFile().mkdirs(); - return RocksDB.open(options, dir.toString()); - } else { - throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based."); - // TODO: support TTL with change log? - // return TtlDB.open(options, dir.toString(), ttl, false); - } - } catch (RocksDBException e) { - // TODO: this needs to be handled more accurately - throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e); - } - } - - @Override - public String name() { - return this.name; - } - - @Override - public boolean persistent() { - return false; - } - - @Override - public V get(K key) { - try { - return serdes.valueFrom(this.db.get(serdes.rawKey(key))); - } catch (RocksDBException e) { - // TODO: this needs to be handled more accurately - throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e); - } - } - - @Override - public void put(K key, V value) { - try { - if (value == null) { - db.remove(wOptions, serdes.rawKey(key)); - } else { - db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value)); - } - } catch (RocksDBException e) { - // TODO: this needs to be handled more accurately - throw new KafkaException("Error while executing put " + 
key.toString() + " from store " + this.name, e); - } - } - - @Override - public void putAll(List<Entry<K, V>> entries) { - for (Entry<K, V> entry : entries) - put(entry.key(), entry.value()); - } - - @Override - public V delete(K key) { - V value = get(key); - put(key, null); - return value; - } - - @Override - public KeyValueIterator<K, V> range(K from, K to) { - return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to); - } - - @Override - public KeyValueIterator<K, V> all() { - RocksIterator innerIter = db.newIterator(); - innerIter.seekToFirst(); - return new RocksDbIterator<K, V>(innerIter, serdes); - } - - @Override - public void flush() { - try { - db.flush(fOptions); - } catch (RocksDBException e) { - // TODO: this needs to be handled more accurately - throw new KafkaException("Error while executing flush from store " + this.name, e); - } - } - - @Override - public void close() { - flush(); - db.close(); - } - - private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> { - private final RocksIterator iter; - private final Serdes<K, V> serdes; - - public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) { - this.iter = iter; - this.serdes = serdes; - } - - protected byte[] peekRawKey() { - return iter.key(); - } - - protected Entry<K, V> getEntry() { - return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value())); - } - - @Override - public boolean hasNext() { - return iter.isValid(); - } - - @Override - public Entry<K, V> next() { - if (!hasNext()) - throw new NoSuchElementException(); - - Entry<K, V> entry = this.getEntry(); - iter.next(); - return entry; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("RocksDB iterator does not support remove"); - } - - @Override - public void close() { - iter.dispose(); - } - - } - - private static class LexicographicComparator implements Comparator<byte[]> { - - @Override - public int compare(byte[] left, byte[] right) { - 
for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) { - int leftByte = left[i] & 0xff; - int rightByte = right[j] & 0xff; - if (leftByte != rightByte) { - return leftByte - rightByte; - } - } - return left.length - right.length; - } - } - - private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> { - // RocksDB's JNI interface does not expose getters/setters that allow the - // comparator to be pluggable, and the default is lexicographic, so it's - // safe to just force lexicographic comparator here for now. - private final Comparator<byte[]> comparator = new LexicographicComparator(); - byte[] rawToKey; - - public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes, - K from, K to) { - super(iter, serdes); - iter.seek(serdes.rawKey(from)); - this.rawToKey = serdes.rawKey(to); - } - - @Override - public boolean hasNext() { - return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0; - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java deleted file mode 100644 index 2f30712..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java +++ /dev/null @@ -1,289 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.processor.ProcessorContext; - -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashSet; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.SimpleTimeZone; - -public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { - - public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute - - private static final long USE_CURRENT_TIMESTAMP = -1L; - - private static class Segment extends RocksDBStore<byte[], byte[]> { - public final long id; - - Segment(String name, long id) { - super(name, WindowStoreUtil.INNER_SERDES); - this.id = id; - } - - public void destroy() { - Utils.delete(dbDir); - } - } - - private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> { - private final Serdes<?, V> serdes; - private final KeyValueIterator<byte[], byte[]>[] iterators; - private int index = 0; - - RocksDBWindowStoreIterator(Serdes<?, V> serdes) { - this(serdes, WindowStoreUtil.NO_ITERATORS); - } - - RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) { - this.serdes = serdes; - this.iterators = iterators; - } - - @Override - public boolean hasNext() { - while (index < iterators.length) { - if (iterators[index].hasNext()) - return true; - - index++; - } - return false; - } - - @Override - public 
KeyValue<Long, V> next() { - if (index >= iterators.length) - throw new NoSuchElementException(); - - Entry<byte[], byte[]> entry = iterators[index].next(); - - return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()), - serdes.valueFrom(entry.value())); - } - - @Override - public void remove() { - if (index < iterators.length) - iterators[index].remove(); - } - - @Override - public void close() { - for (KeyValueIterator<byte[], byte[]> iterator : iterators) { - iterator.close(); - } - } - } - - private final String name; - private final long segmentInterval; - private final boolean retainDuplicates; - private final Segment[] segments; - private final Serdes<K, V> serdes; - private final SimpleDateFormat formatter; - - private ProcessorContext context; - private long currentSegmentId = -1L; - private int seqnum = 0; - - public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) { - this.name = name; - - // The segment interval must be greater than MIN_SEGMENT_INTERVAL - this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL); - - this.segments = new Segment[numSegments]; - this.serdes = serdes; - - this.retainDuplicates = retainDuplicates; - - // Create a date formatter. 
Formatted timestamps are used as segment name suffixes - this.formatter = new SimpleDateFormat("yyyyMMddHHmm"); - this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT")); - } - - @Override - public String name() { - return name; - } - - @Override - public void init(ProcessorContext context) { - this.context = context; - } - - @Override - public boolean persistent() { - return true; - } - - @Override - public void flush() { - for (Segment segment : segments) { - if (segment != null) - segment.flush(); - } - } - - @Override - public void close() { - for (Segment segment : segments) { - if (segment != null) - segment.close(); - } - } - - @Override - public void put(K key, V value) { - putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP); - } - - @Override - public void put(K key, V value, long timestamp) { - putAndReturnInternalKey(key, value, timestamp); - } - - @Override - public byte[] putAndReturnInternalKey(K key, V value, long t) { - long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t; - - long segmentId = segmentId(timestamp); - - if (segmentId > currentSegmentId) { - // A new segment will be created. Clean up old segments first. - currentSegmentId = segmentId; - cleanup(); - } - - // If the record is within the retention period, put it in the store. - if (segmentId > currentSegmentId - segments.length) { - if (retainDuplicates) - seqnum = (seqnum + 1) & 0x7FFFFFFF; - byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes); - getSegment(segmentId).put(binaryKey, serdes.rawValue(value)); - return binaryKey; - } else { - return null; - } - } - - @Override - public void putInternal(byte[] binaryKey, byte[] binaryValue) { - long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey)); - - if (segmentId > currentSegmentId) { - // A new segment will be created. Clean up old segments first. 
- currentSegmentId = segmentId; - cleanup(); - } - - // If the record is within the retention period, put it in the store. - if (segmentId > currentSegmentId - segments.length) - getSegment(segmentId).put(binaryKey, binaryValue); - } - - @Override - public byte[] getInternal(byte[] binaryKey) { - long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey)); - - Segment segment = segments[(int) (segmentId % segments.length)]; - - if (segment != null && segment.id == segmentId) { - return segment.get(binaryKey); - } else { - return null; - } - } - - @SuppressWarnings("unchecked") - @Override - public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) { - long segFrom = segmentId(timeFrom); - long segTo = segmentId(Math.max(0L, timeTo)); - - byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes); - byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes); - - ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>(); - - for (long segmentId = segFrom; segmentId <= segTo; segmentId++) { - Segment segment = segments[(int) (segmentId % segments.length)]; - - if (segment != null && segment.id == segmentId) - iterators.add(segment.range(binaryFrom, binaryUntil)); - } - - if (iterators.size() > 0) { - return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()])); - } else { - return new RocksDBWindowStoreIterator<>(serdes); - } - } - - private Segment getSegment(long segmentId) { - int index = (int) (segmentId % segments.length); - - if (segments[index] == null) { - segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId); - segments[index].init(context); - } - - return segments[index]; - } - - private void cleanup() { - for (int i = 0; i < segments.length; i++) { - if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) { - segments[i].close(); - segments[i].destroy(); - segments[i] = null; - } 
- } - } - - public long segmentId(long timestamp) { - return timestamp / segmentInterval; - } - - public String directorySuffix(long segmentId) { - return formatter.format(new Date(segmentId * segmentInterval)); - } - - // this method is used by a test - public Set<Long> segmentIds() { - HashSet<Long> segmentIds = new HashSet<>(); - - for (Segment segment : segments) { - if (segment != null) - segmentIds.add(segment.id); - } - - return segmentIds; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java deleted file mode 100644 index fcdcb9b..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreSupplier; - -/** - * A {@link KeyValueStore} that stores all entries in a local RocksDB database. - * - * @param <K> the type of keys - * @param <V> the type of values - * - * @see Stores#create(String) - */ -public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier { - - private final String name; - private final long retentionPeriod; - private final boolean retainDuplicates; - private final int numSegments; - private final Serdes serdes; - private final Time time; - - public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) { - this.name = name; - this.retentionPeriod = retentionPeriod; - this.retainDuplicates = retainDuplicates; - this.numSegments = numSegments; - this.serdes = serdes; - this.time = time; - } - - public String name() { - return name; - } - - public StateStore get() { - return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java deleted file mode 100644 index ee6624e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.internals.RecordCollector; - -import java.util.HashSet; -import java.util.Set; - -public class StoreChangeLogger<K, V> { - - public interface ValueGetter<K, V> { - V get(K key); - } - - protected final Serdes<K, V> serialization; - - private final Set<K> dirty; - private final Set<K> removed; - private final int maxDirty; - private final int maxRemoved; - - private final String topic; - private int partition; - private ProcessorContext context; - - // always wrap the logged store with the metered store - public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) { - this.topic = topic; - this.serialization = serialization; - this.context = context; - this.partition = context.id().partition; - - this.dirty = new HashSet<>(); - this.removed = new HashSet<>(); - this.maxDirty = 100; // TODO: this needs to be configurable - this.maxRemoved = 100; // TODO: this needs to be configurable - } - - public void add(K key) { - this.dirty.add(key); - this.removed.remove(key); - } - - public void 
delete(K key) { - this.dirty.remove(key); - this.removed.add(key); - } - - public void maybeLogChange(ValueGetter<K, V> getter) { - if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved) - logChange(getter); - } - - public void logChange(ValueGetter<K, V> getter) { - RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); - if (collector != null) { - Serializer<K> keySerializer = serialization.keySerializer(); - Serializer<V> valueSerializer = serialization.valueSerializer(); - - for (K k : this.removed) { - collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer); - } - for (K k : this.dirty) { - V v = getter.get(k); - collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer); - } - this.removed.clear(); - this.dirty.clear(); - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/Stores.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 5452040..46b2592 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -27,6 +27,9 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier; +import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier; +import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; /** * Factory for creating key-value stores. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java new file mode 100644 index 0000000..286db1b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.Entry; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Serdes; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * An in-memory key-value store based on a TreeMap. + * + * @param <K> The key type + * @param <V> The value type + * + * @see org.apache.kafka.streams.state.Stores#create(String) + */ +public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { + + private final String name; + private final Serdes serdes; + private final Time time; + + public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) { + this.name = name; + this.serdes = serdes; + this.time = time; + } + + public String name() { + return name; + } + + public StateStore get() { + return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time); + } + + private static class MemoryStore<K, V> implements KeyValueStore<K, V> { + + private final String name; + private final NavigableMap<K, V> map; + + public MemoryStore(String name) { + super(); + this.name = name; + this.map = new TreeMap<>(); + } + + @Override + public String name() { + return this.name; + } + + @Override + public void init(ProcessorContext context) { + // do-nothing since it is in-memory + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public V get(K key) { + return this.map.get(key); + } + + @Override + public void put(K key, V value) { + this.map.put(key, value); + } + + @Override + public void 
putAll(List<Entry<K, V>> entries) { + for (Entry<K, V> entry : entries) + put(entry.key(), entry.value()); + } + + @Override + public V delete(K key) { + return this.map.remove(key); + } + + @Override + public KeyValueIterator<K, V> range(K from, K to) { + return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator()); + } + + @Override + public KeyValueIterator<K, V> all() { + return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator()); + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + @Override + public void close() { + // do-nothing + } + + private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> { + private final Iterator<Map.Entry<K, V>> iter; + + public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public Entry<K, V> next() { + Map.Entry<K, V> entry = iter.next(); + return new Entry<>(entry.getKey(), entry.getValue()); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + } + + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java new file mode 100644 index 0000000..6a38423 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.Entry; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Serdes; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; + +/** + * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries. 
+ * + * @param <K> The key type + * @param <V> The value type + * + */ +public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier { + + private final String name; + private final int capacity; + private final Serdes serdes; + private final Time time; + + public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) { + this.name = name; + this.capacity = capacity; + this.serdes = serdes; + this.time = time; + } + + public String name() { + return name; + } + + public StateStore get() { + MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity); + final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time); + cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() { + @Override + public void apply(K key, V value) { + store.removed(key); + } + }); + return store; + } + + private static interface EldestEntryRemovalListener<K, V> { + public void apply(K key, V value); + } + + protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { + + private final String name; + private final Map<K, V> map; + private final NavigableSet<K> keys; + private EldestEntryRemovalListener<K, V> listener; + + public MemoryLRUCache(String name, final int maxCacheSize) { + this.name = name; + this.keys = new TreeSet<>(); + // leave room for one extra entry to handle adding an entry before the oldest can be removed + this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { + if (size() > maxCacheSize) { + K key = eldest.getKey(); + keys.remove(key); + if (listener != null) listener.apply(key, eldest.getValue()); + return true; + } + return false; + } + }; + } + + protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) { + this.listener = listener; + } + + @Override + public String name() 
{ + return this.name; + } + + @Override + public void init(ProcessorContext context) { + // do-nothing since it is in-memory + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public V get(K key) { + return this.map.get(key); + } + + @Override + public void put(K key, V value) { + this.map.put(key, value); + this.keys.add(key); + } + + @Override + public void putAll(List<Entry<K, V>> entries) { + for (Entry<K, V> entry : entries) + put(entry.key(), entry.value()); + } + + @Override + public V delete(K key) { + V value = this.map.remove(key); + this.keys.remove(key); + return value; + } + + @Override + public KeyValueIterator<K, V> range(K from, K to) { + return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map); + } + + @Override + public KeyValueIterator<K, V> all() { + return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map); + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + @Override + public void close() { + // do-nothing + } + + private static class CacheIterator<K, V> implements KeyValueIterator<K, V> { + private final Iterator<K> keys; + private final Map<K, V> entries; + private K lastKey; + + public CacheIterator(Iterator<K> keys, Map<K, V> entries) { + this.keys = keys; + this.entries = entries; + } + + @Override + public boolean hasNext() { + return keys.hasNext(); + } + + @Override + public Entry<K, V> next() { + lastKey = keys.next(); + return new Entry<>(lastKey, entries.get(lastKey)); + } + + @Override + public void remove() { + keys.remove(); + entries.remove(lastKey); + } + + @Override + public void close() { + // do nothing + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java new file mode 100644 index 0000000..21f73b0 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.Entry; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Serdes; + +import java.util.List; + +public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { + + protected final KeyValueStore<K, V> inner; + protected final StoreChangeLogger.ValueGetter getter; + protected final Serdes<K, V> serialization; + protected final String metricScope; + protected final Time time; + + private Sensor putTime; + private Sensor getTime; + private Sensor deleteTime; + private Sensor putAllTime; + private Sensor allTime; + private Sensor rangeTime; + private Sensor flushTime; + private Sensor restoreTime; + private StreamingMetrics metrics; + + private boolean loggingEnabled = true; + private StoreChangeLogger<K, V> changeLogger = null; + + // always wrap the store with the metered store + public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) { + this.inner = inner; + this.getter = new StoreChangeLogger.ValueGetter<K, V>() { + public V get(K key) { + return inner.get(key); + } + }; + this.serialization = serialization; + this.metricScope = metricScope; + this.time = time != null ? 
time : new SystemTime(); + } + + public MeteredKeyValueStore<K, V> disableLogging() { + loggingEnabled = false; + return this; + } + + @Override + public String name() { + return inner.name(); + } + + @Override + public void init(ProcessorContext context) { + final String name = name(); + this.metrics = context.metrics(); + this.putTime = this.metrics.addLatencySensor(metricScope, name, "put"); + this.getTime = this.metrics.addLatencySensor(metricScope, name, "get"); + this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete"); + this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all"); + this.allTime = this.metrics.addLatencySensor(metricScope, name, "all"); + this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range"); + this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush"); + this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore"); + + serialization.init(context); + this.changeLogger = this.loggingEnabled ? 
new StoreChangeLogger<>(name, context, serialization) : null; + + // register and possibly restore the state from the logs + long startNs = time.nanoseconds(); + inner.init(context); + try { + final Deserializer<K> keyDeserializer = serialization.keyDeserializer(); + final Deserializer<V> valDeserializer = serialization.valueDeserializer(); + + context.register(this, loggingEnabled, new StateRestoreCallback() { + @Override + public void restore(byte[] key, byte[] value) { + inner.put(keyDeserializer.deserialize(name, key), + valDeserializer.deserialize(name, value)); + } + }); + } finally { + this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds()); + } + } + + @Override + public boolean persistent() { + return inner.persistent(); + } + + @Override + public V get(K key) { + long startNs = time.nanoseconds(); + try { + return this.inner.get(key); + } finally { + this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds()); + } + } + + @Override + public void put(K key, V value) { + long startNs = time.nanoseconds(); + try { + this.inner.put(key, value); + + if (loggingEnabled) { + changeLogger.add(key); + changeLogger.maybeLogChange(this.getter); + } + } finally { + this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds()); + } + } + + @Override + public void putAll(List<Entry<K, V>> entries) { + long startNs = time.nanoseconds(); + try { + this.inner.putAll(entries); + + if (loggingEnabled) { + for (Entry<K, V> entry : entries) { + K key = entry.key(); + changeLogger.add(key); + } + changeLogger.maybeLogChange(this.getter); + } + } finally { + this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds()); + } + } + + @Override + public V delete(K key) { + long startNs = time.nanoseconds(); + try { + V value = this.inner.delete(key); + + removed(key); + + return value; + } finally { + this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds()); + } + } + + /** + * Called when the underlying {@link #inner} 
{@link KeyValueStore} removes an entry in response to a call from this + * store. + * + * @param key the key for the entry that the inner store removed + */ + protected void removed(K key) { + if (loggingEnabled) { + changeLogger.delete(key); + changeLogger.maybeLogChange(this.getter); + } + } + + @Override + public KeyValueIterator<K, V> range(K from, K to) { + return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime); + } + + @Override + public KeyValueIterator<K, V> all() { + return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime); + } + + @Override + public void close() { + inner.close(); + } + + @Override + public void flush() { + long startNs = time.nanoseconds(); + try { + this.inner.flush(); + + if (loggingEnabled) + changeLogger.logChange(this.getter); + } finally { + this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds()); + } + } + + private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> { + + private final KeyValueIterator<K1, V1> iter; + private final Sensor sensor; + private final long startNs; + + public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) { + this.iter = iter; + this.sensor = sensor; + this.startNs = time.nanoseconds(); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public Entry<K1, V1> next() { + return iter.next(); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + try { + iter.close(); + } finally { + metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds()); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java new file mode 100644 index 0000000..821927d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; + +public class MeteredWindowStore<K, V> implements WindowStore<K, V> { + + protected final WindowStore<K, V> inner; + protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter; + protected final String metricScope; + protected final Time time; + + private Sensor putTime; + private Sensor getTime; + private Sensor rangeTime; + private Sensor flushTime; + private Sensor restoreTime; + private StreamingMetrics metrics; + + private boolean loggingEnabled = true; + private StoreChangeLogger<byte[], byte[]> changeLogger = null; + + // always wrap the store with the metered store + public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) { + this.inner = inner; + this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() { + public byte[] get(byte[] key) { + return inner.getInternal(key); + } + }; + this.metricScope = metricScope; + this.time = time != null ? 
time : new SystemTime(); + } + + public MeteredWindowStore<K, V> disableLogging() { + loggingEnabled = false; + return this; + } + + @Override + public String name() { + return inner.name(); + } + + @Override + public void init(ProcessorContext context) { + final String name = name(); + this.metrics = context.metrics(); + this.putTime = this.metrics.addLatencySensor(metricScope, name, "put"); + this.getTime = this.metrics.addLatencySensor(metricScope, name, "get"); + this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range"); + this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush"); + this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore"); + + this.changeLogger = this.loggingEnabled ? + new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null; + + // register and possibly restore the state from the logs + long startNs = time.nanoseconds(); + inner.init(context); + try { + context.register(this, loggingEnabled, new StateRestoreCallback() { + @Override + public void restore(byte[] key, byte[] value) { + inner.putInternal(key, value); + } + }); + } finally { + this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds()); + } + } + + @Override + public boolean persistent() { + return inner.persistent(); + } + + @Override + public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) { + return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime); + } + + @Override + public void put(K key, V value) { + putAndReturnInternalKey(key, value, -1L); + } + + @Override + public void put(K key, V value, long timestamp) { + putAndReturnInternalKey(key, value, timestamp); + } + + @Override + public byte[] putAndReturnInternalKey(K key, V value, long timestamp) { + long startNs = time.nanoseconds(); + try { + byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp); + + if (loggingEnabled) { + 
changeLogger.add(binKey); + changeLogger.maybeLogChange(this.getter); + } + + return binKey; + } finally { + this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds()); + } + } + + @Override + public void putInternal(byte[] binaryKey, byte[] binaryValue) { + inner.putInternal(binaryKey, binaryValue); + } + + @Override + public byte[] getInternal(byte[] binaryKey) { + long startNs = time.nanoseconds(); + try { + return this.inner.getInternal(binaryKey); + } finally { + this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds()); + } + } + + @Override + public void close() { + inner.close(); + } + + @Override + public void flush() { + long startNs = time.nanoseconds(); + try { + this.inner.flush(); + + if (loggingEnabled) + changeLogger.logChange(this.getter); + } finally { + this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds()); + } + } + + private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> { + + private final WindowStoreIterator<E> iter; + private final Sensor sensor; + private final long startNs; + + public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) { + this.iter = iter; + this.sensor = sensor; + this.startNs = time.nanoseconds(); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue<Long, E> next() { + return iter.next(); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + try { + iter.close(); + } finally { + metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds()); + } + } + + } + + WindowStore<K, V> inner() { + return inner; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java 
// b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java new file mode 100644 index 0000000..e276f83 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java @@ -0,0 +1,162 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * This class saves out a map of topic/partition=&gt;offsets to a file. The format of the file is UTF-8 text containing the following:
 * <pre>
 *   &lt;version&gt;
 *   &lt;n&gt;
 *   &lt;topic_name_1&gt; &lt;partition_1&gt; &lt;offset_1&gt;
 *   .
 *   .
 *   .
 *   &lt;topic_name_n&gt; &lt;partition_n&gt; &lt;offset_n&gt;
 * </pre>
 * The first line contains a number designating the format version (currently 0), the next line contains
 * a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
 * separated by spaces.
 * <p>
 * {@link #write(Map)} and {@link #read()} synchronize on a private lock, so they do not interleave
 * when called on the same instance.
 */
public class OffsetCheckpoint {

    private static final int VERSION = 0;

    private final File file;
    private final Object lock;

    /**
     * @param file the checkpoint file; it does not need to exist yet
     * @throws IOException declared for interface compatibility; the constructor performs no I/O
     */
    public OffsetCheckpoint(File file) throws IOException {
        this.file = file;
        this.lock = new Object();
    }

    /**
     * Writes the given offsets to the checkpoint file, replacing any previous content.
     * The data is first written (as UTF-8) and synced to a sibling {@code .tmp} file, which is
     * then moved over the checkpoint file.
     *
     * @param offsets the offsets to persist, keyed by topic-partition
     * @throws IOException if the temporary file cannot be written or moved into place
     */
    public void write(Map<TopicPartition, Long> offsets) throws IOException {
        synchronized (lock) {
            // write to temp file and then swap with the existing file
            File temp = new File(file.getAbsolutePath() + ".tmp");

            // Explicit UTF-8: the no-charset OutputStreamWriter constructor would use the platform
            // default, contradicting the documented file format.
            try (FileOutputStream fileOutputStream = new FileOutputStream(temp);
                 BufferedWriter writer = new BufferedWriter(
                         new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
                writeIntLine(writer, VERSION);
                writeIntLine(writer, offsets.size());

                for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
                    writeEntry(writer, entry.getKey(), entry.getValue());

                // push buffered characters to the stream before forcing them to the device
                writer.flush();
                fileOutputStream.getFD().sync();
            }

            Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
        }
    }

    /** Writes a single integer followed by a line separator. */
    private void writeIntLine(BufferedWriter writer, int number) throws IOException {
        writer.write(Integer.toString(number));
        writer.newLine();
    }

    /** Writes one {@code <topic> <partition> <offset>} line. */
    private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
        writer.write(part.topic());
        writer.write(' ');
        writer.write(Integer.toString(part.partition()));
        writer.write(' ');
        writer.write(Long.toString(offset));
        writer.newLine();
    }

    /**
     * Reads the checkpoint file.
     *
     * @return the stored offsets, or an empty map if the file cannot be opened because it was not found
     * @throws IOException              if the file ends prematurely, contains a malformed entry line, or
     *                                  the number of entries differs from the declared count
     * @throws IllegalArgumentException if the file declares an unknown format version
     */
    public Map<TopicPartition, Long> read() throws IOException {
        synchronized (lock) {
            final FileInputStream input;
            try {
                input = new FileInputStream(file);
            } catch (FileNotFoundException e) {
                return Collections.emptyMap();
            }

            // Decode explicitly as UTF-8 (FileReader would use the platform default charset).
            // Closing the reader also closes the underlying stream.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
                int version = readInt(reader);
                switch (version) {
                    case 0:
                        return readOffsets(reader);

                    default:
                        throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
                }
            }
        }
    }

    /** Reads the entry count and all entry lines of a version-0 file; the version line is already consumed. */
    private Map<TopicPartition, Long> readOffsets(BufferedReader reader) throws IOException {
        int expectedSize = readInt(reader);
        Map<TopicPartition, Long> offsets = new HashMap<>();
        String line = reader.readLine();
        while (line != null) {
            String[] pieces = line.split("\\s+");
            if (pieces.length != 3)
                throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
                    line));

            String topic = pieces[0];
            int partition = Integer.parseInt(pieces[1]);
            long offset = Long.parseLong(pieces[2]);
            offsets.put(new TopicPartition(topic, partition), offset);
            line = reader.readLine();
        }
        if (offsets.size() != expectedSize)
            throw new IOException(String.format("Expected %d entries but found only %d",
                expectedSize,
                offsets.size()));
        return offsets;
    }

    /** Reads one line and parses it as an integer; fails with {@link EOFException} at end of file. */
    private int readInt(BufferedReader reader) throws IOException {
        String line = reader.readLine();
        if (line == null)
            throw new EOFException("File ended prematurely.");
        return Integer.parseInt(line);
    }

    /**
     * Deletes the checkpoint file if it exists.
     *
     * @throws IOException if the file exists but cannot be deleted
     */
    public void delete() throws IOException {
        // File.delete() only returns false on failure, which was silently ignored before.
        Files.deleteIfExists(file.toPath());
    }

    @Override
    public String toString() {
        return this.file.getAbsolutePath();
    }

}
