Thanks for the ping, Andrey.

Hi Juha,

Thanks for reporting the issue. I'd like to check the following before
digging further:

1. Could you let us know the configuration you used when running the tests
(especially the memory-related options)?

2. Did you watch the memory consumption before and after turning
`state.backend.rocksdb.memory.managed` off? If not, could you check it
and let us know the result?
    2.1 Furthermore, if the memory consumption is much higher with managed
memory turned off, could you try increasing the managed memory fraction
accordingly through `taskmanager.memory.managed.fraction` [1] and check the
result?

3. With `state.backend.rocksdb.memory.managed` on and nothing else changed,
could you try setting `state.backend.rocksdb.timer-service.factory` to
`HEAP` and check the result? (Side note: starting from the 1.10.0 release,
timers are stored in RocksDB by default when using RocksDBStateBackend [2].)
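To keep the comparisons in points 2, 2.1 and 3 apart, it may help to write each experiment down as the set of entries it changes relative to the baseline. Below is a minimal, JDK-only sketch of such a test matrix.

The class name `RocksDbExperimentMatrix`, the variant names and the fraction value 0.6 are hypothetical illustrations, not recommended settings. The sketch only prints the overrides as `flink-conf.yaml` lines and does not touch Flink itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: one entry per experiment, listing only the options it overrides. */
public class RocksDbExperimentMatrix {

    static final String MANAGED = "state.backend.rocksdb.memory.managed";
    static final String FRACTION = "taskmanager.memory.managed.fraction";
    static final String TIMERS = "state.backend.rocksdb.timer-service.factory";

    /** Returns variant name -> config overrides, in insertion order. */
    static Map<String, Map<String, String>> variants() {
        Map<String, Map<String, String>> v = new LinkedHashMap<>();
        // Baseline: managed memory on (the default), nothing else changed.
        v.put("baseline", Map.of());
        // Point 2: managed memory off; compare memory consumption with the baseline.
        v.put("unmanaged", Map.of(MANAGED, "false"));
        // Point 2.1: managed memory on, larger fraction (0.6 is an arbitrary example value).
        v.put("larger-fraction", Map.of(FRACTION, "0.6"));
        // Point 3: managed memory on, timers kept on the heap instead of in RocksDB.
        v.put("heap-timers", Map.of(TIMERS, "HEAP"));
        return v;
    }

    public static void main(String[] args) {
        // Print each variant as a commented block of flink-conf.yaml style lines.
        for (Map.Entry<String, Map<String, String>> e : variants().entrySet()) {
            System.out.println("# " + e.getKey());
            e.getValue().forEach((key, value) -> System.out.println(key + ": " + value));
        }
    }
}
```

Changing one entry per run, with everything else fixed, keeps any difference in memory consumption or runtime attributable to that entry.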

You may also find these documents [3] [4] useful for tuning the memory of
the RocksDB backend.
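Some background for those documents, as I understand the 1.10 memory model:

- If `taskmanager.memory.managed.size` is not set, the managed memory size is `taskmanager.memory.managed.fraction` (default 0.4) times the total Flink memory.
- Each slot of the TaskManager gets an equal share of that managed memory.
- With `state.backend.rocksdb.memory.managed` on, the RocksDB instances in a slot share that slot's budget.

The sketch below only does this arithmetic. The class name, the 1 GiB of total Flink memory and the 4 slots are hypothetical inputs. It ignores any rounding or alignment Flink itself may apply.

```java
/** Hypothetical back-of-the-envelope calculation of the per-slot RocksDB memory budget. */
public class ManagedMemoryBudget {

    /** Managed memory derived from total Flink memory when no explicit size is configured. */
    static long managedBytes(long totalFlinkMemoryBytes, double managedFraction) {
        return (long) (totalFlinkMemoryBytes * managedFraction);
    }

    /** Equal share per slot; all RocksDB instances running in one slot share this budget. */
    static long perSlotBytes(long managedBytes, int numberOfSlots) {
        return managedBytes / numberOfSlots;
    }

    public static void main(String[] args) {
        long totalFlinkMemory = 1L << 30; // hypothetical: 1 GiB total Flink memory
        int slots = 4;                    // hypothetical slot count
        // Compare the default fraction with a larger, arbitrarily chosen one.
        for (double fraction : new double[] {0.4, 0.6}) {
            long managed = managedBytes(totalFlinkMemory, fraction);
            System.out.printf("fraction %.1f: managed %d MiB, per slot %d MiB%n",
                    fraction, managed >> 20, perSlotBytes(managed, slots) >> 20);
        }
    }
}
```

Raising the fraction, or spreading the same TaskManager memory over fewer slots, increases the per-slot budget that all RocksDB-backed operators in a slot have to share.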


Best Regards,


On Thu, 25 Jun 2020 at 15:37, Andrey Zagrebin <> wrote:

> Hi Juha,
> Thanks for sharing the testing program to expose the problem.
> This indeed looks suboptimal if X does not leave space for the window
> operator.
> I am adding Yu and Yun who might have a better idea about what could be
> improved about sharing the RocksDB memory among operators.
> Best,
> Andrey
> On Thu, Jun 25, 2020 at 9:10 AM Juha Mynttinen <>
> wrote:
>> Hey,
>> Here's a simple test. It's basically the WordCount example from Flink, but
>> using RocksDB as the state backend and having a stateful operator. The
>> javadocs explain how to use it.
>> /*
>>  * Licensed to the Apache Software Foundation (ASF) under one or more
>>  * contributor license agreements.  See the NOTICE file distributed with
>>  * this work for additional information regarding copyright ownership.
>>  * The ASF licenses this file to You under the Apache License, Version 2.0
>>  * (the "License"); you may not use this file except in compliance with
>>  * the License.  You may obtain a copy of the License at
>>  *
>>  *
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>> package org.apache.flink.streaming.examples.wordcount;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> import org.apache.flink.api.common.state.ListState;
>> import org.apache.flink.api.common.state.ListStateDescriptor;
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.utils.MultipleParameterTool;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.contrib.streaming.state.PredefinedOptions;
>> import org.apache.flink.contrib.streaming.state.RocksDBOptions;
>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.runtime.state.StateBackend;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> import org.apache.flink.util.Collector;
>> import java.nio.file.Files;
>> import java.nio.file.Path;
>> /**
>>  * Works fast in the following cases.
>>  * <ul>
>>  *     <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li>
>>  *     <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link #PARALLELISM} is 1 to 4.</li>
>>  * </ul>
>>  * <p>
>>  * Some results:
>>  * </p>
>>  * <ul>
>>  *     <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li>
>>  *     <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li>
>>  *     <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li>
>>  *     <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li>
>>  *     <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li>
>>  *     <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li>
>>  * </ul>
>>  * <p>
>>  */
>> public class WordCount {
>>     /**
>>      * The parallelism of the job.
>>      */
>>     private static final int PARALLELISM = 5;
>>     /**
>>      * Whether to use managed memory. If {@code true}, the configuration is left
>>      * unchanged; if {@code false}, RocksDB managed memory is disabled.
>>      */
>>     private static final boolean USE_MANAGED_MEMORY = true;
>>     /**
>>      * The source synthesizes this many events.
>>      */
>>     public static final int EVENT_COUNT = 1_000_000;
>>     /**
>>      * The value of each event is the source's running event counter modulo
>>      * {@code MAX_VALUE}. Essentially controls the count of unique keys.
>>      */
>>     public static final int MAX_VALUE = 1_000;
>>     // *************************************************************************
>>     // PROGRAM
>>     // *************************************************************************
>>     public static void main(String[] args) throws Exception {
>>         // Checking input parameters
>>         final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
>>         // set up the execution environment
>>         Configuration configuration = new Configuration();
>>         if (!USE_MANAGED_MEMORY) {
>>             configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, false);
>>         }
>>         final StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration);
>>         Path tempDirPath = Files.createTempDirectory("example");
>>         String checkpointDataUri = "file://" + tempDirPath.toString();
>>         RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDataUri, true);
>>         rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
>>         env.setStateBackend((StateBackend) rocksDBStateBackend);
>>         // make parameters available in the web interface
>>         env.getConfig().setGlobalJobParameters(params);
>>         // get input data
>>         DataStream<Long> text = env.addSource(new ExampleCountSource());
>>         text.keyBy(v -> v)
>>                 .flatMap(new ValuesCounter())
>>                 .addSink(new DiscardingSink<>());
>>         long before = System.currentTimeMillis();
>>         env.execute("Streaming WordCount");
>>         long duration = System.currentTimeMillis() - before;
>>         System.out.println("Done " + duration + " ms, parallelism " + PARALLELISM);
>>     }
>>     // *************************************************************************
>>     // *************************************************************************
>>     private static class ValuesCounter extends RichFlatMapFunction<Long, Tuple2<Long, Long>> {
>>         private ValueState<Long> state;
>>         @Override
>>         public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) throws Exception {
>>             Long oldCount = state.value();
>>             if (oldCount == null) {
>>                 oldCount = 0L;
>>             }
>>             long newCount = oldCount + 1;
>>             state.update(newCount);
>>             out.collect(Tuple2.of(value, newCount));
>>         }
>>         @Override
>>         public void open(Configuration parameters) throws Exception {
>>             ValueStateDescriptor<Long> descriptor =
>>                     new ValueStateDescriptor<>("valueState", BasicTypeInfo.LONG_TYPE_INFO);
>>             state = getRuntimeContext().getState(descriptor);
>>         }
>>     }
>>     public static class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
>>         private long count = 0L;
>>         private volatile boolean isRunning = true;
>>         private transient ListState<Long> checkpointedCount;
>>         public void run(SourceContext<Long> ctx) {
>>             while (isRunning && count < EVENT_COUNT) {
>>                 // this synchronized block ensures that state checkpointing,
>>                 // internal state updates and emission of elements are an atomic operation
>>                 synchronized (ctx.getCheckpointLock()) {
>>                     ctx.collect(count % MAX_VALUE);
>>                     count++;
>>                 }
>>             }
>>         }
>>         public void cancel() {
>>             isRunning = false;
>>         }
>>         public void initializeState(FunctionInitializationContext context) throws Exception {
>>             this.checkpointedCount = context
>>                     .getOperatorStateStore()
>>                     .getListState(new ListStateDescriptor<>("count", Long.class));
>>             if (context.isRestored()) {
>>                 for (Long count : this.checkpointedCount.get()) {
>>                     this.count = count;
>>                 }
>>             }
>>         }
>>         public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>             this.checkpointedCount.clear();
>>             this.checkpointedCount.add(count);
>>         }
>>     }
>> }
>> Regards,
>> Juha
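For reference, the source above emits the running counter modulo `MAX_VALUE`. With `EVENT_COUNT = 1_000_000` and `MAX_VALUE = 1_000`, the job therefore performs one million `ValueState` read-modify-write operations spread evenly over 1,000 keys, i.e. 1,000 updates per key.

The JDK-only sketch below is not part of the original program, and the class name `WorkloadReplay` is hypothetical. It replays the same sequence with a `HashMap` standing in for RocksDB-backed keyed state, which gives the final per-key counts `ValuesCounter` should reach.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory replay of the test workload (source + keyed counter). */
public class WorkloadReplay {

    /** Returns key -> final count, mirroring ExampleCountSource feeding ValuesCounter. */
    static Map<Long, Long> replay(int eventCount, int maxValue) {
        Map<Long, Long> counts = new HashMap<>();
        for (long count = 0; count < eventCount; count++) {
            long key = count % maxValue;      // the value the source emits (also the key)
            counts.merge(key, 1L, Long::sum); // what ValuesCounter keeps per key
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Long, Long> counts = replay(1_000_000, 1_000);
        System.out.println("distinct keys: " + counts.size());
        System.out.println("final count for key 0: " + counts.get(0L));
    }
}
```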
