Hey Yu,

1. Memory and other configuration

There's not much configuration going on; it's all in the Java class WordCount.
The only memory-related setting is this one:

rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

I quickly tried commenting out that line; it doesn't seem to change anything.


2. I'm not sure what I would / should look for.

For 'taskmanager.memory.managed.fraction' I tried

configuration.setDouble("taskmanager.memory.managed.fraction", 0.8);

But using a debugger, I don't see that variable being used. Maybe it's not used 
in StreamExecutionEnvironment.createLocalEnvironment?


3. There are no timers, so I don't think setting this parameter matters. Anyway, 
I tried this:

        configuration.setString(RocksDBOptions.TIMER_SERVICE_FACTORY, "HEAP");
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration);

No change in performance (tried with parallelism 5 and without managed 
memory).


Regards,
Juha


________________________________
From: Yu Li <car...@gmail.com>
Sent: Thursday, June 25, 2020 12:20 PM
To: Andrey Zagrebin <azagre...@apache.org>
Cc: Juha Mynttinen <juha.myntti...@king.com>; Yun Tang <myas...@live.com>; user 
<user@flink.apache.org>
Subject: Re: Performance issue associated with managed RocksDB memory

Thanks for the ping Andrey.

Hi Juha,

Thanks for reporting the issue. I'd like to check the following things before 
digging into it further:

1. Could you let us know your configurations (especially memory related ones) 
when running the tests?

2. Did you watch the memory consumption before / after turning 
`state.backend.rocksdb.memory.managed` off? If not, could you check it out and 
let us know the result? (See the sketch after this list for one way to watch it.)
    2.1 Furthermore, if the memory consumption is much higher when turning 
managed memory off, could you try tuning up the managed memory fraction 
accordingly through `taskmanager.memory.managed.fraction` [1] and check the 
result?

3. With `state.backend.rocksdb.memory.managed` on and nothing else changed, 
could you try to set `state.backend.rocksdb.timer-service.factory` to `HEAP` 
and check the result? (side note: starting from the 1.10.0 release, timers are 
stored in RocksDB by default when using RocksDBStateBackend [2])

What's more, you may find these documents [3] [4] useful for memory tuning of 
the RocksDB backend.

Thanks.

Best Regards,
Yu

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-managed-fraction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#state
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#tuning-rocksdb-memory
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#memory-management


On Thu, 25 Jun 2020 at 15:37, Andrey Zagrebin 
<azagre...@apache.org<mailto:azagre...@apache.org>> wrote:
Hi Juha,

Thanks for sharing the testing program to expose the problem.
This indeed looks suboptimal if X does not leave space for the window operator.
I am adding Yu and Yun who might have a better idea about what could be 
improved about sharing the RocksDB memory among operators.

Best,
Andrey

On Thu, Jun 25, 2020 at 9:10 AM Juha Mynttinen 
<juha.myntti...@king.com<mailto:juha.myntti...@king.com>> wrote:
Hey,

Here's a simple test. It's basically the WordCount example from Flink, but
using RocksDB as the state backend and having a stateful operator. The
javadocs explain how to use it.


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.wordcount;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Works fast in the following cases.
 * <ul>
 *     <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li>
 *     <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link #PARALLELISM} is 1 to 4.</li>
 * </ul>
 * <p>
 * Some results:
 * </p>
 * <ul>
 *     <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li>
 *     <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li>
 *     <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li>
 * </ul>
 */
public class WordCount {
    /**
     * The parallelism of the job.
     */
    private static final int PARALLELISM = 5;

    /**
     * Whether to use managed memory. If true, the config is left unchanged;
     * if false, managed memory is disabled.
     */
    private static final boolean USE_MANAGED_MEMORY = true;

    /**
     * The source synthesizes this many events.
     */
    public static final int EVENT_COUNT = 1_000_000;

    /**
     * The value of each event is {@code count % MAX_VALUE}.
     * Essentially controls the number of unique keys.
     */
    public static final int MAX_VALUE = 1_000;


    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        Configuration configuration = new Configuration();
        if (!USE_MANAGED_MEMORY) {
            configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, USE_MANAGED_MEMORY);
        }
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration);

        Path tempDirPath = Files.createTempDirectory("example");
        String checkpointDataUri = "file://%22 + tempDirPath.toString();

        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDataUri, true);
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend((StateBackend) rocksDBStateBackend);

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<Long> text = env.addSource(new ExampleCountSource());

        text.keyBy(v -> v)
                .flatMap(new ValuesCounter())
                .addSink(new DiscardingSink<>());

        long before = System.currentTimeMillis();
        env.execute("Streaming WordCount");
        long duration = System.currentTimeMillis() - before;

        System.out.println("Done " + duration + " ms, parallelism " +
PARALLELISM);
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    private static class ValuesCounter extends RichFlatMapFunction<Long, Tuple2<Long, Long>> {
        private ValueState<Long> state;

        @Override
        public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) throws Exception {
            Long oldCount = state.value();
            if (oldCount == null) {
                oldCount = 0L;
            }
            long newCount = oldCount + 1;
            state.update(newCount);

            out.collect(Tuple2.of(value, newCount));
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("valueState", BasicTypeInfo.LONG_TYPE_INFO);
            state = getRuntimeContext().getState(descriptor);
        }
    }

    public static class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
        private long count = 0L;
        private volatile boolean isRunning = true;

        private transient ListState<Long> checkpointedCount;

        public void run(SourceContext<Long> ctx) {
            while (isRunning && count < EVENT_COUNT) {
                // this synchronized block ensures that state checkpointing,
                // internal state updates and emission of elements are an atomic operation
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count % MAX_VALUE);
                    count++;
                }
            }
        }

        public void cancel() {
            isRunning = false;
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.checkpointedCount = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("count", Long.class));

            if (context.isRestored()) {
                for (Long count : this.checkpointedCount.get()) {
                    this.count = count;
                }
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.checkpointedCount.clear();
            this.checkpointedCount.add(count);
        }
    }
}


Regards,
Juha



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
