[
https://issues.apache.org/jira/browse/FLINK-38558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zakelly Lan closed FLINK-38558.
-------------------------------
Fix Version/s: 2.2.0
Resolution: Fixed
> Checkpoint rescale and restore fails with RocksDB Heap timer
> ------------------------------------------------------------
>
> Key: FLINK-38558
> URL: https://issues.apache.org/jira/browse/FLINK-38558
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.20.3
> Reporter: Yangze Guo
> Priority: Critical
> Labels: pull-request-available
> Fix For: 2.2.0
>
>
> In Flink 1.20, when using the RocksDB state backend with timer service
> configured as Heap (state.backend.rocksdb.timer-service.factory: Heap),
> rescaling a job from a checkpoint fails during restore, throwing the
> following exception:
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:332)
>     ...
> Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=31} does not contain key group 32
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:160)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:149)
>     ...
> Restoring from a savepoint with exactly the same configuration, however, works fine.
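The failing precondition can be illustrated with a simplified, stdlib-only re-creation of the check in `HeapPriorityQueueSet.globalKeyGroupToLocalIndex`. It is based only on the exception message in the stack trace, not on Flink's source, and `KeyGroupCheckSketch` and `SimpleKeyGroupRange` are hypothetical names. The idea: a subtask translates a global key group into a local index by subtracting the start of its own range, and rejects any key group outside that range.

```java
public class KeyGroupCheckSketch {

    /** Hypothetical stand-in for Flink's KeyGroupRange: inclusive bounds [start, end]. */
    static final class SimpleKeyGroupRange {
        final int start;
        final int end;

        SimpleKeyGroupRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }

        @Override
        public String toString() {
            // Same textual form as the range in the reported exception
            return "KeyGroupRange{startKeyGroup=" + start + ", endKeyGroup=" + end + "}";
        }
    }

    /** Maps a global key group to a local index; key groups outside the range are rejected. */
    static int globalKeyGroupToLocalIndex(SimpleKeyGroupRange range, int keyGroup) {
        if (!range.contains(keyGroup)) {
            throw new IllegalArgumentException(range + " does not contain key group " + keyGroup);
        }
        return keyGroup - range.start;
    }

    public static void main(String[] args) {
        // A subtask owning key groups 32-63 stores key group 40 at local index 8
        System.out.println(globalKeyGroupToLocalIndex(new SimpleKeyGroupRange(32, 63), 40));

        // A subtask owning key groups 0-31 rejects a timer from key group 32
        try {
            globalKeyGroupToLocalIndex(new SimpleKeyGroupRange(0, 31), 32);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `8`, followed by the same message as the `Caused by` line of the reported exception.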
> Root Cause Analysis:
> 1. When state.backend.rocksdb.timer-service.factory=heap, timers in
> checkpoints are snapshotted into RawKeyedState. During restore, these timers
> are added to HeapPriorityQueueSet, which validates against the current key
> group range. However, timers are not pruned during rescaling, leading to a
> mismatch (e.g., a timer belonging to key group 32 when the current range is
> 0-31), triggering the IllegalArgumentException.
> 2. Savepoints work because of FLINK-21344, which stops serializing heap
> timers in the RocksDB state backend during savepoint creation. This
> avoids the key group validation issue during savepoint restore.
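To see how a timer from key group 32 can reach a subtask that owns 0-31, the sketch below reproduces the key-group-to-subtask arithmetic that is intended to match `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex`, and adds a pruning filter. It assumes a max parallelism of 128 (Flink's default lower bound when no max parallelism is set; the issue does not state the value, but the `0-31` range at parallelism 4 is consistent with it). `RescaleRangeSketch`, `Range`, and `pruneTimerKeyGroups` are hypothetical names, and the filter only illustrates what "pruning timers during rescaling" means; it is not the actual fix for FLINK-38558.

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleRangeSketch {

    /** Hypothetical stand-in for KeyGroupRange: inclusive key groups owned by one subtask. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }

        @Override
        public String toString() {
            return start + "-" + end;
        }
    }

    /** Key-group range of one subtask (intended to match KeyGroupRangeAssignment's arithmetic). */
    static Range rangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new Range(start, end);
    }

    /** Hypothetical pruning step: keep only timer key groups owned by the local subtask. */
    static List<Integer> pruneTimerKeyGroups(List<Integer> timerKeyGroups, Range local) {
        List<Integer> kept = new ArrayList<>();
        for (int keyGroup : timerKeyGroups) {
            if (local.contains(keyGroup)) {
                kept.add(keyGroup);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption: default max parallelism, not stated in the issue
        System.out.println("old subtask 0 (p=2): " + rangeFor(maxParallelism, 2, 0));
        System.out.println("new subtask 0 (p=4): " + rangeFor(maxParallelism, 4, 0));
        System.out.println("new subtask 1 (p=4): " + rangeFor(maxParallelism, 4, 1));

        // Hypothetical key groups of timers checkpointed by old subtask 0
        List<Integer> restoredTimerKeyGroups = List.of(3, 17, 32, 40);
        System.out.println(pruneTimerKeyGroups(restoredTimerKeyGroups, rangeFor(maxParallelism, 4, 0)));
    }
}
```

Under these assumptions, old subtask 0 (parallelism 2) owned key groups 0-63, while new subtask 0 (parallelism 4) owns only 0-31 and key group 32 now belongs to new subtask 1. A timer in key group 32 that reaches new subtask 0 without pruning is exactly the case the precondition rejects; after filtering, only `[3, 17]` would be kept.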
> The issue can be reproduced as follows:
> 1. First, run the following job (parallelism = 2) and wait until it completes a checkpoint:
>     Configuration config = new Configuration();
>     config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
>     config.set(AUTO_WATERMARK_INTERVAL, Duration.ofMillis(200));
>     config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxxx");
>     config.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(10_000));
>     config.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, RETAIN_ON_CANCELLATION);
>     config.setString("state.backend.rocksdb.timer-service.factory", "HEAP");
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
>     env.disableOperatorChaining();
>     env.setParallelism(2);
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>     DataStreamSource<Row> source =
>         env.addSource(
>             new SourceFunction<Row>() {
>                 // volatile, because cancel() is called from a different thread than run()
>                 private volatile boolean running = true;
>                 private int ts = 1000;
>                 private final Random rnd = new Random();
>
>                 @Override
>                 public void run(SourceContext<Row> ctx) throws Exception {
>                     while (running) {
>                         Thread.sleep(1);
>                         // rnd.nextInt(1) always returns 0, so ts stays at 1000
>                         ts += rnd.nextInt(1);
>                         ctx.collect(Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(ts), "USD", rnd.nextInt(300)));
>                         ctx.collect(Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(ts), "CHD", rnd.nextInt(200)));
>                         ctx.collect(Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(ts), "BST", rnd.nextInt(200)));
>                     }
>                 }
>
>                 @Override
>                 public void cancel() {
>                     running = false;
>                 }
>             },
>             TypeExtractor.getForObject(
>                 Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(0), "USD", 1)));
>
>     // Create a table from change log stream
>     Table rateTable =
>         tEnv.fromDataStream(
>                 source,
>                 Schema.newBuilder()
>                     .column("f0", DataTypes.TIMESTAMP_LTZ(3))
>                     .column("f1", DataTypes.STRING().notNull())
>                     .column("f2", DataTypes.INT().notNull())
>                     .watermark("f0", "f0 - INTERVAL '2' SECONDS")
>                     .primaryKey("f1")
>                     .build())
>             .as("ts", "product", "amount");
>
>     // Register the table as a view, it will be accessible under a name
>     tEnv.createTemporaryView("source", rateTable);
>
>     String query =
>         "SELECT\n"
>             + " CAST(window_start AS STRING) as window_start,\n"
>             + " CAST(window_end AS STRING) as window_end,\n"
>             + " sum(amount) as pv,\n"
>             + " count(1) AS uv,\n"
>             + " `product`\n"
>             + "FROM\n"
>             + " TABLE(\n"
>             + " CUMULATE(\n"
>             + " TABLE source,\n"
>             + " DESCRIPTOR(ts),\n"
>             + " INTERVAL '10' MINUTES,\n"
>             + " INTERVAL '60' MINUTES\n"
>             + " )\n"
>             + " )\n"
>             + "GROUP BY\n"
>             + " window_start,\n"
>             + " window_end,\n"
>             + " product;";
>     tEnv.executeSql(query);
> 2. Second, restore from the retained checkpoint with a new parallelism (p = 4).
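When the job is restored with parallelism 4, keyed state is redistributed by key-group range: each new subtask is assigned state from every old subtask whose range overlaps its own. The stdlib-only sketch below models only that overlap computation, assuming a max parallelism of 128 (consistent with the `0-31` range in the exception); it does not model how RocksDB or raw keyed state is actually read, and `RestoreAssignmentSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreAssignmentSketch {

    /** Inclusive key-group range {start, end} of one subtask (intended to match KeyGroupRangeAssignment). */
    static int[] rangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Two inclusive ranges overlap if each one starts no later than the other ends. */
    static boolean overlaps(int[] a, int[] b) {
        return a[0] <= b[1] && b[0] <= a[1];
    }

    /** For each new subtask, the indices of the old subtasks whose key-group ranges overlap it. */
    static List<List<Integer>> assign(int maxParallelism, int oldParallelism, int newParallelism) {
        List<List<Integer>> result = new ArrayList<>();
        for (int n = 0; n < newParallelism; n++) {
            int[] newRange = rangeFor(maxParallelism, newParallelism, n);
            List<Integer> sources = new ArrayList<>();
            for (int o = 0; o < oldParallelism; o++) {
                if (overlaps(newRange, rangeFor(maxParallelism, oldParallelism, o))) {
                    sources.add(o);
                }
            }
            result.add(sources);
        }
        return result;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption, see lead-in
        List<List<Integer>> assignment = assign(maxParallelism, 2, 4);
        for (int n = 0; n < assignment.size(); n++) {
            int[] r = rangeFor(maxParallelism, 4, n);
            System.out.println("new subtask " + n + " (" + r[0] + "-" + r[1] + ") <- old subtasks " + assignment.get(n));
        }
    }
}
```

In this model, new subtasks 0 (0-31) and 1 (32-63) both draw on old subtask 0, whose state covered key groups 0-63, so timers from that state that are not filtered by key group can end up in a subtask that does not own them.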