[ https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nicolas Carlot updated KAFKA-9880:
----------------------------------
    Description: 
 

When restoring a non-empty RocksDB state store that is customized to use FIFO compaction, the following exception is thrown:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring  store merge_store
        at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
        at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
        at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
        ... 11 more
{code}
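If I read the trace correctly, the failing call is the compactRange invocation in RocksDBStore's toggleDbForBulkLoading (RocksDBStore.java:613, also visible in the workaround further down), which asks RocksDB to move the data to level 1. Under FIFO compaction the column family apparently ends up with a single level, which would explain the "Target level exceeds number of levels" error:

{code:java}
// RocksDBStore.java:613 in Kafka 2.4.1, as reproduced in the workaround below.
// Arguments: changeLevel = true, targetLevel = 1, targetPathId = 0
db.compactRange(columnFamily, true, 1, 0);
{code}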
 

 

Compaction is configured through an implementation of RocksDBConfigSetter. The exception is gone as soon as I remove:

 
{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}
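For completeness, the snippet above sits inside a RocksDBConfigSetter implementation roughly like the following sketch (class name and the maxSize value are illustrative, not my actual code):

{code:java}
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

// Illustrative sketch: enables FIFO compaction for every store.
public class FifoConfigSetter implements RocksDBConfigSetter {

    // Hypothetical size cap; the real value comes from application config.
    private static final long maxSize = 512 * 1024 * 1024L;

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
        fifoOptions.setMaxTableFilesSize(maxSize);
        fifoOptions.setAllowCompaction(true);
        options.setCompactionOptionsFIFO(fifoOptions);
        options.setCompactionStyle(CompactionStyle.FIFO);
    }
}
{code}

The class is registered through the rocksdb.config.setter property in the streams configuration.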
 

 

Bulk loading works fine when the store is non-existent or empty; the failure occurs only once the store contains a certain amount of data. I guess it happens once the number of SST levels has increased.

I'm currently using a forked version of Kafka 2.4.1 that customizes the RocksDBStore class with the following modification as a workaround:

 

I'm not very proud of this workaround, but it suits my use cases well.

 
{code:java}
public void toggleDbForBulkLoading() {
    try {
        db.compactRange(columnFamily, true, 1, 0);
    } catch (final RocksDBException e) {
        try {
            if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
                throw new ProcessorStateException("Error while range compacting during restoring  store " + name, e);
            } else {
                log.warn("Compaction of store " + name + " for bulk loading failed. Will continue without compacted store, which will be slower.", e);
            }
        } catch (final RocksDBException e1) {
            throw new ProcessorStateException("Error while range compacting during restoring  store " + name, e);
        }
    }
}
{code}







 

 

 


> Error while range compacting during bulk loading of FIFO compacted RocksDB Store
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-9880
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9880
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Nicolas Carlot
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
