Thanks for your reply, Till. As mentioned above, I run the graph processing in a plain standalone Java environment (no cluster underneath, no specific configuration except for parallelism), exactly as if you ran the Java class I pasted upthread with a Flink distribution JAR (plus the Gelly and SLF4J/Log4j JARs) on its classpath.

I do not know what goes on behind the scenes, but the "legacy" mode significantly outperforms the "new" one in every single case. The new mode is a few times slower, and the gap widens as the graph grows.

As for setting "the maximum parallelism (== number of key groups) to a multiple of your parallelism", could you tell me which configuration option from the list below that is?

https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html

Best,

Jakub

On 2019/11/01 10:19:47, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Jakub,
>
> what are the cluster settings and the exact job settings you are running
> your job with? I'm asking because one difference between legacy and FLIP-6
> mode is that the legacy mode spreads out tasks across all available
> TaskManagers whereas the FLIP-6 mode tries to bin package them on as few
> TaskManagers as possible. If you have more slots than the parallelism of
> your job, then I could see how this could affect the performance of your
> job if it is not I/O bound but CPU bound. We will add an option to enable
> the old spread out strategy again [1].
>
> Another reason why you might see a performance degradation is the placement
> of key groups. In the legacy mode, Flink distributed them so that two
> TaskManagers with the same number of tasks would only have at most one key
> group more. In FLIP-6 it can be up to the number of slots more key groups
> on one of the TaskManagers. In order to mitigate this problem I would
> recommend to set the maximum parallelism (== number of key groups) to a
> multiple of your parallelism.
>
> [1] https://issues.apache.org/jira/browse/FLINK-12122
>
> Cheers,
> Till
>
> On Wed, Oct 30, 2019 at 4:28 PM Jakub Danilewicz <jdanilew...@alto-analytics.com> wrote:
>
> > Hi,
> >
> > I can confirm that the performance drop is directly related to FLIP-6
> > changes. Applying this modification to the code posted above restores the
> > previous graph processing speed under Flink 1.5.6:
> >
> > ---------------------------------------------------------------------------
> >
> > org.apache.flink.configuration.Configuration customConfig =
> >     new org.apache.flink.configuration.Configuration();
> > customConfig.setString("mode", "legacy");
> > final ExecutionEnvironment env =
> >     ExecutionEnvironment.createLocalEnvironment(customConfig);
> > env.setParallelism(parallelism);
> >
> > ---------------------------------------------------------------------------
> >
> > Disabling the "taskmanager.network.credit-model" parameter in
> > Configuration provides only a very slight improvement in the performance
> > under Flink 1.5.6.
> >
> > Now the big question: what about newer versions where the legacy mode is
> > not supported anymore? I checked Flink 1.8.2 and it does not work.
> >
> > Is there any way to make the new mode as performant as the "legacy" one in
> > the standalone scenarios? Alternatively may we expect improvements in this
> > area in the upcoming releases?
> >
> > Best,
> >
> > Jakub
> >
> > On 2019/10/30 14:11:19, Piotr Nowojski <pi...@ververica.com> wrote:
> > > Hi,
> > >
> > > In Flink 1.5 there were three big changes, that could affect performance.
> > > 1. FLIP-6 changes (As previously Yang and Fabian mentioned)
> > > 2. Credit base flow control (especially if you are using SSL)
> > > 3. Low latency network changes
> > >
> > > I would suspect them in that order. First and second you can disable via
> > > configuration switches [1] and [2] respectively.
> > >
> > > [1] "mode:legacy" https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> > > [2] "taskmanager.network.credit-model:false"
> > >
> > > Could you try disabling them out?
> > >
> > > Piotrek
> > >
> > > > On 28 Oct 2019, at 14:10, Jakub Danilewicz <jdanilew...@alto-analytics.com> wrote:
> > > >
> > > > Thanks for your replies.
> > > >
> > > > We use Flink from within a standalone Java 8 application (no Hadoop,
> > > > no clustering), so it's basically boils down to running a simple code
> > > > like this:
> > > >
> > > > import java.util.*;
> > > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > > import org.apache.flink.graph.*;
> > > > import org.apache.flink.graph.library.CommunityDetection;
> > > >
> > > > public class FlinkTester {
> > > >     final Random random = new Random(1);
> > > >     final float density = 3.0F;
> > > >
> > > >     public static void main(String[] args) throws Exception {
> > > >         new FlinkTester().execute(1000000, 4);
> > > >     }
> > > >
> > > >     private void execute(int numEdges, int parallelism) throws Exception {
> > > >         final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
> > > >         final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> > > >
> > > >         final long start = System.currentTimeMillis();
> > > >         List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
> > > >         System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis()-start)/1000 + " s");
> > > >     }
> > > >
> > > >     private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
> > > >         System.out.println("Creating new graph of " + numEdges + " edges...");
> > > >
> > > >         final int maxNumVertices = (int)(numEdges/density);
> > > >         final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
> > > >         final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);
> > > >
> > > >         while (edgeMap.size() < numEdges) {
> > > >             long sourceId = random.nextInt(maxNumVertices) + 1;
> > > >             long targetId = sourceId;
> > > >             while (targetId == sourceId)
> > > >                 targetId = random.nextInt(maxNumVertices) + 1;
> > > >
> > > >             final String edgeKey = sourceId + "#" + targetId;
> > > >             if (!edgeMap.containsKey(edgeKey)) {
> > > >                 edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
> > > >                 if (!vertexMap.containsKey(sourceId))
> > > >                     vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
> > > >                 if (!vertexMap.containsKey(targetId))
> > > >                     vertexMap.put(targetId, new Vertex<>(targetId, targetId));
> > > >             }
> > > >         }
> > > >
> > > >         System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
> > > >         return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
> > > >     }
> > > > }
> > > >
> > > > No matter what graph algorithm you pick for benchmarking (above it's
> > > > CommunityDetection) the bigger the graph the wider performance gap (and
> > > > higher CPU/memory consumption) you observe when comparing the execution
> > > > times between the old engine (<= Flink 1.4.2) and the new one (checked on
> > > > 1.5.6, 1.8.2 and 1.9.1).
> > > >
> > > > Just run the code yourselves (you may play with the number of edges
> > > > and parallel threads).
> > > >
> > > > Best,
> > > >
> > > > Jakub
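
A note on the "maximum parallelism as a multiple of your parallelism" recommendation quoted in this thread: the plain-Java sketch below is only an illustration of why the multiple matters, not Flink code. It assumes that key group k is assigned to subtask (k * parallelism / maxParallelism), which is my understanding of how Flink's key-group range assignment works; the class and method names (KeyGroupSketch, keyGroupsPerSubtask, spread, nextMultiple) are hypothetical and exist only for this example.

```java
import java.util.Arrays;

// Simplified model of key-group assignment. NOT Flink code.
// Assumption: key group k is handled by subtask (k * parallelism / maxParallelism).
class KeyGroupSketch {

    /** Counts how many key groups each parallel subtask receives under the assumed formula. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            counts[keyGroup * parallelism / maxParallelism]++;
        }
        return counts;
    }

    /** Difference between the most and the least loaded subtask. */
    static int spread(int maxParallelism, int parallelism) {
        int[] counts = keyGroupsPerSubtask(maxParallelism, parallelism);
        int max = Arrays.stream(counts).max().getAsInt();
        int min = Arrays.stream(counts).min().getAsInt();
        return max - min;
    }

    /** Smallest multiple of parallelism that is greater than or equal to atLeast. */
    static int nextMultiple(int parallelism, int atLeast) {
        return ((atLeast + parallelism - 1) / parallelism) * parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        int candidate = 128; // an arbitrary starting value chosen for this example

        // Not a multiple of the parallelism: subtasks get uneven shares.
        System.out.println(candidate + " -> "
                + Arrays.toString(keyGroupsPerSubtask(candidate, parallelism)));

        // Rounded up to a multiple of the parallelism: every subtask gets the same share.
        int adjusted = nextMultiple(parallelism, candidate);
        System.out.println(adjusted + " -> "
                + Arrays.toString(keyGroupsPerSubtask(adjusted, parallelism)));
    }
}
```

If this model holds, a value computed this way could presumably be passed to env.getConfig().setMaxParallelism(...) on the ExecutionEnvironment from the benchmark code. Whether that setting has any effect on a batch Gelly job in a local environment is not something this sketch verifies.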