Hey,

Here's a simple test. It's basically Flink's WordCount example, but it uses
RocksDB as the state backend and has a stateful operator. The Javadoc
comments explain how to use it.


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.wordcount;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Works fast in the following cases.
 * <ul>
 *     <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li>
 *     <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link
#PARALLELISM} is 1 to 4.</li>
 * </ul>
 * <p>
 * Some results:
 * </p>
 * <ul>
 *     <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li>
 *     <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li>
 *     <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li>
 * </ul>
 * <p>
 */
public class WordCount {
    /**
     * The parallelism of the job.
     */
    private static final int PARALLELISM = 5;

    /**
     * Whether to use managed memory. True, no changes in the config.
     * False, managed memory is disabled.
     */
    private static final boolean USE_MANAGED_MEMORY = true;

    /**
     * The source synthesizes this many events.
     */
    public static final int EVENT_COUNT = 1_000_000;

    /**
     * The value of each event is {@code EVENT_COUNT % MAX_VALUE}.
     * Essentially controls the count of unique keys.
     */
    public static final int MAX_VALUE = 1_000;


    //
*************************************************************************
    // PROGRAM
    //
*************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final MultipleParameterTool params =
MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        Configuration configuration = new Configuration();
        if (!USE_MANAGED_MEMORY) {
            configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY,
USE_MANAGED_MEMORY);
        }
        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM,
configuration);

        Path tempDirPath = Files.createTempDirectory("example");
        String checkpointDataUri = "file://" + tempDirPath.toString();

        RocksDBStateBackend rocksDBStateBackend = new
RocksDBStateBackend(checkpointDataUri, true);
       
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend((StateBackend) rocksDBStateBackend);

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<Long> text = env.addSource(new ExampleCountSource());

        text.keyBy(v -> v)
                .flatMap(new ValuesCounter())
                .addSink(new DiscardingSink<>());

        long before = System.currentTimeMillis();
        env.execute("Streaming WordCount");
        long duration = System.currentTimeMillis() - before;

        System.out.println("Done " + duration + " ms, parallelism " +
PARALLELISM);
    }

    //
*************************************************************************
    // USER FUNCTIONS
    //
*************************************************************************

    private static class ValuesCounter extends RichFlatMapFunction<Long,
Tuple2&lt;Long, Long>> {
        private ValueState<Long> state;

        @Override
        public void flatMap(Long value, Collector<Tuple2&lt;Long, Long>>
out) throws Exception {
            Long oldCount = state.value();
            if (oldCount == null) {
                oldCount = 0L;
            }
            long newCount = oldCount + 1;
            state.update(newCount);

            out.collect(Tuple2.of(value, newCount));
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            ValueStateDescriptor<Long> descriptor = new
ValueStateDescriptor("valueState", BasicTypeInfo.LONG_TYPE_INFO);
            state = getRuntimeContext().getState(descriptor);
        }
    }

    public static class ExampleCountSource implements SourceFunction<Long>,
CheckpointedFunction {
        private long count = 0L;
        private volatile boolean isRunning = true;

        private transient ListState<Long> checkpointedCount;

        public void run(SourceContext<Long> ctx) {
            while (isRunning && count < EVENT_COUNT) {
                // this synchronized block ensures that state checkpointing,
                // internal state updates and emission of elements are an
atomic operation
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count % MAX_VALUE);
                    count++;
                }
            }
        }

        public void cancel() {
            isRunning = false;
        }

        public void initializeState(FunctionInitializationContext context)
throws Exception {
            this.checkpointedCount = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("count",
Long.class));

            if (context.isRestored()) {
                for (Long count : this.checkpointedCount.get()) {
                    this.count = count;
                }
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws
Exception {
            this.checkpointedCount.clear();
            this.checkpointedCount.add(count);
        }
    }
}


Regards,
Juha



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to