[
https://issues.apache.org/jira/browse/FLINK-38415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Fan updated FLINK-38415:
----------------------------
Description:
h1. 1. Exception:
The following is the exception stack:
{code:java}
java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 0
at
java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at
java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:374)
at java.base/java.util.ArrayList.remove(ArrayList.java:536)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithCopyFromTemporaryInstance(RocksDBIncrementalRestoreOperation.java:691)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithClipAndIngest(RocksDBIncrementalRestoreOperation.java:531)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromMultipleStateHandles(RocksDBIncrementalRestoreOperation.java:477)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:358)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$restore$1(RocksDBIncrementalRestoreOperation.java:270)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:1058)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:269)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:396)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:550)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:98)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:397)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:403)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:181)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:292)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:114)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:829)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:781)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:781)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:734)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1057)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1016)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:840)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
h1. 2. Direct Root Cause: mergeStateHandlesWithClipAndIngest Logic Failure
The {{mergeStateHandlesWithClipAndIngest}} method distributes the incoming state handles into two lists:
# {{exportedColumnFamilyMetaData}}: column family metadata of SST files that can be imported directly
# {{notImportableHandles}}: state handles that must be copied via a temporary DB
{*}Critical assumption{*}: if {{localKeyedStateHandles}} is not empty, then at least one of the two resulting lists contains data.
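For illustration, here is a minimal, self-contained sketch of that distribution logic and the assumption it relies on. All types and the classification of handles below are hypothetical placeholders, not Flink's actual internals:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class ClipAndIngestSketch {

    /** Hypothetical placeholder for an incremental local keyed state handle. */
    record StateHandle(String name, boolean withinKeyGroupRange, boolean exportProducedSstFiles) {}

    public static void main(String[] args) {
        List<StateHandle> localKeyedStateHandles = List.of(
                new StateHandle("handle-1", true, true),
                new StateHandle("handle-2", false, false));

        // The two lists the restore operation fills while iterating the handles.
        List<StateHandle> exportedColumnFamilyMetaData = new ArrayList<>();
        List<StateHandle> notImportableHandles = new ArrayList<>();

        for (StateHandle handle : localKeyedStateHandles) {
            if (handle.withinKeyGroupRange() && handle.exportProducedSstFiles()) {
                // Data fits the proclaimed key-group range and the export produced SST
                // files: the column family metadata can be ingested directly.
                exportedColumnFamilyMetaData.add(handle);
            } else {
                // Everything else is copied key-by-key via a temporary DB instance.
                notImportableHandles.add(handle);
            }
        }

        // The critical assumption: with a non-empty input, at least one list is
        // non-empty. The current restore code does not guard this case; the check
        // below only makes the assumption explicit for the sketch.
        if (!localKeyedStateHandles.isEmpty()
                && exportedColumnFamilyMetaData.isEmpty()
                && notImportableHandles.isEmpty()) {
            throw new IllegalStateException("Assumption violated: both lists are empty");
        }
    }
}
{code}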
h1. 3. Why Both Lists Become Empty (Abnormal Case)
Under normal circumstances, both lists should never be empty at the same time, because:
# {{notImportableHandles}} gets populated when:
#* the state handle data exceeds the proclaimed key-group range,
#* the range check fails for any reason, or
#* the export process encounters errors.
# {{exportedColumnFamilyMetaData}} gets populated when:
#* the state handle data is within the expected range, and
#* the export process successfully creates SST files.
*Why do both lists become empty in the bug scenario?*
# Auto-compaction during restore: when RocksDB opens a temporary instance for each state handle, auto-compaction can be triggered.
# Compaction removes SST files: if all records in the SST files are expired or marked for deletion, compaction removes the files entirely (a standalone demonstration of this behavior follows this list).
# Export process finds no files: {{RocksDBIncrementalCheckpointUtils.exportColumnFamilies()}} attempts to export but finds no SST files.
# Both lists remain empty: neither {{exportedColumnFamilyMetaData}} nor {{notImportableHandles}} gets populated.
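The compaction behavior relied on above can be demonstrated in isolation with plain RocksJava (requires the {{rocksdbjni}} dependency). The sketch below uses a manual {{compactRange()}} for determinism; auto-compaction has the same effect once its thresholds are reached. Compacting data that is fully covered by tombstones leaves no live SST files behind:
{code:java}
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class TombstoneCompactionDemo {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        String path = Files.createTempDirectory("tombstone-compaction-demo").toString();

        try (Options options = new Options().setCreateIfMissing(true);
                FlushOptions flush = new FlushOptions().setWaitForFlush(true);
                RocksDB db = RocksDB.open(options, path)) {

            // Write some entries and flush them into an SST file.
            for (int i = 0; i < 1000; i++) {
                db.put(("key-" + i).getBytes(StandardCharsets.UTF_8),
                        ("value-" + i).getBytes(StandardCharsets.UTF_8));
            }
            db.flush(flush);

            // Delete everything and flush again: all SST data is now covered by tombstones.
            for (int i = 0; i < 1000; i++) {
                db.delete(("key-" + i).getBytes(StandardCharsets.UTF_8));
            }
            db.flush(flush);
            System.out.println("Live SST files before compaction: " + db.getLiveFilesMetaData().size());

            // A full compaction drops the tombstones together with the covered data,
            // so no live SST files remain for a subsequent export to find.
            db.compactRange();
            System.out.println("Live SST files after compaction:  " + db.getLiveFilesMetaData().size());
        }
    }
}
{code}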
h1. 4. How to 100% Trigger the Bug
The bug requires these specific conditions (a configuration sketch follows the note below):
# Enable ingest-db-restore-mode: {{state.backend.rocksdb.use-ingest-db-restore-mode=true}}
# All records are deleted or expired by TTL.
# Scale operation: triggers a restore with multiple state handles.
# Auto-compaction during restore: removes the expired/deleted SST files.
# 4th checkpoint: creates 4 SST files in level 0, which triggers compaction during restore.
Note: adding a short sleep (such as 500 ms) between opening the temporary RocksDB instance and exporting the SST files during recovery makes the bug much easier to reproduce.
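For reference, a minimal sketch of condition 1 set programmatically (assumes Flink 1.19+ APIs; the placeholder pipeline is not a full reproducer, and the option key is the one quoted above):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IngestDbRestoreModeSketch {

    // Option key as quoted in condition 1 above; Flink also ships a typed option for it,
    // but the key is built manually here to keep the sketch self-contained.
    private static final ConfigOption<Boolean> USE_INGEST_DB_RESTORE_MODE =
            ConfigOptions.key("state.backend.rocksdb.use-ingest-db-restore-mode")
                    .booleanType()
                    .defaultValue(false);

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(USE_INGEST_DB_RESTORE_MODE, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // Placeholder pipeline: a real reproducer additionally needs the RocksDB state
        // backend with incremental checkpoints, keyed state that is fully deleted or
        // expired by TTL, at least 4 checkpoints, and a rescale before restore.
        env.fromData(1, 2, 3).print();
        env.execute("ingest-db-restore-mode sketch");
    }
}
{code}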
h2. Trigger Sequence:
# Checkpoints 1-4: create SST files in L0.
# Scale operation: triggers a restore via {{mergeStateHandlesWithClipAndIngest}}.
# Auto-compaction: removes the SST files while the temporary DB instances are open during restore.
# Empty lists: both {{exportedColumnFamilyMetaData}} and {{notImportableHandles}} are empty.
# Fallback: {{mergeStateHandlesWithCopyFromTemporaryInstance(notImportableHandles, ...)}} is called with an empty list.
# IndexOutOfBoundsException: {{findTheBestStateHandleForInitial}} returns -1 for the empty list (see the sketch after this list).
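A minimal sketch of that final step, using a hypothetical stand-in for {{findTheBestStateHandleForInitial}} that returns -1 for an empty list, which the caller then passes straight to {{ArrayList.remove}}:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class EmptyListRemoveSketch {

    /** Hypothetical stand-in: index of the "best" handle, or -1 if the list is empty. */
    static int findTheBestStateHandleForInitial(List<String> handles) {
        int bestIndex = -1;
        long bestScore = Long.MIN_VALUE;
        for (int i = 0; i < handles.size(); i++) {
            long score = handles.get(i).length(); // placeholder scoring
            if (score > bestScore) {
                bestScore = score;
                bestIndex = i;
            }
        }
        return bestIndex;
    }

    public static void main(String[] args) {
        List<String> notImportableHandles = new ArrayList<>(); // empty in the bug scenario
        int index = findTheBestStateHandleForInitial(notImportableHandles); // -1
        // ArrayList.remove(-1) throws:
        // java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 0
        notImportableHandles.remove(index);
    }
}
{code}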
h1. 5. Approach Analysis
Proposed approach: disable auto-compaction for the temporary RocksDB instances opened during restore (a minimal sketch of the relevant RocksDB option follows the list below).
h2. Advantages of this approach:
* Preserves the SST file structure during restore
* Prevents unexpected data removal during this critical operation
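As an illustration of the idea (not the actual patch), RocksDB exposes an option to turn auto-compaction off per instance; a temporary DB opened for export could be configured along these lines (requires the {{rocksdbjni}} dependency; the path and surrounding structure are placeholders):
{code:java}
import java.nio.file.Files;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class DisableAutoCompactionSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        String tempDbPath = Files.createTempDirectory("tmp-restore-db").toString();

        try (Options options = new Options()
                        .setCreateIfMissing(true)
                        // Key idea of the proposed fix: no background compaction can run
                        // and drop SST files while the temporary instance is open for export.
                        .setDisableAutoCompactions(true);
                RocksDB tempDb = RocksDB.open(options, tempDbPath)) {
            // ... export column families from tempDb without compaction interfering.
        }
    }
}
{code}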
h2. Alternative Solution: Handle Empty Lists
Another approach would be to modify {{mergeStateHandlesWithClipAndIngest}} to handle the case where both lists are empty:
{code:java}
if (exportedColumnFamilyMetaData.isEmpty() && notImportableHandles.isEmpty()) {
    // Both lists are empty: return as-is, or create an empty base DB here.
    return;
} else if (exportedColumnFamilyMetaData.isEmpty()) {
    // Nothing can be ingested directly: fall back to the copy-from-temporary-instance path.
    mergeStateHandlesWithCopyFromTemporaryInstance(
            notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
}{code}
*Why this alternative was not chosen:*
* {*}Potential risks{*}: it could mask underlying data corruption issues.
* {*}Data integrity concerns{*}: empty lists might indicate genuine restore problems.
The chosen solution (disabling auto-compaction) is safer because it addresses
the root cause while maintaining the existing validation logic and error
detection mechanisms.
was:
Following is the exception stack:
{code:java}
java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 0
at
java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at
java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:374)
at java.base/java.util.ArrayList.remove(ArrayList.java:536)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithCopyFromTemporaryInstance(RocksDBIncrementalRestoreOperation.java:691)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.mergeStateHandlesWithClipAndIngest(RocksDBIncrementalRestoreOperation.java:531)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromMultipleStateHandles(RocksDBIncrementalRestoreOperation.java:477)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:358)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$restore$1(RocksDBIncrementalRestoreOperation.java:270)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:1058)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:269)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:396)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:550)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:98)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:397)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:403)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:181)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:292)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:114)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:829)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:781)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:781)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:734)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1057)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1016)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:840)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
> IndexOutOfBoundsException occasionally occurs after
> rocksdb.use-ingest-db-restore-mode is enabled
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-38415
> URL: https://issues.apache.org/jira/browse/FLINK-38415
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.1.0, 1.20.3
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)