This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 16769d2 KAFKA-8215: Upgrade Rocks to v5.18.3 (#6743) 16769d2 is described below commit 16769d263e2e8fd91704d2e3519abf9dcba507df Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Fri May 17 13:32:17 2019 -0700 KAFKA-8215: Upgrade Rocks to v5.18.3 (#6743) This upgrade exposes a number of new options, including the WriteBufferManager which -- along with existing TableConfig options -- allows users to limit the total memory used by RocksDB across instances. This can alleviate some cascading OOM potential when, for example, a large number of stateful tasks are suddenly migrated to the same host. The RocksDB docs guarantee backwards format compatibility across versions Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bbej...@gmail.com>, --- checkstyle/suppressions.xml | 2 ++ gradle/dependencies.gradle | 2 +- ...tionsToDbOptionsColumnFamilyOptionsAdapter.java | 28 ++++++++++++++++++++++ .../streams/state/internals/RocksDBStore.java | 14 ++++++++--- .../state/internals/RocksDBTimestampedStore.java | 1 + ...sToDbOptionsColumnFamilyOptionsAdapterTest.java | 25 +++++++++++++++++++ .../streams/state/internals/RocksDBStoreTest.java | 7 +++++- 7 files changed, 74 insertions(+), 5 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 977a7ac..6b1cccc 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -111,6 +111,8 @@ files="KafkaConfigBackingStore.java"/> <suppress checks="CyclomaticComplexity" files="(Values|ConnectHeader|ConnectHeaders).java"/> + <suppress checks="CyclomaticComplexity" + files="RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java"/> <suppress checks="JavaNCSS" files="KafkaConfigBackingStore.java"/> diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index bed83fe..d0b897b 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -86,7 +86,7 @@ versions += [ owaspDepCheckPlugin: "4.0.2", powermock: "2.0.2", reflections: "0.9.11", - rocksDB: "5.15.10", + rocksDB: "5.18.3", scalafmt: "1.5.1", scalatest: "3.0.7", scoverage: "1.3.1", diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java index c07e43b..d28682a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.state.internals; +import org.rocksdb.AbstractCompactionFilter; +import org.rocksdb.AbstractCompactionFilterFactory; import org.rocksdb.AbstractComparator; import org.rocksdb.AbstractSlice; import org.rocksdb.AccessHint; @@ -44,6 +46,8 @@ import org.rocksdb.WALRecoveryMode; import java.util.Collection; import java.util.List; +import org.rocksdb.WriteBufferManager; +import org.slf4j.LoggerFactory; /** * The generic {@link Options} class allows users to set all configs on one object if only default column family @@ -56,6 +60,8 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options private final DBOptions dbOptions; private final ColumnFamilyOptions columnFamilyOptions; + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class); + RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(final DBOptions dbOptions, final ColumnFamilyOptions columnFamilyOptions) { this.dbOptions = dbOptions; @@ -484,6 +490,7 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options @Override public Options setWalTtlSeconds(final long walTtlSeconds) { + LOG.warn("option walTtlSeconds will be ignored: Streams does not expose RocksDB ttl functionality"); dbOptions.setWalTtlSeconds(walTtlSeconds); return this; } @@ -1355,6 +1362,27 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options } @Override + public Options setWriteBufferManager(final WriteBufferManager writeBufferManager) { + dbOptions.setWriteBufferManager(writeBufferManager); + return this; + } + + @Override + public WriteBufferManager writeBufferManager() { + return dbOptions.writeBufferManager(); + } + + public Options setCompactionFilter(final AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter) { + columnFamilyOptions.setCompactionFilter(compactionFilter); + return this; + } + + public Options setCompactionFilterFactory(final AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory) { + columnFamilyOptions.setCompactionFilterFactory(compactionFilterFactory); + return this; + } + + @Override public void close() { columnFamilyOptions.close(); dbOptions.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index ed74468..0643e64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -40,6 +41,7 @@ import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; import org.rocksdb.FlushOptions; import org.rocksdb.InfoLogLevel; +import org.rocksdb.LRUCache; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -86,10 +88,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt RocksDB db; RocksDBAccessor dbAccessor; - // the following option objects will be created in the constructor and closed in the close() method + // the following option objects will be created in openDB and closed in the close() method private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions; WriteOptions wOptions; FlushOptions fOptions; + private Cache cache; private BloomFilter filter; private RocksDBConfigSetter configSetter; @@ -121,9 +124,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt userSpecifiedOptions = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, columnFamilyOptions); final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); - tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); + cache = new LRUCache(BLOCK_CACHE_SIZE); + tableConfig.setBlockCache(cache); tableConfig.setBlockSize(BLOCK_SIZE); - + filter = new BloomFilter(); tableConfig.setFilter(filter); @@ -404,12 +408,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt fOptions.close(); db.close(); filter.close(); + cache.close(); dbAccessor = null; userSpecifiedOptions = null; wOptions = null; fOptions = null; db = null; + filter = null; + cache = null; } private void closeOpenIterators() { @@ -564,6 +571,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt } @Override + @SuppressWarnings("deprecation") public void toggleDbForBulkLoading() { try { db.compactRange(columnFamily, true, 1, 0); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index 5466ce8..05db0ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -247,6 +247,7 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped } @Override + @SuppressWarnings("deprecation") public void toggleDbForBulkLoading() { try { db.compactRange(oldColumnFamily, true, 1, 0); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java index 74e5cd5..62513a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java @@ -20,6 +20,9 @@ import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.junit.Test; import org.junit.runner.RunWith; +import org.rocksdb.AbstractCompactionFilter; +import org.rocksdb.AbstractCompactionFilter.Context; +import org.rocksdb.AbstractCompactionFilterFactory; import org.rocksdb.AccessHint; import org.rocksdb.BuiltinComparator; import org.rocksdb.ColumnFamilyOptions; @@ -35,11 +38,13 @@ import org.rocksdb.Logger; import org.rocksdb.Options; import org.rocksdb.PlainTableConfig; import org.rocksdb.RateLimiter; +import org.rocksdb.RemoveEmptyValueCompactionFilter; import org.rocksdb.RocksDB; import org.rocksdb.SstFileManager; import org.rocksdb.StringAppendOperator; import org.rocksdb.VectorMemTableConfig; import org.rocksdb.WALRecoveryMode; +import org.rocksdb.WriteBufferManager; import org.rocksdb.util.BytewiseComparator; import java.lang.reflect.InvocationTargetException; @@ -167,6 +172,9 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { case "org.rocksdb.WALRecoveryMode": parameters[i] = WALRecoveryMode.AbsoluteConsistency; break; + case "org.rocksdb.WriteBufferManager": + parameters[i] = new WriteBufferManager(1L, new LRUCache(1L)); + break; default: parameters[i] = parameterTypes[i].newInstance(); } @@ -229,6 +237,23 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { case "java.util.List": parameters[i] = new ArrayList<>(); break; + case "org.rocksdb.AbstractCompactionFilter": + parameters[i] = new RemoveEmptyValueCompactionFilter(); + break; + case "org.rocksdb.AbstractCompactionFilterFactory": + parameters[i] = new AbstractCompactionFilterFactory() { + + @Override + public AbstractCompactionFilter<?> createCompactionFilter(final Context context) { + return null; + } + + @Override + public String name() { + return "AbstractCompactionFilterFactory"; + } + }; + break; case "org.rocksdb.AbstractComparator": parameters[i] = new BytewiseComparator(new ComparatorOptions()); break; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index bfb1ecd..3d67866 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -38,6 +38,8 @@ import org.junit.Test; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; import org.rocksdb.Filter; +import org.rocksdb.Cache; +import org.rocksdb.LRUCache; import org.rocksdb.Options; import java.io.File; @@ -489,11 +491,13 @@ public class RocksDBStoreTest { static boolean bloomFiltersSet; static Filter filter; + static Cache cache; @Override public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) { final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); - tableConfig.setBlockCacheSize(50 * 1024 * 1024L); + cache = new LRUCache(50 * 1024 * 1024L); + tableConfig.setBlockCache(cache); tableConfig.setBlockSize(4096L); if (enableBloomFilters) { filter = new BloomFilter(); @@ -513,6 +517,7 @@ public class RocksDBStoreTest { if (filter != null) { filter.close(); } + cache.close(); } }