[ 
https://issues.apache.org/jira/browse/FLINK-38415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-38415.
-----------------------------
    Fix Version/s: 2.0.1
                   2.2.0
                   2.1.1
       Resolution: Fixed

> IndexOutOfBoundsException occasionally occurs after 
> rocksdb.use-ingest-db-restore-mode is enabled
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38415
>                 URL: https://issues.apache.org/jira/browse/FLINK-38415
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 2.1.0, 1.20.3
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.1, 2.2.0, 2.1.1
>
>
> h1. 1. Exception:
> Following is the exception stack:
> {code:java}
> java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 0
>       at 
> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>       at 
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>       at 
> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>       at java.base/java.util.Objects.checkIndex(Objects.java:374)
>       at java.base/java.util.ArrayList.remove(ArrayList.java:536)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithCopyFromTemporaryInstance(RocksDBIncrementalRestoreOperation.java:691)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithClipAndIngest(RocksDBIncrementalRestoreOperation.java:531)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromMultipleStateHandles(RocksDBIncrementalRestoreOperation.java:477)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:358)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$restore$1(RocksDBIncrementalRestoreOperation.java:270)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:1058)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:269)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:396)
>       at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:550)
>       at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:98)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:397)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:403)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:181)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:292)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:114)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:829)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:781)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:781)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:734)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1057)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1016)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:840)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>       at java.base/java.lang.Thread.run(Thread.java:829) {code}
>  
> h1. 2. Direct Root Cause: mergeStateHandlesWithClipAndIngest Logic Failure
> The `mergeStateHandlesWithClipAndIngest` method distributes state handles 
> into two lists:
>  # exportedColumnFamilyMetaData: contains SST files that can be directly 
> imported
>  # notImportableHandles: contains state handles that need to be copied via a 
> temporary DB
> {*}Critical Assumption{*}: if `localKeyedStateHandles` is not empty, then at 
> least one of the two resulting lists should contain data.
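The split above can be illustrated with a minimal pure-JDK sketch. The `Handle` record and the classification rules here are hypothetical simplifications, not Flink's actual types: a handle is importable only if its data stays inside the key-group range and the export produced at least one SST file, otherwise it should fall back to the copy path - and the "neither" case is the one that violates the critical assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the two-list split in mergeStateHandlesWithClipAndIngest.
// Handle and the classification rules are hypothetical, for illustration only.
public class ClipAndIngestSketch {
    public record Handle(String name, boolean withinKeyGroupRange, int sstFileCount) {}

    public static String classify(Handle h) {
        // Importable: data inside the proclaimed key-group range AND the
        // export actually produced SST files.
        if (h.withinKeyGroupRange() && h.sstFileCount() > 0) {
            return "exported";
        }
        // Out of range: must be copied via a temporary DB instance.
        if (!h.withinKeyGroupRange()) {
            return "notImportable";
        }
        // Range is fine but compaction removed every SST file: the handle
        // lands in NEITHER list - this is the buggy case.
        return "neither";
    }

    public static void main(String[] args) {
        List<Handle> handles = List.of(
                new Handle("h1", true, 3),   // importable
                new Handle("h2", false, 2),  // copy via temporary DB
                new Handle("h3", true, 0));  // compacted away -> lost
        List<Handle> exported = new ArrayList<>();
        List<Handle> notImportable = new ArrayList<>();
        for (Handle h : handles) {
            switch (classify(h)) {
                case "exported" -> exported.add(h);
                case "notImportable" -> notImportable.add(h);
                default -> { /* dropped: violates the critical assumption */ }
            }
        }
        System.out.println(exported.size() + " " + notImportable.size());
    }
}
```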
>  
> h1. 3. Why Both Lists Become Empty (Abnormal Case)
> Under normal circumstances, both lists should never be empty because:
> 1. notImportableHandles gets populated when:
>    - State handle data exceeds the proclaimed key-group range
>    - Range check fails for any reason
>    - Export process encounters errors
> 2.  exportedColumnFamilyMetaData gets populated when:
>    - State handle data is within the expected range
>    - Export process successfully creates SST files
> *Why both lists become empty in the bug scenario?*
>  # Auto-compaction during restore: When RocksDB opens temporary instances for 
> each state handle, auto-compaction is triggered
>  # Compaction removes SST files: If all records in SST files are expired or 
> marked for deletion, compaction removes the files entirely
>  # Export process finds no files: 
> `RocksDBIncrementalCheckpointUtils.exportColumnFamilies()` attempts to export 
> but finds no SST files
>  # Both lists remain empty: Neither `exportedColumnFamilyMetaData` nor 
> `notImportableHandles` gets populated
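The compaction effect can be modeled with a small pure-JDK toy (assumed TTL/tombstone semantics, not real RocksDB code): a compaction eliminates any SST file whose entries are all deletions or expired, so with all records deleted/expired the export step finds zero files.

```java
import java.util.List;

// Toy model of compaction dropping fully-dead SST files. Entry/SstFile are
// hypothetical simplifications; real RocksDB compaction is far more involved.
public class CompactionModel {
    public record Entry(boolean tombstone, long expiresAtMillis) {}
    public record SstFile(List<Entry> entries) {}

    // true if compaction would eliminate the whole file: every entry is a
    // deletion tombstone or has expired by TTL.
    public static boolean droppedByCompaction(SstFile f, long now) {
        return f.entries().stream()
                .allMatch(e -> e.tombstone() || e.expiresAtMillis() <= now);
    }

    // How many files are left for the export step to find.
    public static long filesLeftToExport(List<SstFile> files, long now) {
        return files.stream().filter(f -> !droppedByCompaction(f, now)).count();
    }

    public static void main(String[] args) {
        long now = 1_000L;
        SstFile allDead = new SstFile(List.of(
                new Entry(false, 500L),   // expired by TTL
                new Entry(true, 2_000L)));// deleted
        SstFile live = new SstFile(List.of(new Entry(false, 5_000L)));
        System.out.println(filesLeftToExport(List.of(allDead), now));       // 0
        System.out.println(filesLeftToExport(List.of(allDead, live), now)); // 1
    }
}
```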
> h1. 4. How to 100% Trigger the Bug
> The bug requires these specific conditions:
>  # Enable ingest-db-restore-mode: 
> `state.backend.rocksdb.use-ingest-db-restore-mode=true`
>  # All records are deleted or expired by TTL
>  # Scale operation: Triggers restore with multiple state handles
>  # Auto-compaction during restore: Removes expired/deleted SST files
>  # 4th checkpoint: Creates 4 SST files in level0 to trigger compaction during 
> restore
> Note: adding a short sleep (e.g. 500ms) between opening the RocksDB instance 
> and exporting SST files during recovery makes the bug easier to reproduce.
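The race window described above can be sketched with a pure-JDK toy (hypothetical names, no real RocksDB): a background "compaction" thread empties the SST file set while the restore thread sleeps between opening the temporary DB and exporting, so the export sees zero files.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy race sketch: a compaction thread drops all "expired" SST files during
// the window between opening the DB and exporting. Names are hypothetical.
public class RestoreRaceSketch {
    public static int exportAfterSleep(ConcurrentLinkedQueue<String> sstFiles,
                                       long sleepMillis) {
        Thread compaction = new Thread(sstFiles::clear); // drops all files
        compaction.start();
        try {
            Thread.sleep(sleepMillis); // the window the sleep widens
            compaction.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sstFiles.size(); // files left for the export step
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> files = new ConcurrentLinkedQueue<>();
        files.add("000001.sst");
        files.add("000002.sst");
        System.out.println(exportAfterSleep(files, 50L)); // 0 -> both lists empty
    }
}
```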
> h2. Trigger Sequence:
>  # Checkpoint 1-4: Create SST files in L0
>  # Scale operation: Triggers restore with `mergeStateHandlesWithClipAndIngest`
>  # Auto-compaction: Removes SST files during temporary DB restore
>  # Empty lists: Both `exportedColumnFamilyMetaData` and 
> `notImportableHandles` are empty
>  # Fallback: `mergeStateHandlesWithCopyFromTemporaryInstance(notImportableHandles, 
> ...)` is called with the empty list
>  # IndexOutOfBoundsException: `findTheBestStateHandleForInitial` returns -1 
> for the empty list, and `ArrayList.remove(-1)` throws
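The final step can be reproduced in isolation with a simplified stand-in (the method below mimics the behavior of `findTheBestStateHandleForInitial` on an empty list; it is not Flink's actual implementation): a -1 "best handle" index fed into `List.remove(int)` produces exactly the reported `Index -1 out of bounds for length 0`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the failure mechanics. The selection logic is a simplified
// stand-in for findTheBestStateHandleForInitial, not Flink's real code.
public class EmptyListFailure {
    // Returns the index of the "best" handle, or -1 if the list is empty.
    public static int findTheBestStateHandleForInitial(List<String> handles) {
        int best = -1;
        for (int i = 0; i < handles.size(); i++) {
            if (best < 0 || handles.get(i).compareTo(handles.get(best)) < 0) {
                best = i;
            }
        }
        return best;
    }

    // true if removing the chosen handle throws, i.e. the bug scenario.
    public static boolean removeThrows(List<String> handles) {
        try {
            handles.remove(findTheBestStateHandleForInitial(handles));
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true; // "Index -1 out of bounds for length 0"
        }
    }

    public static void main(String[] args) {
        System.out.println(removeThrows(new ArrayList<>()));             // true
        System.out.println(removeThrows(new ArrayList<>(List.of("h")))); // false
    }
}
```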
> h1. 5. Approach Analysis
> Proposed approach: Disable Auto-compaction
> h2. Advantages of this approach:
>  * Preserves SST file structure during restore
>  * Prevents unexpected data removal during critical operations
> h2. Alternative Solution: Handle Empty Lists
> Another approach would be to modify `mergeStateHandlesWithClipAndIngest` to 
> handle the case where both lists are empty:
> {code:java}
> if (exportedColumnFamilyMetaData.isEmpty() && notImportableHandles.isEmpty()) {
>     // Both lists empty - create an empty base DB or handle appropriately
>     return; // or create empty base DB
> } else if (exportedColumnFamilyMetaData.isEmpty()) {
>     // Fall back to the copy method
>     mergeStateHandlesWithCopyFromTemporaryInstance(
>         notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
> }{code}
> *Why this alternative was not chosen:*
>  * *Potential risks*: Could mask underlying data corruption issues
>  * *Data integrity concerns*: Empty lists might indicate genuine restore 
> problems
> The chosen solution (disabling auto-compaction) is safer because it addresses 
> the root cause while maintaining the existing validation logic and error 
> detection mechanisms.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
