Hi,

In Flink 1.5 there were three big changes that could affect performance:
1. FLIP-6 changes (as Yang and Fabian mentioned previously)
2. Credit-based flow control (especially if you are using SSL)
3. Low-latency network changes

I would suspect them in that order. You can disable the first and second via the 
configuration switches [1] and [2], respectively.

[1] "mode: legacy"
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
[2] "taskmanager.network.credit-model: false"
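
For example, a minimal flink-conf.yaml snippet combining both switches might look 
like this (key names as in the 1.5.x docs linked above; adjust to your version):

# revert to the pre-FLIP-6 deployment mode
mode: legacy
# fall back to the old network stack without credit-based flow control
taskmanager.network.credit-model: false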

Could you try disabling them?

Piotrek

> On 28 Oct 2019, at 14:10, Jakub Danilewicz <jdanilew...@alto-analytics.com> wrote:
> 
> Thanks for your replies.
> 
> We use Flink from within a standalone Java 8 application (no Hadoop, no 
> clustering), so it basically boils down to running simple code like this:
> 
> import java.util.*;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.graph.*;
> import org.apache.flink.graph.library.CommunityDetection;
> 
> public class FlinkTester {
>     final Random random = new Random(1);  // fixed seed, so every run builds the same graph
>     final float density = 3.0F;           // used to derive the max vertex count
> 
>     public static void main(String[] args) throws Exception {
>         new FlinkTester().execute(1000000, 4);
>     }
> 
>     private void execute(int numEdges, int parallelism) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
>         final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> 
>         final long start = System.currentTimeMillis();
>         // collect() triggers the job, so this times the whole execution
>         List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
>         System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis() - start) / 1000 + " s");
>     }
> 
>     private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
>         System.out.println("Creating new graph of " + numEdges + " edges...");
> 
>         final int maxNumVertices = (int) (numEdges / density);
>         final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
>         final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);
> 
>         // add random edges (no self-loops, no duplicates) until the target count is reached
>         while (edgeMap.size() < numEdges) {
>             long sourceId = random.nextInt(maxNumVertices) + 1;
>             long targetId = sourceId;
>             while (targetId == sourceId)
>                 targetId = random.nextInt(maxNumVertices) + 1;
> 
>             final String edgeKey = sourceId + "#" + targetId;
>             if (!edgeMap.containsKey(edgeKey)) {
>                 edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
>                 if (!vertexMap.containsKey(sourceId))
>                     vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
>                 if (!vertexMap.containsKey(targetId))
>                     vertexMap.put(targetId, new Vertex<>(targetId, targetId));
>             }
>         }
> 
>         System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
>         return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
>     }
> }
> 
> No matter which graph algorithm you pick for benchmarking (above it's 
> CommunityDetection), the bigger the graph, the wider the performance gap (and 
> the higher the CPU/memory consumption) you observe when comparing execution 
> times between the old engine (<= Flink 1.4.2) and the new one (checked on 
> 1.5.6, 1.8.2 and 1.9.1).
> 
> Just run the code yourselves (you may play with the number of edges and 
> parallel threads).
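> For instance, scaling the benchmark up is a one-line change in main (the 
> numbers below are arbitrary, just to illustrate the knobs):
> 
> new FlinkTester().execute(4000000, 8); // e.g. 4M edges on 8 local slots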
> 
> Best,
> 
> Jakub
> 
