[
https://issues.apache.org/jira/browse/FLINK-38415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Fan resolved FLINK-38415.
-----------------------------
Fix Version/s: 2.0.1
2.2.0
2.1.1
Resolution: Fixed
> IndexOutOfBoundsException occasionally occurs after
> rocksdb.use-ingest-db-restore-mode is enabled
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-38415
> URL: https://issues.apache.org/jira/browse/FLINK-38415
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.1.0, 1.20.3
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.1, 2.2.0, 2.1.1
>
>
> h1. 1. Exception:
> Following is the exception stack:
> {code:java}
> java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 0
>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>     at java.base/java.util.Objects.checkIndex(Objects.java:374)
>     at java.base/java.util.ArrayList.remove(ArrayList.java:536)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithCopyFromTemporaryInstance(RocksDBIncrementalRestoreOperation.java:691)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithClipAndIngest(RocksDBIncrementalRestoreOperation.java:531)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromMultipleStateHandles(RocksDBIncrementalRestoreOperation.java:477)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:358)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$restore$1(RocksDBIncrementalRestoreOperation.java:270)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:1058)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:269)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:396)
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:550)
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:98)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:397)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:403)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:181)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:292)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:114)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:829)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:781)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:781)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:734)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1057)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1016)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:840)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>     at java.base/java.lang.Thread.run(Thread.java:829) {code}
>
> h1. 2. Direct Root Cause: mergeStateHandlesWithClipAndIngest Logic Failure
> The `mergeStateHandlesWithClipAndIngest` method distributes state handles
> into two lists:
> 1. `exportedColumnFamilyMetaData`: contains SST files that can be imported
> directly
> 2. `notImportableHandles`: contains state handles that must be copied via a
> temporary DB
> {*}Critical assumption{*}: if `localKeyedStateHandles` is not empty, then at
> least one of the two resulting lists contains data.
>
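> The distribution logic can be sketched as follows. This is a simplified model, not the real method: the actual code works on RocksDB column families and key-group ranges, and `StateHandle`, `withinKeyGroupRange`, and `hasSstFiles` here are illustrative names. It shows how a handle whose SST files were all removed by compaction falls into neither list, violating the critical assumption:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> public class HandleDistributionSketch {
>     // Hypothetical stand-in for a local keyed state handle.
>     record StateHandle(String name, boolean withinKeyGroupRange, boolean hasSstFiles) {}
>
>     public static void main(String[] args) {
>         // Bug scenario: auto-compaction already dropped every SST file.
>         List<StateHandle> localKeyedStateHandles = List.of(
>                 new StateHandle("h1", true, false),
>                 new StateHandle("h2", false, false));
>
>         List<StateHandle> exportedColumnFamilyMetaData = new ArrayList<>();
>         List<StateHandle> notImportableHandles = new ArrayList<>();
>
>         for (StateHandle handle : localKeyedStateHandles) {
>             if (handle.withinKeyGroupRange() && handle.hasSstFiles()) {
>                 exportedColumnFamilyMetaData.add(handle); // import SST files directly
>             } else if (!handle.hasSstFiles()) {
>                 // No SST files left to export or copy: the handle is silently
>                 // added to NEITHER list.
>             } else {
>                 notImportableHandles.add(handle); // copy via temporary DB
>             }
>         }
>         // Input was non-empty, yet both lists are empty: prints "0 0".
>         System.out.println(exportedColumnFamilyMetaData.size()
>                 + " " + notImportableHandles.size());
>     }
> }
> {code}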
> h1. 3. Why Both Lists Become Empty (Abnormal Case)
> Under normal circumstances, the two lists should never both be empty, because:
> 1. `notImportableHandles` is populated when:
> - the state handle's data exceeds the proclaimed key-group range
> - the range check fails for any reason
> - the export process encounters errors
> 2. `exportedColumnFamilyMetaData` is populated when:
> - the state handle's data is within the expected range
> - the export process successfully creates SST files
> *Why do both lists become empty in the bug scenario?*
> # Auto-compaction during restore: when RocksDB opens a temporary instance for
> each state handle, auto-compaction is triggered.
> # Compaction removes SST files: if all records in the SST files are expired or
> marked for deletion, compaction removes the files entirely.
> # The export process finds no files:
> `RocksDBIncrementalCheckpointUtils.exportColumnFamilies()` attempts to export
> but finds no SST files.
> # Both lists remain empty: neither `exportedColumnFamilyMetaData` nor
> `notImportableHandles` is populated.
> h1. 4. How to Reliably Trigger the Bug
> The bug requires all of these conditions:
> # Enable ingest-db restore mode:
> `state.backend.rocksdb.use-ingest-db-restore-mode=true`
> # All records are deleted, or expired by TTL
> # A scale operation triggers a restore with multiple state handles
> # Auto-compaction during restore removes the expired/deleted SST files
> # A 4th checkpoint creates 4 SST files in L0, which triggers compaction during
> restore
> Note: adding a short sleep (e.g. 500 ms) between opening the RocksDB instance
> and exporting the SST files during recovery makes the bug easier to reproduce.
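> For reference, the reproduction setup can be sketched as the following configuration fragment. Only `state.backend.rocksdb.use-ingest-db-restore-mode` is taken from this issue; the other keys and values are illustrative assumptions (incremental RocksDB checkpoints with a short checkpoint interval, so that several L0 SST files accumulate):
> {code:yaml}
> # Assumed baseline: incremental RocksDB state backend
> state.backend.type: rocksdb
> state.backend.incremental: true
> execution.checkpointing.interval: 10s
> # The option under test (from this issue):
> state.backend.rocksdb.use-ingest-db-restore-mode: true
> {code}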
> h2. Trigger Sequence:
> # Checkpoints 1-4: create SST files in L0
> # Scale operation: triggers a restore via `mergeStateHandlesWithClipAndIngest`
> # Auto-compaction: removes SST files while the temporary DB is open during
> restore
> # Empty lists: both `exportedColumnFamilyMetaData` and
> `notImportableHandles` are empty
> # Fallback: `mergeStateHandlesWithCopyFromTemporaryInstance(notImportableHandles,
> ...)` is called with an empty list
> # IndexOutOfBoundsException: `findTheBestStateHandleForInitial` returns -1
> for the empty list
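> The failure mode of the last two steps can be reproduced in isolation. This is a minimal sketch: `findBestIndex` below is a hypothetical stand-in that only mimics `findTheBestStateHandleForInitial`'s convention of returning -1 when no handle qualifies, and the `ArrayList.remove(int)` call then fails exactly as in the stack trace above:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> public class EmptyListRemoveDemo {
>     // Hypothetical stand-in: returns -1 when no suitable handle exists.
>     static int findBestIndex(List<String> handles) {
>         return handles.isEmpty() ? -1 : 0;
>     }
>
>     public static void main(String[] args) {
>         List<String> notImportableHandles = new ArrayList<>();
>         int best = findBestIndex(notImportableHandles); // -1 for the empty list
>         try {
>             // remove(-1) on an empty ArrayList throws the exact exception
>             // reported in this issue.
>             notImportableHandles.remove(best);
>         } catch (IndexOutOfBoundsException e) {
>             System.out.println(e.getMessage());
>             // prints: Index -1 out of bounds for length 0
>         }
>     }
> }
> {code}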
> h1. 5. Approach Analysis
> h2. Proposed Approach: Disable Auto-Compaction
> Advantages of this approach:
> * Preserves the SST file structure during restore
> * Prevents unexpected data removal during critical operations
> h2. Alternative Solution: Handle Empty Lists
> Another approach would be to modify `mergeStateHandlesWithClipAndIngest` to
> handle the case where both lists are empty:
> {code:java}
> if (exportedColumnFamilyMetaData.isEmpty() && notImportableHandles.isEmpty()) {
>     // Both lists empty: create an empty base DB or handle appropriately.
>     return;
> } else if (exportedColumnFamilyMetaData.isEmpty()) {
>     // Nothing to import directly: fall back to the copy-based merge.
>     mergeStateHandlesWithCopyFromTemporaryInstance(
>             notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
> }{code}
> *Why this alternative was not chosen:*
> - *Potential risks*: it could mask underlying data-corruption issues
> - *Data integrity concerns*: empty lists might indicate genuine restore
> problems
> The chosen solution (disabling auto-compaction) is safer because it addresses
> the root cause while keeping the existing validation logic and error-detection
> mechanisms intact.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)