Hi Juha,

Thanks for sharing the test program that exposes the problem.
This indeed looks suboptimal if X does not leave space for the window
operator.
I am adding Yu and Yun, who might have a better idea of what could be
improved in how the RocksDB memory is shared among operators.
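
In the meantime, one thing that might be worth trying on your side (just a
sketch from me, not a recommendation; the 512m value is only a placeholder
and I am quoting the fixed-per-slot option name from memory) is to decouple
the RocksDB memory budget from the slot's managed memory:

    // Same imports as in your test, plus org.apache.flink.configuration.MemorySize.
    Configuration configuration = new Configuration();

    // Either opt RocksDB out of Flink-managed memory entirely, as your test
    // already does when USE_MANAGED_MEMORY is false ...
    configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, false);

    // ... or keep the shared cache but give it a fixed size per slot
    // ("512m" is just a placeholder value).
    // configuration.set(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE, MemorySize.parse("512m"));

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration);

If I remember correctly, the fixed-per-slot option keeps the block cache and
write buffer manager shared among the operators in a slot, but their total
size no longer depends on the managed memory fraction, so it could show
whether the slowdown at parallelism 5 is simply the shared cache becoming
too small.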

Best,
Andrey

On Thu, Jun 25, 2020 at 9:10 AM Juha Mynttinen <juha.myntti...@king.com>
wrote:

> Hey,
>
> Here's a simple test. It's basically the WordCount example from Flink, but
> using RocksDB as the state backend and having a stateful operator. The
> javadocs explain how to use it.
>
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *    http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package org.apache.flink.streaming.examples.wordcount;
>
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.MultipleParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.PredefinedOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.util.Collector;
>
> import java.nio.file.Files;
> import java.nio.file.Path;
>
> /**
>  * Works fast in the following cases.
>  * <ul>
>  *     <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li>
>  *     <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link #PARALLELISM} is 1 to 4.</li>
>  * </ul>
>  * <p>
>  * Some results:
>  * </p>
>  * <ul>
>  *     <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li>
>  *     <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li>
>  *     <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li>
>  *     <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li>
>  *     <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li>
>  *     <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li>
>  * </ul>
>  */
> public class WordCount {
>     /**
>      * The parallelism of the job.
>      */
>     private static final int PARALLELISM = 5;
>
>     /**
>      * Whether to use managed memory. If true, the configuration is left
>      * unchanged and managed memory is used; if false, managed memory is
>      * disabled for the RocksDB state backend.
>      */
>     private static final boolean USE_MANAGED_MEMORY = true;
>
>     /**
>      * The source synthesizes this many events.
>      */
>     public static final int EVENT_COUNT = 1_000_000;
>
>     /**
>      * The value of each event is {@code count % MAX_VALUE}, so this
>      * essentially controls the number of unique keys.
>      */
>     public static final int MAX_VALUE = 1_000;
>
>
>     // *************************************************************************
>     // PROGRAM
>     // *************************************************************************
>
>     public static void main(String[] args) throws Exception {
>
>         // Checking input parameters
>         final MultipleParameterTool params =
> MultipleParameterTool.fromArgs(args);
>
>         // set up the execution environment
>         Configuration configuration = new Configuration();
>         if (!USE_MANAGED_MEMORY) {
>             configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY,
> USE_MANAGED_MEMORY);
>         }
>         final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM,
> configuration);
>
>         Path tempDirPath = Files.createTempDirectory("example");
>         String checkpointDataUri = "file://" + tempDirPath.toString();
>
>         RocksDBStateBackend rocksDBStateBackend = new
> RocksDBStateBackend(checkpointDataUri, true);
>
>
> rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
>         env.setStateBackend((StateBackend) rocksDBStateBackend);
>
>         // make parameters available in the web interface
>         env.getConfig().setGlobalJobParameters(params);
>
>         // get input data
>         DataStream<Long> text = env.addSource(new ExampleCountSource());
>
>         text.keyBy(v -> v)
>                 .flatMap(new ValuesCounter())
>                 .addSink(new DiscardingSink<>());
>
>         long before = System.currentTimeMillis();
>         env.execute("Streaming WordCount");
>         long duration = System.currentTimeMillis() - before;
>
>         System.out.println("Done " + duration + " ms, parallelism " +
> PARALLELISM);
>     }
>
>     // *************************************************************************
>     // USER FUNCTIONS
>     // *************************************************************************
>
>     private static class ValuesCounter extends RichFlatMapFunction<Long, Tuple2<Long, Long>> {
>         private ValueState<Long> state;
>
>         @Override
>         public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) throws Exception {
>             Long oldCount = state.value();
>             if (oldCount == null) {
>                 oldCount = 0L;
>             }
>             long newCount = oldCount + 1;
>             state.update(newCount);
>
>             out.collect(Tuple2.of(value, newCount));
>         }
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             super.open(parameters);
>
>             ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("valueState", BasicTypeInfo.LONG_TYPE_INFO);
>             state = getRuntimeContext().getState(descriptor);
>         }
>     }
>
>     public static class ExampleCountSource implements SourceFunction<Long>,
> CheckpointedFunction {
>         private long count = 0L;
>         private volatile boolean isRunning = true;
>
>         private transient ListState<Long> checkpointedCount;
>
>         public void run(SourceContext<Long> ctx) {
>             while (isRunning && count < EVENT_COUNT) {
>                 // this synchronized block ensures that state
> checkpointing,
>                 // internal state updates and emission of elements are an
> atomic operation
>                 synchronized (ctx.getCheckpointLock()) {
>                     ctx.collect(count % MAX_VALUE);
>                     count++;
>                 }
>             }
>         }
>
>         public void cancel() {
>             isRunning = false;
>         }
>
>         public void initializeState(FunctionInitializationContext context)
> throws Exception {
>             this.checkpointedCount = context
>                     .getOperatorStateStore()
>                     .getListState(new ListStateDescriptor<>("count",
> Long.class));
>
>             if (context.isRestored()) {
>                 for (Long count : this.checkpointedCount.get()) {
>                     this.count = count;
>                 }
>             }
>         }
>
>         public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
>             this.checkpointedCount.clear();
>             this.checkpointedCount.add(count);
>         }
>     }
> }
>
>
> Regards,
> Juha
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
