Repository: kafka Updated Branches: refs/heads/trunk 43d5078e9 -> 5089f547d
HOTFIX: RocksDBStore must clear dirty flags after flush guozhangwang Without clearing the dirty flags, RocksDBStore will perform flush for every new record. This bug made the store performance painfully slower. Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1163 from ymatsuda/clear_dirty_flag Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5089f547 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5089f547 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5089f547 Branch: refs/heads/trunk Commit: 5089f547d5d64a0235e1b4adc327a0cb05eb4ca8 Parents: 43d5078 Author: Yasuhiro Matsuda <[email protected]> Authored: Tue Mar 29 13:30:56 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Mar 29 13:30:56 2016 -0700 ---------------------------------------------------------------------- .../streams/state/internals/RocksDBStore.java | 27 ++++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5089f547/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index b206f37..fe327f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -165,7 +165,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { public void apply(K key, RocksDBCacheEntry entry) { // flush all the dirty entries to RocksDB if this evicted entry is dirty if (entry.isDirty) { - flush(); + flushCache(); } } }); @@ -226,7 +226,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { RocksDBCacheEntry entry = cache.get(key); if (entry == null) { - byte[] rawKey = serdes.rawKey(key); V value = serdes.valueFrom(getInternal(serdes.rawKey(key))); cache.put(key, new RocksDBCacheEntry(value)); @@ -251,8 +250,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { @Override public void put(K key, V value) { if (cache != null) { - cache.put(key, new RocksDBCacheEntry(value, true)); cacheDirtyKeys.add(key); + cache.put(key, new RocksDBCacheEntry(value, true)); } else { byte[] rawKey = serdes.rawKey(key); byte[] rawValue = serdes.rawValue(value); @@ -298,7 +297,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { put(entry.key, entry.value); } - // this function is only called in flush() + // this function is only called in flushCache() private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) { WriteBatch batch = new WriteBatch(); @@ -324,7 +323,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { public KeyValueIterator<K, V> range(K from, K to) { // we need to flush the cache if necessary before returning the iterator if (cache != null) - flush(); + flushCache(); return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to); } @@ -333,15 +332,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { public KeyValueIterator<K, V> all() { // we need to flush the cache if necessary before returning the iterator if (cache != null) - flush(); + flushCache(); RocksIterator innerIter = db.newIterator(); innerIter.seekToFirst(); return new RocksDbIterator<K, V>(innerIter, serdes); } - @Override - public void flush() { + private void flushCache() { // flush of the cache entries if necessary if (cache != null) { List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.keys.size()); @@ -350,7 +348,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { for (K key : cacheDirtyKeys) { RocksDBCacheEntry entry = cache.get(key); - assert entry.isDirty; + entry.isDirty = false; byte[] rawKey = serdes.rawKey(key); @@ -386,12 +384,19 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { cacheDirtyKeys.clear(); } - flushInternal(); - if (loggingEnabled) changeLogger.logChange(getter); } + @Override + public void flush() { + // flush of the cache entries if necessary + flushCache(); + + // flush RocksDB + flushInternal(); + } + public void flushInternal() { try { db.flush(fOptions);
