[ https://issues.apache.org/jira/browse/FLINK-38567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zakelly Lan resolved FLINK-38567.
---------------------------------
    Fix Version/s: 2.2.0
       Resolution: Fixed

> ForSt state backend error with GROUP BY and async state execution
> -----------------------------------------------------------------
>
>                 Key: FLINK-38567
>                 URL: https://issues.apache.org/jira/browse/FLINK-38567
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Mohsen Rezaei
>            Assignee: Han Yin
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 2.2.0
>
>
> While testing the disaggregated state backend with ForSt DB I ran into the 
> following exception:
> {code}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:359)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:280)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
>       at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for AsyncKeyedProcessOperator_dc09592d5320e3ce8d45932dc9094772_(1/1) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:486)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>       ... 12 common frames omitted
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>       at org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:319)
>       at org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:474)
>       at org.apache.flink.state.forst.ForStStateBackend.createAsyncKeyedStateBackend(ForStStateBackend.java:98)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:475)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       ... 14 common frames omitted
> Caused by: java.io.IOException: Error while opening ForSt instance.
>       at org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:97)
>       at org.apache.flink.state.forst.restore.ForStHandle.loadDb(ForStHandle.java:140)
>       at org.apache.flink.state.forst.restore.ForStHandle.openDB(ForStHandle.java:118)
>       at org.apache.flink.state.forst.restore.ForStNoneRestoreOperation.restore(ForStNoneRestoreOperation.java:64)
>       at org.apache.flink.state.forst.ForStKeyedStateBackendBuilder.build(ForStKeyedStateBackendBuilder.java:261)
>       ... 19 common frames omitted
> Caused by: org.forstdb.RocksDBException: NotFound
>       at org.forstdb.RocksDB.open(Native Method)
>       at org.forstdb.RocksDB.open(RocksDB.java:318)
>       at org.apache.flink.state.forst.ForStOperationUtils.openDB(ForStOperationUtils.java:85)
>       ... 23 common frames omitted
> {code}
> The Flink SQL query I used is a simple {{GROUP BY}} operation:
> {code}
> SELECT COUNT(page) as page_count, userid FROM user_audit GROUP BY userid;
> {code}
> This is the configuration used to reproduce this:
> {code}
> execution.state-recovery.from-local, false
> jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout, 30s
> taskmanager.memory.jvm-metaspace.size, 200m
> kubernetes.cluster-id, q-64dfbe96-e7e2-4991-8b95-a5906fd9b3c7
> high-availability.storageDir, s3p://...
> execution.checkpointing.savepoint-dir, s3p://...
> metrics.internal.query-service.port, 6127
> kubernetes.namespace, q-64dfbe96-e7e2-4991-8b95-a5906fd9b3c7
> state.backend.forst.writebuffer.size, 4mb
> metrics.reporters, prom
> state.backend.forst.metrics.estimate-num-keys, true
> state.backend.type, forst
> table.exec.async-state.enabled, true
> execution.checkpointing.mode, EXACTLY_ONCE
> metrics.reporter.prom.port, 9999
> taskmanager.memory.process.size, 1024m
> execution.checkpointing.incremental, true
> taskmanager.memory.network.fraction, 0.08
> jobmanager.rpc.port, 6123
> web.upload.dir, /var/run/flink-app-jar
> execution.checkpointing.interval, 30000
> execution.checkpointing.timeout, 60000
> rest.port, 8081
> s3.upload.max.concurrent.uploads, 10
> restart-strategy.fixed-delay.delay, 3s
> state.backend.forst.compaction.level.max-size-level-base, 8mb
> taskmanager.memory.managed.fraction, 0.35
> blob.server.port, 6124
> state.backend.forst.compaction.level.target-file-size-base, 4mb
> metrics.system-resource, true
> high-availability.type, KUBERNETES
> state.backend.forst.local-dir, /var/run/flink-forst-local
> taskmanager.cpu.cores, 1
> metrics.reporter.prom.factory.class, org.apache.flink.metrics.prometheus.PrometheusReporterFactory
> state.backend.forst.writebuffer.count, 16
> query.server.port, 6125
> taskmanager.memory.jvm-overhead.min, 128m
> s3.endpoint, s3.us-west-2.amazonaws.com
> state.backend.forst.metrics.compaction-pending, true
> taskmanager.data.port, 6121
> taskmanager.numberOfTaskSlots, 1
> execution.checkpointing.dir, s3p://...
> taskmanager.memory.network.min, 0m
> restart-strategy.fixed-delay.attempts, 10
> jobmanager.memory.process.size, 1024m
> taskmanager.rpc.port, 6122
> table.exec.mini-batch.enabled, false
> jobmanager.scheduler, Adaptive
> high-availability, org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> table.optimizer.agg-phase-strategy, ONE_PHASE
> restart-strategy, fixed-delay
> execution.checkpointing.snapshot-compression, true
> state.backend.forst.metrics.num-running-compactions, true
> s3.path.style.access, true
> {code}
> Based on 
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/disaggregated_state/#for-sql-jobs],
>  a non-distinct aggregation should already be supported for asynchronous 
> state access, and it seems like I've run into a bug with the async 
> disaggregated state. I haven't tested any of the other operators, e.g. 
> windowing, but I confirmed that a simple projection works just fine with the 
> same configuration.
> If the {{table.exec.async-state.enabled}} config is set to {{false}}, the 
> jobs are able to make progress.
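> A minimal sketch of that workaround, assuming the job is submitted through 
> the Flink SQL client (the client is an assumption here; the option could 
> equally be changed in the cluster configuration listed above):
> {code}
> -- Assumption: run from the Flink SQL client before submitting the query.
> -- Switches the table operators back to synchronous state access.
> SET 'table.exec.async-state.enabled' = 'false';
> SELECT COUNT(page) as page_count, userid FROM user_audit GROUP BY userid;
> {code}
> This only sidesteps the failure; it does not exercise the 
> {{createAsyncKeyedStateBackend}} path that appears in the stack trace.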



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
