[ 
https://issues.apache.org/jira/browse/FLINK-38415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-38415:
----------------------------
    Description: 
h1. 1. Exception:

The following is the full exception stack trace:
{code:java}
java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 0
        at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
        at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
        at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
        at java.base/java.util.Objects.checkIndex(Objects.java:374)
        at java.base/java.util.ArrayList.remove(ArrayList.java:536)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithCopyFromTemporaryInstance(RocksDBIncrementalRestoreOperation.java:691)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithClipAndIngest(RocksDBIncrementalRestoreOperation.java:531)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromMultipleStateHandles(RocksDBIncrementalRestoreOperation.java:477)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:358)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$restore$1(RocksDBIncrementalRestoreOperation.java:270)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:1058)
        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:269)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:396)
        at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:550)
        at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:98)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:397)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:403)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:181)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:292)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:114)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:829)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:781)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:781)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:734)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1057)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1016)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:840)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
        at java.base/java.lang.Thread.run(Thread.java:829) {code}
 
h1. 2. Direct Root Cause: mergeStateHandlesWithClipAndIngest Logic Failure

The `mergeStateHandlesWithClipAndIngest` method distributes state handles into two lists:

1. `exportedColumnFamilyMetaData`: Contains SST files that can be directly imported
2. `notImportableHandles`: Contains state handles that need to be copied via a temporary DB

{*}Critical Assumption{*}: If `localKeyedStateHandles` is not empty, then at 
least one of the two resulting lists should contain data.
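
To make the assumption, and how it can be violated, concrete, here is a minimal self-contained Java sketch. The types, field names and branching are hypothetical simplifications for illustration only, not the actual Flink code:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class ClipAndIngestAssumptionSketch {

    // Hypothetical stand-in for a keyed state handle; the fields are illustrative only.
    record StateHandle(String id, boolean withinKeyGroupRange, int sstFileCount) {}

    public static void main(String[] args) {
        // One in-range handle whose SST files were all dropped by auto-compaction.
        List<StateHandle> localKeyedStateHandles =
                List.of(new StateHandle("handle-1", true, 0));

        List<String> exportedColumnFamilyMetaData = new ArrayList<>();
        List<StateHandle> notImportableHandles = new ArrayList<>();

        for (StateHandle handle : localKeyedStateHandles) {
            if (!handle.withinKeyGroupRange()) {
                // Out-of-range data has to be copied via a temporary DB.
                notImportableHandles.add(handle);
            } else if (handle.sstFileCount() > 0) {
                // In-range data with SST files can be exported and ingested directly.
                exportedColumnFamilyMetaData.add("exported-metadata-for-" + handle.id());
            }
            // else: in range but nothing left to export, so the handle lands in neither list.
        }

        // The critical assumption breaks: the input is non-empty, yet both output lists are empty.
        System.out.println(
                exportedColumnFamilyMetaData.isEmpty() && notImportableHandles.isEmpty()); // true
    }
}
{code}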

 
h1. 3. Why Both Lists Become Empty (Abnormal Case)

Under normal circumstances, the two lists should never both be empty, because:

1. `notImportableHandles` gets populated when:
   - State handle data exceeds the proclaimed key-group range
   - The range check fails for any reason
   - The export process encounters errors

2. `exportedColumnFamilyMetaData` gets populated when:
   - State handle data is within the expected range
   - The export process successfully creates SST files

*Why do both lists become empty in the bug scenario?*
 # Auto-compaction during restore: When RocksDB opens a temporary instance for each state handle, auto-compaction is triggered
 # Compaction removes SST files: If all records in the SST files are expired or marked for deletion, compaction removes the files entirely
 # Export process finds no files: `RocksDBIncrementalCheckpointUtils.exportColumnFamilies()` attempts to export but finds no SST files (see the illustrative check after this list)
 # Both lists remain empty: Neither `exportedColumnFamilyMetaData` nor `notImportableHandles` gets populated
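
The condition in step 3 can be observed directly on a temporary instance. The following illustrative check (it needs the rocksdbjni dependency and is not part of Flink's restore code) uses RocksJava's `getLiveFilesMetaData()` to show what the export step sees when compaction has already dropped every SST file:
{code:java}
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

import java.nio.file.Files;
import java.util.List;

public class EmptyLiveFilesCheck {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        String path = Files.createTempDirectory("tmp-restore-instance").toString();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, path)) {
            // In the bug scenario, auto-compaction has already removed all expired/deleted
            // SST files, so there is nothing left for exportColumnFamilies() to export.
            List<LiveFileMetaData> liveFiles = db.getLiveFilesMetaData();
            System.out.println("SST files available for export: " + liveFiles.size());
        }
    }
}
{code}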

h1. 4. How to Trigger the Bug Reliably (100%)

The bug requires these specific conditions:
 # Enable ingest-db-restore-mode: 
`state.backend.rocksdb.use-ingest-db-restore-mode=true`
 # All records are deleted or expired by TTL
 # Scale operation: Triggers restore with multiple state handles
 # Auto-compaction during restore**: Removes expired/deleted SST files
 # 4th checkpoint: Creates 4 SST files in level0 to trigger compaction during 
restore

Note: adding a short sleep (such as 500 ms) between opening the temporary RocksDB instance and exporting the SST files during recovery helps reproduce the bug. A minimal setup covering conditions 1 and 2 is sketched below.
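
The sketch is illustrative only, not a complete reproduction job; the option is set programmatically via its string key, and the `Duration`-based TTL builder assumes a recent Flink version:
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class TriggerConditionsSketch {
    public static void main(String[] args) {
        // Condition 1: enable ingest-db-restore-mode (the RocksDB state backend with
        // incremental checkpoints must also be configured for the job).
        Configuration conf = new Configuration();
        conf.set(
                ConfigOptions.key("state.backend.rocksdb.use-ingest-db-restore-mode")
                        .booleanType()
                        .defaultValue(false),
                true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Condition 2: a short TTL so that all records are expired before the job is rescaled.
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).build();
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("value", Long.class);
        descriptor.enableTimeToLive(ttlConfig);

        // ... build a keyed job that uses `descriptor`, take at least 4 checkpoints,
        // then rescale and restore to walk through the trigger sequence below.
    }
}
{code}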
h2. Trigger Sequence:
 # Checkpoints 1-4: Create SST files in L0
 # Scale operation: Triggers restore with `mergeStateHandlesWithClipAndIngest`
 # Auto-compaction: Removes SST files during the temporary-DB restore
 # Empty lists: Both `exportedColumnFamilyMetaData` and `notImportableHandles` are empty
 # Fallback: `mergeStateHandlesWithCopyFromTemporaryInstance(notImportableHandles, ...)` is called with an empty list
 # IndexOutOfBoundsException: `findTheBestStateHandleForInitial` returns -1 for the empty list, and `ArrayList.remove(-1)` then throws (see the standalone snippet below)
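
The last step can be reproduced in isolation. The following standalone snippet (plain JDK, no Flink involved) throws exactly the exception from the stack trace above:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class EmptyListRemoveDemo {
    public static void main(String[] args) {
        // Empty, as in the bug scenario.
        List<Object> notImportableHandles = new ArrayList<>();

        // findTheBestStateHandleForInitial returns -1 when it finds no candidate.
        int bestHandleIndex = -1;

        // Throws: java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 0
        notImportableHandles.remove(bestHandleIndex);
    }
}
{code}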

h1. 5. Approach Analysis

Proposed approach: Disable auto-compaction on the temporary RocksDB instances opened during restore (a minimal sketch follows the advantages below).
h2. Advantages of this approach:
 * Preserves SST file structure during restore
 * Prevents unexpected data removal during critical operations
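
A minimal RocksJava sketch of the idea (illustrative only; it shows the relevant option rather than the actual patch): open the temporary restore instances with auto-compaction disabled so that expired/deleted SST files cannot be dropped before they are exported.
{code:java}
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.RocksDB;

public class DisableAutoCompactionSketch {
    public static void main(String[] args) {
        RocksDB.loadLibrary();
        try (ColumnFamilyOptions cfOptions =
                new ColumnFamilyOptions().setDisableAutoCompactions(true)) {
            // Use cfOptions for every column family descriptor when opening the temporary
            // RocksDB instances during restore; the final, merged instance keeps its normal
            // compaction settings.
            System.out.println("disable_auto_compactions = " + cfOptions.disableAutoCompactions());
        }
    }
}
{code}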

h2. Alternative Solution: Handle Empty Lists

Another approach would be to modify `mergeStateHandlesWithClipAndIngest` to 
handle the case where both lists are empty:
{code:java}
if (exportedColumnFamilyMetaData.isEmpty() && notImportableHandles.isEmpty()) {
    // Both lists empty - create an empty DB or handle the case explicitly
    return; // or create an empty base DB
} else if (exportedColumnFamilyMetaData.isEmpty()) {
    // Fall back to the copy-based merge
    mergeStateHandlesWithCopyFromTemporaryInstance(
        notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
}{code}
*Why this alternative was not chosen:*
 * *Potential risks*: Could mask underlying data corruption issues
 * *Data integrity concerns*: Empty lists might indicate genuine restore problems

The chosen solution (disabling auto-compaction) is safer because it addresses 
the root cause while maintaining the existing validation logic and error 
detection mechanisms.


> IndexOutOfBoundsException occasionally occurs after 
> rocksdb.use-ingest-db-restore-mode is enabled
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38415
>                 URL: https://issues.apache.org/jira/browse/FLINK-38415
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 2.1.0, 1.20.3
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
