Repository: incubator-samza
Updated Branches:
  refs/heads/master d7bd2df7a -> e63cfd39c


SAMZA-434; rocksdb docs

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e63cfd39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e63cfd39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e63cfd39

Branch: refs/heads/master
Commit: e63cfd39ca85a78fd429c39ec5f2fbc98f30489a
Parents: d7bd2df
Author: Chris Riccomini <[email protected]>
Authored: Mon Oct 27 12:17:45 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Mon Oct 27 12:17:45 2014 -0700

----------------------------------------------------------------------
 .../versioned/container/state-management.md   |  20 ++-
 .../versioned/jobs/configuration-table.html   | 158 +++++++++++++++++--
 .../RocksDbKeyValueStorageEngineFactory.scala |   2 +-
 3 files changed, 159 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e63cfd39/docs/learn/documentation/versioned/container/state-management.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/state-management.md b/docs/learn/documentation/versioned/container/state-management.md
index d71de0b..79067bb 100644
--- a/docs/learn/documentation/versioned/container/state-management.md
+++ b/docs/learn/documentation/versioned/container/state-management.md
@@ -126,17 +126,17 @@ Nothing prevents you from using an external database if you want to, but for man
 
 ### Key-value storage
 
-Any storage engine can be plugged into Samza, as described below. Out of the box, Samza ships with a key-value store implementation that is built on [LevelDB](https://code.google.com/p/leveldb) using a [JNI API](https://github.com/fusesource/leveldbjni).
+Any storage engine can be plugged into Samza, as described below.
Out of the box, Samza ships with a key-value store implementation that is built on [RocksDB](http://rocksdb.org) using a [JNI API](https://github.com/facebook/rocksdb/wiki/RocksJava-Basics).
 
-LevelDB has several nice properties. Its memory allocation is outside of the Java heap, which makes it more memory-efficient and less prone to garbage collection pauses than a Java-based storage engine. It is very fast for small datasets that fit in memory; datasets larger than memory are slower but still possible. It is [log-structured](http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/), allowing very fast writes. It also includes support for block compression, which helps to reduce I/O and memory usage.
+RocksDB has several nice properties. Its memory allocation is outside of the Java heap, which makes it more memory-efficient and less prone to garbage collection pauses than a Java-based storage engine. It is very fast for small datasets that fit in memory; datasets larger than memory are slower but still possible. It is [log-structured](http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/), allowing very fast writes. It also includes support for block compression, which helps to reduce I/O and memory usage.
 
-Samza includes an additional in-memory caching layer in front of LevelDB, which avoids the cost of deserialization for frequently-accessed objects and batches writes. If the same key is updated multiple times in quick succession, the batching coalesces those updates into a single write. The writes are flushed to the changelog when a task [commits](checkpointing.html).
+Samza includes an additional in-memory caching layer in front of RocksDB, which avoids the cost of deserialization for frequently-accessed objects and batches writes. If the same key is updated multiple times in quick succession, the batching coalesces those updates into a single write.
The writes are flushed to the changelog when a task [commits](checkpointing.html).
 
 To use a key-value store in your job, add the following to your job config:
 
 {% highlight jproperties %}
 # Use the key-value store implementation for a store called "my-store"
-stores.my-store.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 
 # Use the Kafka topic "my-store-changelog" as the changelog stream for this store.
 # This enables automatic recovery of the store after a failure. If you don't
@@ -182,11 +182,19 @@ public interface KeyValueStore<K, V> {
 }
 {% endhighlight %}
 
-Additional configuration properties for the key-value store are documented in the [configuration reference](../jobs/configuration-table.html#keyvalue).
+Additional configuration properties for the key-value store are documented in the [configuration reference](../jobs/configuration-table.html#keyvalue-rocksdb).
+
+#### Known Issues
+
+RocksDB has several rough edges. It's recommended that you read the RocksDB [tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide). Some other notes to be aware of are:
+
+1. RocksDB is heavily optimized to run with SSD hard disks. Performance on non-SSDs degrades significantly.
+2. Samza's KeyValueStorageEngine.putAll() method does not currently use RocksDB's batching-put API because it's [non-functional in Java](https://github.com/facebook/rocksdb/issues/262).
+3. Calling iterator.seekToFirst() is very slow [if there are a lot of deletes in the store](https://github.com/facebook/rocksdb/issues/261).
 
 ### Implementing common use cases with the key-value store
 
-Earlier in this section we discussed some example use cases for stateful stream processing. Let's look at how each of these could be implemented using a key-value storage engine such as Samza's LevelDB.
+Earlier in this section we discussed some example use cases for stateful stream processing.
Let's look at how each of these could be implemented using a key-value storage engine such as Samza's RocksDB store.
 
 #### Windowed aggregation


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e63cfd39/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 3875145..e3251a6 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -844,13 +844,18 @@
                     <a href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.config.Config, org.apache.samza.task.TaskContext)">init()</a> method).
                     The value of this property is the fully-qualified name of a Java class that implements
                     <a href="../api/javadocs/org/apache/samza/storage/StorageEngineFactory.html">StorageEngineFactory</a>.
-                    Samza currently ships with one storage engine implementation:
+                    Samza currently ships with two storage engine implementations:
                     <dl>
-                        <dt><code>org.apache.samza.storage.kv.KeyValueStorageEngineFactory</code></dt>
+                        <dt><code>org.apache.samza.storage.kv.LevelDbKeyValueStorageEngineFactory</code></dt>
                         <dd>An on-disk storage engine with a key-value interface, implemented using
                             <a href="https://code.google.com/p/leveldb/">LevelDB</a>. It supports fast random-access
                             reads and writes, as well as range queries on keys. LevelDB can be configured with
-                            various <a href="#keyvalue">additional tuning parameters</a>.</dd>
+                            various <a href="#keyvalue-leveldb">additional tuning parameters</a>.</dd>
+                        <dt><code>org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory</code></dt>
+                        <dd>An on-disk storage engine with a key-value interface, implemented using
+                            <a href="http://rocksdb.org/">RocksDB</a>.
It supports fast random-access
+                            reads and writes, as well as range queries on keys. RocksDB can be configured with
+                            various <a href="#keyvalue-rocksdb">additional tuning parameters</a>.</dd>
                     </dl>
                 </td>
             </tr>
@@ -900,18 +905,143 @@
             </tr>
 
             <tr>
-                <th colspan="3" class="section" id="keyvalue">
+                <th colspan="3" class="section" id="keyvalue-rocksdb">
+                    Using RocksDB for key-value storage<br>
+                    <span class="subtitle">
+                        (This section applies if you have set
+                        <a href="#stores-factory" class="property">stores.*.factory</a>
+                        <code>= org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory</code>)
+                    </span>
+                </th>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-write-batch-size">stores.<span class="store">store-name</span>.<br>write.batch.size</td>
+                <td class="default">500</td>
+                <td class="description">
+                    For better write performance, the storage engine buffers writes and applies them
+                    to the underlying store in a batch. If the same key is written multiple times
+                    in quick succession, this buffer also deduplicates writes to the same key. This
+                    property is set to the number of key/value pairs that should be kept in this
+                    in-memory buffer, per task instance. The number cannot be greater than
+                    <a href="#stores-rocksdb-object-cache-size" class="property">stores.*.object.cache.size</a>.
+                </td>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-object-cache-size">stores.<span class="store">store-name</span>.<br>object.cache.size</td>
+                <td class="default">1000</td>
+                <td class="description">
+                    Samza maintains an additional cache in front of RocksDB for frequently-accessed
+                    objects. This cache contains deserialized objects (avoiding the deserialization
+                    overhead on cache hits), in contrast to the RocksDB block cache
+                    (<a href="#stores-rocksdb-container-cache-size-bytes" class="property">stores.*.container.cache.size.bytes</a>),
+                    which caches serialized objects. This property determines the number of objects
+                    to keep in Samza's cache, per task instance. This same cache is also used for
+                    write buffering (see <a href="#stores-rocksdb-write-batch-size" class="property">stores.*.write.batch.size</a>).
+                    A value of 0 disables all caching and batching.
+                </td>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-container-cache-size-bytes">stores.<span class="store">store-name</span>.container.<br>cache.size.bytes</td>
+                <td class="default">104857600</td>
+                <td class="description">
+                    The size of RocksDB's block cache in bytes, per container. If there are several
+                    task instances within one container, each is given a proportional share of this cache.
+                    Note that this is an off-heap memory allocation, so the container's total memory use
+                    is the maximum JVM heap size <em>plus</em> the size of this cache.
+                </td>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-container-write-buffer-size-bytes">stores.<span class="store">store-name</span>.container.<br>write.buffer.size.bytes</td>
+                <td class="default">33554432</td>
+                <td class="description">
+                    The amount of memory (in bytes) that RocksDB uses for buffering writes before they are
+                    written to disk, per container. If there are several task instances within one
+                    container, each is given a proportional share of this buffer. This setting also
+                    determines the size of RocksDB's segment files.
+                </td>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-compression">stores.<span class="store">store-name</span>.<br>rocksdb.compression</td>
+                <td class="default">snappy</td>
+                <td class="description">
+                    This property controls whether RocksDB should compress data on disk and in the
+                    block cache.
The following values are valid:
+                    <dl>
+                        <dt><code>snappy</code></dt>
+                        <dd>Compress data using the <a href="https://code.google.com/p/snappy/">Snappy</a> codec.</dd>
+                        <dt><code>bzip2</code></dt>
+                        <dd>Compress data using the <a href="http://en.wikipedia.org/wiki/Bzip2">bzip2</a> codec.</dd>
+                        <dt><code>zlib</code></dt>
+                        <dd>Compress data using the <a href="http://en.wikipedia.org/wiki/Zlib">zlib</a> codec.</dd>
+                        <dt><code>lz4</code></dt>
+                        <dd>Compress data using the <a href="https://code.google.com/p/lz4/">lz4</a> codec.</dd>
+                        <dt><code>lz4hc</code></dt>
+                        <dd>Compress data using the <a href="https://code.google.com/p/lz4/">lz4hc</a> (high compression) codec.</dd>
+                        <dt><code>none</code></dt>
+                        <dd>Do not compress data.</dd>
+                    </dl>
+                </td>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-block-size-bytes">stores.<span class="store">store-name</span>.<br>rocksdb.block.size.bytes</td>
+                <td class="default">4096</td>
+                <td class="description">
+                    If compression is enabled, RocksDB groups approximately this many uncompressed bytes
+                    into one compressed block. You probably don't need to change this property.
+                </td>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-bloomfilter-bits">stores.<span class="store">store-name</span>.<br>rocksdb.bloomfilter.bits</td>
+                <td class="default">10</td>
+                <td class="description">
+                    In RocksDB, every SST file <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter">contains a Bloom filter</a>, which is used to determine if the file may contain a given key. Setting the bloom filter bit size allows developers to make the trade-off between the accuracy of the bloom filter and its memory usage.
+                </td>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-compaction-style">stores.<span class="store">store-name</span>.<br>rocksdb.compaction.style</td>
+                <td class="default">universal</td>
+                <td class="description">
+                    This property controls the compaction style that RocksDB will employ when compacting its levels. The following values are valid:
+                    <dl>
+                        <dt><code>universal</code></dt>
+                        <dd>Use <a href="https://github.com/facebook/rocksdb/wiki/Universal-Compaction">universal</a> compaction.</dd>
+                        <dt><code>fifo</code></dt>
+                        <dd>Use <a href="https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style">FIFO</a> compaction.</dd>
+                        <dt><code>level</code></dt>
+                        <dd>Use LevelDB's standard leveled compaction.</dd>
+                    </dl>
+                </td>
+            </tr>
+
+            <tr>
+                <td class="property" id="stores-rocksdb-num-write-buffers">stores.<span class="store">store-name</span>.<br>rocksdb.num.write.buffers</td>
+                <td class="default">3</td>
+                <td class="description">
+                    Configures the <a href="https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer">number of write buffers</a> that a RocksDB store uses. This allows RocksDB to continue taking writes to other buffers even while a given write buffer is being flushed to disk.
+                </td>
+            </tr>
+
+            <tr>
+                <th colspan="3" class="section" id="keyvalue-leveldb">
                     Using LevelDB for key-value storage<br>
                     <span class="subtitle">
                         (This section applies if you have set
                         <a href="#stores-factory" class="property">stores.*.factory</a>
-                        <code>= org.apache.samza.storage.kv.KeyValueStorageEngineFactory</code>)
+                        <code>= org.apache.samza.storage.kv.LevelDbKeyValueStorageEngineFactory</code>)
                     </span>
                 </th>
             </tr>
 
             <tr>
-                <td class="property" id="stores-write-batch-size">stores.<span class="store">store-name</span>.<br>write.batch.size</td>
+                <td class="property" id="stores-leveldb-write-batch-size">stores.<span class="store">store-name</span>.<br>write.batch.size</td>
                 <td class="default">500</td>
                 <td class="description">
                     For better write performance, the storage engine buffers writes and applies them
@@ -919,27 +1049,27 @@
                     in quick succession, this buffer also deduplicates writes to the same key. This
                     property is set to the number of key/value pairs that should be kept in this
                     in-memory buffer, per task instance. The number cannot be greater than
-                    <a href="#stores-object-cache-size" class="property">stores.*.object.cache.size</a>.
+                    <a href="#stores-leveldb-object-cache-size" class="property">stores.*.object.cache.size</a>.
                 </td>
             </tr>
 
             <tr>
-                <td class="property" id="stores-object-cache-size">stores.<span class="store">store-name</span>.<br>object.cache.size</td>
+                <td class="property" id="stores-leveldb-object-cache-size">stores.<span class="store">store-name</span>.<br>object.cache.size</td>
                 <td class="default">1000</td>
                 <td class="description">
                     Samza maintains an additional cache in front of LevelDB for frequently-accessed
                     objects.
This cache contains deserialized objects (avoiding the deserialization
                     overhead on cache hits), in contrast to the LevelDB block cache
-                    (<a href="#stores-container-cache-size-bytes" class="property">stores.*.container.cache.size.bytes</a>),
+                    (<a href="#stores-leveldb-container-cache-size-bytes" class="property">stores.*.container.cache.size.bytes</a>),
                     which caches serialized objects. This property determines the number of objects
                     to keep in Samza's cache, per task instance. This same cache is also used for
-                    write buffering (see <a href="#stores-write-batch-size" class="property">stores.*.write.batch.size</a>).
+                    write buffering (see <a href="#stores-leveldb-write-batch-size" class="property">stores.*.write.batch.size</a>).
                     A value of 0 disables all caching and batching.
                 </td>
             </tr>
 
             <tr>
-                <td class="property" id="stores-container-cache-size-bytes">stores.<span class="store">store-name</span>.container.<br>cache.size.bytes</td>
+                <td class="property" id="stores-leveldb-container-cache-size-bytes">stores.<span class="store">store-name</span>.container.<br>cache.size.bytes</td>
                 <td class="default">104857600</td>
                 <td class="description">
                     The size of LevelDB's block cache in bytes, per container. If there are several
@@ -950,7 +1080,7 @@
             </tr>
 
             <tr>
-                <td class="property" id="stores-container-write-buffer-size-bytes">stores.<span class="store">store-name</span>.container.<br>write.buffer.size.bytes</td>
+                <td class="property" id="stores-leveldb-container-write-buffer-size-bytes">stores.<span class="store">store-name</span>.container.<br>write.buffer.size.bytes</td>
                 <td class="default">33554432</td>
                 <td class="description">
                     The amount of memory (in bytes) that LevelDB uses for buffering writes before they are
@@ -961,7 +1091,7 @@
             </tr>
 
             <tr>
-                <td class="property" id="stores-compaction-delete-threshold">stores.<span class="store">store-name</span>.<br>compaction.delete.threshold</td>
+                <td class="property" id="stores-leveldb-compaction-delete-threshold">stores.<span class="store">store-name</span>.<br>compaction.delete.threshold</td>
                 <td class="default">-1</td>
                 <td class="description">
                     Setting this property forces a LevelDB compaction to be performed after a certain
@@ -1041,7 +1171,7 @@
                     memory use remains below the limit. The amount of memory used is normally the JVM
                     heap size (configured with <a href="#task-opts" class="property">task.opts</a>),
                     plus the size of any off-heap memory allocation (for example
-                    <a href="#stores-container-cache-size-bytes" class="property">stores.*.container.cache.size.bytes</a>),
+                    <a href="#stores-rocksdb-container-cache-size-bytes" class="property">stores.*.container.cache.size.bytes</a>),
                     plus a safety margin to allow for JVM overheads.
                 </td>
             </tr>


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e63cfd39/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index a52731b..14eeba5 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.storage
+package org.apache.samza.storage.kv
 
 import java.io.File
 import org.apache.samza.container.SamzaContainerContext
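Taken together, the RocksDB rows added to the configuration table compose like this in a job's properties file. This is an illustrative sketch, not part of the patch: the store name "my-store" is hypothetical, the changelog line follows the example in state-management.md, and the values shown are simply the documented defaults:

{% highlight jproperties %}
# Hypothetical store configuration exercising the RocksDB settings documented
# in the configuration table above (values shown are the documented defaults).
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.my-store.changelog=kafka.my-store-changelog
stores.my-store.write.batch.size=500
stores.my-store.object.cache.size=1000
stores.my-store.container.cache.size.bytes=104857600
stores.my-store.container.write.buffer.size.bytes=33554432
stores.my-store.rocksdb.compression=snappy
stores.my-store.rocksdb.block.size.bytes=4096
stores.my-store.rocksdb.bloomfilter.bits=10
stores.my-store.rocksdb.compaction.style=universal
stores.my-store.rocksdb.num.write.buffers=3
{% endhighlight %}

Since these are the defaults, a real job would normally set only the properties it wants to change.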

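The caching layer that state-management.md describes (deserialized-object caching, coalescing of repeated writes to the same key, and a flush when the task commits or the batch fills) can be pictured with a small sketch. This is an illustrative model only, not Samza's actual implementation; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative write-back cache: puts are buffered and coalesced per key,
// then applied to the underlying store (RocksDB, in Samza's case) when the
// batch reaches its size limit (cf. stores.*.write.batch.size) or on commit.
class WriteBackCache {
  private final Map<String, String> store;                  // stands in for RocksDB
  private final Map<String, String> batch = new LinkedHashMap<>();
  private final int writeBatchSize;

  WriteBackCache(Map<String, String> store, int writeBatchSize) {
    this.store = store;
    this.writeBatchSize = writeBatchSize;
  }

  void put(String key, String value) {
    batch.put(key, value);            // a second put to the same key coalesces
    if (batch.size() >= writeBatchSize) {
      flush();                        // batch full: apply buffered writes
    }
  }

  String get(String key) {
    String buffered = batch.get(key); // read-your-writes from the buffer first
    return buffered != null ? buffered : store.get(key);
  }

  void flush() {                      // in Samza, this happens when a task commits
    store.putAll(batch);
    batch.clear();
  }
}
```

Two puts to the same key reach the backing store as a single write once the batch is flushed, which is the deduplication behaviour the doc describes.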