Thanks for your replies.

We use Flink from within a standalone Java 8 application (no Hadoop, no
clustering), so it basically boils down to running simple code like this:

import java.util.*;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.*;
import org.apache.flink.graph.library.CommunityDetection;

public class FlinkTester {
    final Random random = new Random(1); // fixed seed, so every run builds the same graph
    final float density = 3.0F;          // average number of edges per vertex

    public static void main(String[] args) throws Exception {
        new FlinkTester().execute(1000000, 4);
    }

    private void execute(int numEdges, int parallelism) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
        final Graph<Long, Long, Double> graph = createGraph(numEdges, env);

        // collect() triggers execution of the whole plan, so this times the algorithm end to end.
        final long start = System.currentTimeMillis();
        List<Vertex<Long, Long>> vertices =
                graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
        System.out.println(vertices.size() + " vertices processed in "
                + (System.currentTimeMillis() - start) / 1000 + " s");
    }

    private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
        System.out.println("Creating new graph of " + numEdges + " edges...");

        final int maxNumVertices = (int) (numEdges / density);
        final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
        final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);

        // Draw random source/target pairs until we have numEdges distinct edges;
        // self-loops are excluded and duplicates are filtered via the "source#target" key.
        while (edgeMap.size() < numEdges) {
            long sourceId = random.nextInt(maxNumVertices) + 1;
            long targetId = sourceId;
            while (targetId == sourceId)
                targetId = random.nextInt(maxNumVertices) + 1;

            final String edgeKey = sourceId + "#" + targetId;
            if (!edgeMap.containsKey(edgeKey)) {
                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
                if (!vertexMap.containsKey(sourceId))
                    vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
                if (!vertexMap.containsKey(targetId))
                    vertexMap.put(targetId, new Vertex<>(targetId, targetId));
            }
        }

        System.out.println(edgeMap.size() + " edges created between "
                + vertexMap.size() + " vertices.");
        return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
    }
}

No matter which graph algorithm you pick for benchmarking (above it's
CommunityDetection), the bigger the graph, the wider the performance gap (and
the higher the CPU/memory consumption) you observe when comparing execution
times between the old engine (Flink <= 1.4.2) and the new one (checked on
1.5.6, 1.8.2 and 1.9.1).
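
To illustrate swapping in a different algorithm, here is a minimal sketch that
reuses the graph built above but runs Gelly's LabelPropagation instead (it
returns the vertex DataSet directly, so there is no getVertices() step). The
helper method is only illustrative, not part of the benchmark:

import java.util.List;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.LabelPropagation;

public class LabelPropagationSketch {
    // Drop-in replacement for the CommunityDetection call in execute():
    // LabelPropagation<K, VV, EV> takes the maximum number of iterations
    // and returns a DataSet<Vertex<K, VV>>, collected here into a List.
    static List<Vertex<Long, Long>> runLabelPropagation(
            Graph<Long, Long, Double> graph) throws Exception {
        return graph.run(new LabelPropagation<Long, Long, Double>(10)).collect();
    }
}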

Just run the code yourselves (you can play with the number of edges and
parallel threads, e.g. as sketched below).
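
For example, changing the entry point is enough (the values below are
arbitrary picks, just to show the two knobs):

public static void main(String[] args) throws Exception {
    // Same benchmark, different knobs: edge count and local parallelism.
    // 4000000 edges / 8 threads are arbitrary example values.
    new FlinkTester().execute(4000000, 8);
}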

Best,

Jakub
