Hi Juha, Thanks for sharing the test program that exposes the problem. This indeed looks suboptimal if X does not leave space for the window operator. I am adding Yu and Yun, who might have a better idea about what could be improved in sharing the RocksDB memory among operators.
Best, Andrey On Thu, Jun 25, 2020 at 9:10 AM Juha Mynttinen <juha.myntti...@king.com> wrote: > Hey, > > Here's a simple test. It's basically the WordCount example from Flink, but > using RocksDB as the state backend and having a stateful operator. The > javadocs explain how to use it. > > > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. 
> */ > > package org.apache.flink.streaming.examples.wordcount; > > import org.apache.flink.api.common.functions.RichFlatMapFunction; > import org.apache.flink.api.common.state.ListState; > import org.apache.flink.api.common.state.ListStateDescriptor; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.utils.MultipleParameterTool; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.contrib.streaming.state.PredefinedOptions; > import org.apache.flink.contrib.streaming.state.RocksDBOptions; > import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; > import org.apache.flink.runtime.state.FunctionInitializationContext; > import org.apache.flink.runtime.state.FunctionSnapshotContext; > import org.apache.flink.runtime.state.StateBackend; > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.sink.DiscardingSink; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import org.apache.flink.util.Collector; > > import java.nio.file.Files; > import java.nio.file.Path; > > /** > * Works fast in the following cases. 
> * <ul> > * <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li> > * <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link > #PARALLELISM} is 1 to 4.</li> > * </ul> > * <p> > * Some results: > * </p> > * <ul> > * <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li> > * <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li> > * <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li> > * <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li> > * <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li> > * <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li> > * </ul> > * <p> > */ > public class WordCount { > /** > * The parallelism of the job. > */ > private static final int PARALLELISM = 5; > > /** > * Whether to use managed memory. True, no changes in the config. > * False, managed memory is disabled. > */ > private static final boolean USE_MANAGED_MEMORY = true; > > /** > * The source synthesizes this many events. > */ > public static final int EVENT_COUNT = 1_000_000; > > /** > * The value of each event is {@code count % MAX_VALUE}. > * Essentially controls the count of unique keys. 
> */ > public static final int MAX_VALUE = 1_000; > > > // > ************************************************************************* > // PROGRAM > // > ************************************************************************* > > public static void main(String[] args) throws Exception { > > // Checking input parameters > final MultipleParameterTool params = > MultipleParameterTool.fromArgs(args); > > // set up the execution environment > Configuration configuration = new Configuration(); > if (!USE_MANAGED_MEMORY) { > configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, > USE_MANAGED_MEMORY); > } > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, > configuration); > > Path tempDirPath = Files.createTempDirectory("example"); > String checkpointDataUri = "file://" + tempDirPath.toString(); > > RocksDBStateBackend rocksDBStateBackend = new > RocksDBStateBackend(checkpointDataUri, true); > > > rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED); > env.setStateBackend((StateBackend) rocksDBStateBackend); > > // make parameters available in the web interface > env.getConfig().setGlobalJobParameters(params); > > // get input data > DataStream<Long> text = env.addSource(new ExampleCountSource()); > > text.keyBy(v -> v) > .flatMap(new ValuesCounter()) > .addSink(new DiscardingSink<>()); > > long before = System.currentTimeMillis(); > env.execute("Streaming WordCount"); > long duration = System.currentTimeMillis() - before; > > System.out.println("Done " + duration + " ms, parallelism " + > PARALLELISM); > } > > // > ************************************************************************* > // USER FUNCTIONS > // > ************************************************************************* > > private static class ValuesCounter extends RichFlatMapFunction<Long, > Tuple2<Long, Long>> { > private ValueState<Long> state; > > @Override > public void flatMap(Long value, 
Collector<Tuple2<Long, Long>> > out) throws Exception { > Long oldCount = state.value(); > if (oldCount == null) { > oldCount = 0L; > } > long newCount = oldCount + 1; > state.update(newCount); > > out.collect(Tuple2.of(value, newCount)); > } > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > > ValueStateDescriptor<Long> descriptor = new > ValueStateDescriptor("valueState", BasicTypeInfo.LONG_TYPE_INFO); > state = getRuntimeContext().getState(descriptor); > } > } > > public static class ExampleCountSource implements SourceFunction<Long>, > CheckpointedFunction { > private long count = 0L; > private volatile boolean isRunning = true; > > private transient ListState<Long> checkpointedCount; > > public void run(SourceContext<Long> ctx) { > while (isRunning && count < EVENT_COUNT) { > // this synchronized block ensures that state > checkpointing, > // internal state updates and emission of elements are an > atomic operation > synchronized (ctx.getCheckpointLock()) { > ctx.collect(count % MAX_VALUE); > count++; > } > } > } > > public void cancel() { > isRunning = false; > } > > public void initializeState(FunctionInitializationContext context) > throws Exception { > this.checkpointedCount = context > .getOperatorStateStore() > .getListState(new ListStateDescriptor<>("count", > Long.class)); > > if (context.isRestored()) { > for (Long count : this.checkpointedCount.get()) { > this.count = count; > } > } > } > > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > this.checkpointedCount.clear(); > this.checkpointedCount.add(count); > } > } > } > > > Regards, > Juha > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >