Repository: kafka Updated Branches: refs/heads/trunk 0e5700fb6 -> 999108667
KAFKA-3740: Enable configuration of RocksDBStores Add new config StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG to enable advanced RocksDB users to override default RocksDB configuration Author: Damian Guy <[email protected]> Reviewers: Roger Hoover, Dan Norwood, Eno Thereska, Guozhang Wang Closes #1640 from dguy/kafka-3740-listener Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99910866 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99910866 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99910866 Branch: refs/heads/trunk Commit: 9991086671b0a74bc5fd5698ba81aaf9a2985404 Parents: 0e5700f Author: Damian Guy <[email protected]> Authored: Thu Jul 21 11:04:08 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Jul 21 11:04:08 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/StreamsConfig.java | 11 +++++- .../streams/state/RocksDBConfigSetter.java | 37 ++++++++++++++++++++ .../streams/state/internals/RocksDBStore.java | 12 ++++++- .../streams/state/KeyValueStoreTestDriver.java | 15 ++++++-- .../internals/RocksDBKeyValueStoreTest.java | 28 +++++++++++++++ .../StreamThreadStateStoreProviderTest.java | 12 ++++++- .../apache/kafka/test/MockProcessorContext.java | 4 +-- 7 files changed, 111 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 7f32434..a68de4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -118,6 
+118,10 @@ public class StreamsConfig extends AbstractConfig { /** <code>client.id</code> */ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; + public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter"; + public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface"; + + static { CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value Type.STRING, @@ -213,7 +217,12 @@ public class StreamsConfig extends AbstractConfig { 2, atLeast(1), Importance.LOW, - CommonClientConfigs.METRICS_NUM_SAMPLES_DOC); + CommonClientConfigs.METRICS_NUM_SAMPLES_DOC) + .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, + Type.CLASS, + null, + Importance.LOW, + ROCKSDB_CONFIG_SETTER_CLASS_DOC); } // this is the list of configs for underlying clients http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java new file mode 100644 index 0000000..20a65f1 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.rocksdb.Options; + +import java.util.Map; + +/** + * An interface that allows developers to customize the RocksDB settings + * for a given Store. Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>. + */ +public interface RocksDBConfigSetter { + + /** + * Set the rocks db options for the provided storeName. + * + * @param storeName the name of the store being configured + * @param options the Rocks DB options + * @param configs the configuration supplied to {@link org.apache.kafka.streams.StreamsConfig} + */ + void setConfig(final String storeName, final Options options, final Map<String, Object> configs); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index a8badcd..4993173 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -19,13 +19,16 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; import 
org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.StateSerdes; import org.rocksdb.BlockBasedTableConfig; @@ -44,6 +47,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; @@ -121,7 +125,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { this.keySerde = keySerde; this.valueSerde = valueSerde; - // initialize the rocksdb options + // initialize the default rocksdb options BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); tableConfig.setBlockSize(BLOCK_SIZE); @@ -158,6 +162,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { @SuppressWarnings("unchecked") public void openDB(ProcessorContext context) { + final Map<String, Object> configs = context.appConfigs(); + final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG); + if (configSetterClass != null) { + final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass); + configSetter.setConfig(name, options, configs); + } // we need to construct the serde while opening DB since // it is also triggered by windowed DB segments without initialization this.serdes = new StateSerdes<>(name, 
http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 1861e06..5519ab4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -131,6 +131,8 @@ import java.util.Set; */ public class KeyValueStoreTestDriver<K, V> { + private final Properties props; + /** * Create a driver object that will have a {@link #context()} that records messages * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and @@ -214,12 +216,15 @@ public class KeyValueStoreTestDriver<K, V> { this.stateDir = TestUtils.tempDirectory(); this.stateDir.mkdirs(); - Properties props = new Properties(); + props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationId"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass()); + + this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector) { @Override public TaskId taskId() { @@ -254,12 +259,12 @@ public class KeyValueStoreTestDriver<K, V> { @Override public Map<String, Object> appConfigs() { - return null; + return new StreamsConfig(props).originals(); } @Override public Map<String, Object> appConfigsWithPrefix(String prefix) { - return null; + return new 
StreamsConfig(props).originalsWithPrefix(prefix); } }; } @@ -419,4 +424,8 @@ public class KeyValueStoreTestDriver<K, V> { flushedEntries.clear(); flushedRemovals.clear(); } + + public void setConfig(final String configName, final Object configValue) { + props.put(configName, configValue); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index 280255a..6b8f3f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -16,10 +16,19 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.KeyValueStoreTestDriver; +import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.Stores; +import org.junit.Test; +import org.rocksdb.Options; + +import java.util.Map; + +import static org.junit.Assert.assertTrue; public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -43,4 +52,23 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { return store; } + + public static class TheRocksDbConfigSetter implements RocksDBConfigSetter { + + static boolean called = false; + + @Override + public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) { + called = true; + } + } + + @Test + 
public void shouldUseCustomRocksDbConfigSetter() throws Exception { + final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + driver.setConfig(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TheRocksDbConfigSetter.class); + createKeyValueStore(driver.context(), Integer.class, String.class, false); + assertTrue(TheRocksDbConfigSetter.called); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index c105790..f2dbcff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.TaskId; @@ -35,9 +36,11 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -57,6 +60,7 @@ public class StreamThreadStateStoreProviderTest { private StreamTask taskTwo; 
private StreamThreadStateStoreProvider provider; private StateDirectory stateDirectory; + private File stateDir; @Before public void before() throws IOException { @@ -77,7 +81,8 @@ public class StreamThreadStateStoreProviderTest { final String applicationId = "applicationId"; properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - final String stateConfigDir = TestUtils.tempDirectory().getPath(); + stateDir = TestUtils.tempDirectory(); + final String stateConfigDir = stateDir.getPath(); properties.put(StreamsConfig.STATE_DIR_CONFIG, stateConfigDir); @@ -111,6 +116,11 @@ public class StreamThreadStateStoreProviderTest { } + @After + public void cleanUp() { + Utils.delete(stateDir); + } + @Test public void shouldFindKeyValueStores() throws Exception { List<ReadOnlyKeyValueStore<String, String>> kvStores = http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index dba82ca..d82580d 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -191,12 +191,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S @Override public Map<String, Object> appConfigs() { - return null; + return Collections.emptyMap(); } @Override public Map<String, Object> appConfigsWithPrefix(String prefix) { - return null; + return Collections.emptyMap(); } public Map<String, StateStore> allStateStores() {
