Hi,

In Flink 1.5 there were three big changes that could affect performance:

1. FLIP-6 changes (as Yang and Fabian mentioned previously)
2. Credit-based flow control (especially if you are using SSL)
3. Low-latency network changes
I would suspect them in that order. The first and second you can disable via the configuration switches [1] and [2] respectively. Could you try disabling them?

Piotrek

[1] "mode: legacy" https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
[2] "taskmanager.network.credit-model: false"

> On 28 Oct 2019, at 14:10, Jakub Danilewicz <jdanilew...@alto-analytics.com> wrote:
> 
> Thanks for your replies.
> 
> We use Flink from within a standalone Java 8 application (no Hadoop, no clustering), so it basically boils down to running simple code like this:
> 
> import java.util.*;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.graph.*;
> import org.apache.flink.graph.library.CommunityDetection;
> 
> public class FlinkTester {
>     final Random random = new Random(1);
>     final float density = 3.0F;
> 
>     public static void main(String[] args) throws Exception {
>         new FlinkTester().execute(1000000, 4);
>     }
> 
>     private void execute(int numEdges, int parallelism) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
>         final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> 
>         final long start = System.currentTimeMillis();
>         List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
>         System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis() - start) / 1000 + " s");
>     }
> 
>     private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
>         System.out.println("Creating new graph of " + numEdges + " edges...");
> 
>         final int maxNumVertices = (int) (numEdges / density);
>         final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
>         final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);
> 
>         while (edgeMap.size() < numEdges) {
>             long sourceId = random.nextInt(maxNumVertices) + 1;
>             long targetId = sourceId;
>             while (targetId == sourceId)
>                 targetId = random.nextInt(maxNumVertices) + 1;
> 
>             final String edgeKey = sourceId + "#" + targetId;
>             if (!edgeMap.containsKey(edgeKey)) {
>                 edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
>                 if (!vertexMap.containsKey(sourceId))
>                     vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
>                 if (!vertexMap.containsKey(targetId))
>                     vertexMap.put(targetId, new Vertex<>(targetId, targetId));
>             }
>         }
> 
>         System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
>         return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
>     }
> }
> 
> No matter which graph algorithm you pick for benchmarking (above it's CommunityDetection), the bigger the graph, the wider the performance gap (and the higher the CPU/memory consumption) you observe when comparing execution times between the old engine (<= Flink 1.4.2) and the new one (checked on 1.5.6, 1.8.2 and 1.9.1).
> 
> Just run the code yourselves (you may play with the number of edges and parallel threads).
> 
> Best,
> 
> Jakub
> 
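For reference, the two switches mentioned at the top of the thread map to two entries in flink-conf.yaml. This is a sketch based on the keys cited in the reply; the exact names should be verified against the configuration docs for the Flink version in use:

```yaml
# Fall back to the pre-FLIP-6 execution mode (Flink 1.5.x only)
mode: legacy

# Disable credit-based flow control in the network stack
taskmanager.network.credit-model: false
```

Note that a benchmark using ExecutionEnvironment.createLocalEnvironment(...) may not read flink-conf.yaml at all; in that case the same keys would need to be set programmatically on a Configuration object passed to createLocalEnvironment (an assumption worth double-checking for local environments).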