[
https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nicolas Carlot updated KAFKA-9880:
----------------------------------
Description:
When restoring a non-empty RocksDB state store that is customized to use
FIFO compaction, the following exception is thrown:
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
    at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
    at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
    at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
    ... 11 more
{code}
Compaction is configured through an implementation of RocksDBConfigSetter. The
exception is gone as soon as I remove:
{code:java}
// Inside the RocksDBConfigSetter implementation; maxSize is defined elsewhere.
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
Bulk loading works fine when the store is non-existent or empty. The error
occurs only once the store holds a minimum amount of data. I guess it happens
when the number of SST levels increases.
I'm currently using a forked version of Kafka 2.4.1 that customizes the
RocksDBStore class with this modification as a workaround:
{code:java}
// Workaround: tolerate a failed range compaction only for FIFO-compacted stores.
public void toggleDbForBulkLoading() {
    try {
        db.compactRange(columnFamily, true, 1, 0);
    } catch (final RocksDBException e) {
        try {
            if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
                throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
            } else {
                log.warn("Compaction of store " + name + " for bulk loading failed. "
                    + "Will continue without compacted store, which will be slower.", e);
            }
        } catch (final RocksDBException e1) {
            // The compaction style could not be read, so fail as the original code does.
            throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
        }
    }
}
{code}
I'm not very proud of this workaround, but it suits my use cases well.
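The policy behind the workaround (a failed range compaction is fatal for every
compaction style except FIFO) can be exercised without RocksDB. The following
is only a minimal sketch under stated assumptions: `CompactionStyle`,
`Compactor`, `BulkLoadToggle` and `BulkLoadToggleDemo` are hypothetical
stand-ins written for illustration, not Kafka Streams or RocksDB classes. The
style is passed in as a parameter instead of being read from the column family
descriptor, and warnings are collected in a list instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.rocksdb.CompactionStyle; not the real enum.
enum CompactionStyle { LEVEL, UNIVERSAL, FIFO }

// Hypothetical stand-in for the db.compactRange(...) call, which may throw.
interface Compactor {
    void compactRange() throws Exception;
}

final class BulkLoadToggle {
    // Collected instead of logged so the behaviour can be inspected.
    final List<String> warnings = new ArrayList<>();

    // Runs the compaction; a failure is fatal unless the store uses FIFO compaction.
    void toggleDbForBulkLoading(final String name,
                                final CompactionStyle style,
                                final Compactor compactor) {
        try {
            compactor.compactRange();
        } catch (final Exception e) {
            if (style != CompactionStyle.FIFO) {
                throw new IllegalStateException(
                    "Error while range compacting during restoring store " + name, e);
            }
            warnings.add("Compaction of store " + name + " for bulk loading failed. "
                + "Will continue without compacted store, which will be slower.");
        }
    }
}

final class BulkLoadToggleDemo {
    public static void main(final String[] args) {
        final BulkLoadToggle toggle = new BulkLoadToggle();
        // Simulates the failure reported in the stack trace.
        final Compactor failing = () -> {
            throw new Exception("Target level exceeds number of levels");
        };

        // FIFO store: the failure is recorded as a warning and restoration continues.
        toggle.toggleDbForBulkLoading("merge_store", CompactionStyle.FIFO, failing);
        System.out.println("warnings recorded: " + toggle.warnings.size());

        // Any other style: the failure is rethrown, as in the unmodified code path.
        try {
            toggle.toggleDbForBulkLoading("merge_store", CompactionStyle.LEVEL, failing);
        } catch (final IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

Passing the style in keeps the sketch free of the nested try block that the
real workaround needs around `getDescriptor()`.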
> Error while range compacting during bulk loading of FIFO compacted RocksDB
> Store
> --------------------------------------------------------------------------------
>
> Key: KAFKA-9880
> URL: https://issues.apache.org/jira/browse/KAFKA-9880
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.1
> Reporter: Nicolas Carlot
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)