[ 
https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-20646:
---------------------------------
    Release Note:   (was: Fixed via:
* master (1.13): cf71008984dd68b25db3612a9ca39f197b7e09c8
* release-1.12: 4d1c9927f77b11f6990e447856fac6627a46bdcf)

> ReduceTransformation does not work with RocksDBStateBackend
> -----------------------------------------------------------
>
>                 Key: FLINK-20646
>                 URL: https://issues.apache.org/jira/browse/FLINK-20646
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.12.0
>            Reporter: Xintong Song
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.1
>
>
> The intra-slot managed memory sharing (FLIP-141) requires transformations to 
> properly declare their managed memory use cases.
> For the RocksDB state backend, it requires every {{Transformation}} on a keyed
> stream (i.e., one with a non-null {{KeySelector}}) to call
> {{Transformation#updateManagedMemoryStateBackendUseCase}}, which the newly
> introduced {{ReduceTransformation}} did not do.
> As a result, Flink does not reserve managed memory for operators converted
> from {{ReduceTransformation}} (FLINK-19931), which leads to the following failure
> when the RocksDB state backend is used.
> {code}
> 16:58:49,373 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for StreamGroupedReduceOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from alternative (1/1), will retry while more alternatives are available.
> java.io.IOException: Failed to acquire shared cache resource for RocksDB
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:264) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-runtime_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-runtime_2.11-1.12.0.jar:1.12.0]
>       at java.lang.Thread.run(Thread.java:832) [?:?]
> Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
>       at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:164) ~[flink-core-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:631) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:612) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:499) ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:260) ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
>       ... 16 more
> {code}
> The problem was reported on the user-zh mailing list (in Chinese):
> http://apache-flink.147419.n8.nabble.com/flink-1-12-RocksDBStateBackend-td9504.html
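The failure mode described above can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Flink code: the classes {{ManagedMemorySketch}}, {{SketchTransformation}} and {{UseCase}}, the per-transformation {{fraction}} computation, and the 70/30 weights are all assumptions made for illustration. It only mimics the idea that a keyed transformation which never declares the state backend use case (as {{ReduceTransformation}} failed to do via {{Transformation#updateManagedMemoryStateBackendUseCase}}) ends up with a managed memory fraction of 0, which the precondition in the stack trace rejects.

{code:java}
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// HYPOTHETICAL, simplified model of FLIP-141 managed memory declaration.
// None of these classes are Flink APIs; they only mimic the failure mode.
public class ManagedMemorySketch {

    // Hypothetical use cases; the weights used in main() are illustrative only.
    enum UseCase { STATE_BACKEND, PYTHON }

    // Stand-in for a Transformation that records the use cases it declares.
    static class SketchTransformation {
        final Set<UseCase> declared = EnumSet.noneOf(UseCase.class);

        SketchTransformation(boolean keyed, boolean declaresStateBackend) {
            // Keyed transformations are expected to declare the state backend use case.
            // The buggy ReduceTransformation behaves like declaresStateBackend == false.
            if (keyed && declaresStateBackend) {
                declared.add(UseCase.STATE_BACKEND);
            }
        }
    }

    // Share of managed memory for one use case: its weight divided by the sum of
    // the weights of all use cases the transformation declared, or 0 if undeclared.
    static double fraction(SketchTransformation t, UseCase useCase, Map<UseCase, Integer> weights) {
        if (!t.declared.contains(useCase)) {
            return 0.0;
        }
        int total = 0;
        for (UseCase u : t.declared) {
            total += weights.getOrDefault(u, 0);
        }
        return total == 0 ? 0.0 : (double) weights.getOrDefault(useCase, 0) / total;
    }

    // Mirrors the precondition seen in the stack trace: a zero fraction is rejected.
    static void validateFraction(double fraction) {
        if (fraction <= 0.0) {
            throw new IllegalArgumentException("The fraction of memory to allocate should not be 0.");
        }
    }

    public static void main(String[] args) {
        Map<UseCase, Integer> weights = new EnumMap<>(UseCase.class);
        weights.put(UseCase.STATE_BACKEND, 70);
        weights.put(UseCase.PYTHON, 30);

        // A keyed transformation that declares its state backend use case.
        SketchTransformation fixed = new SketchTransformation(true, true);
        double ok = fraction(fixed, UseCase.STATE_BACKEND, weights);
        validateFraction(ok);
        System.out.println("fixed reduce fraction = " + ok);

        // A keyed transformation that forgets to declare it, like the bug above.
        SketchTransformation buggy = new SketchTransformation(true, false);
        try {
            validateFraction(fraction(buggy, UseCase.STATE_BACKEND, weights));
        } catch (IllegalArgumentException e) {
            System.out.println("buggy reduce: " + e.getMessage());
        }
    }
}
{code}

Running {{main}} prints a fraction of 1.0 for the transformation that declares the use case and the {{IllegalArgumentException}} message for the one that does not. Flink's real computation spans all operators in a slot sharing group and is more involved than this per-transformation model; the point is only that an undeclared use case receives no memory at all, regardless of the configured weights.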



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
