Hi,

I can confirm that the performance drop is directly related to the FLIP-6 
changes. Applying this modification to the code from my previous message 
(quoted below) restores the previous graph processing speed under Flink 1.5.6:

---------------------------------------------------------------------------

    org.apache.flink.configuration.Configuration customConfig = new org.apache.flink.configuration.Configuration();
    customConfig.setString("mode", "legacy");
    final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(customConfig);
    env.setParallelism(parallelism);

---------------------------------------------------------------------------

Setting "taskmanager.network.credit-model" to false in the Configuration 
yields only a very slight performance improvement under Flink 1.5.6.
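
Comparisons like the ones above are easier to trust when each variant is run 
several times rather than once. The following is only a minimal sketch of such 
a timing helper. The class VariantTimer and the methods medianMillis and 
busyWork are hypothetical names, not part of Flink, and busyWork is a stand-in 
workload so that the sketch runs without Flink on the classpath. In a real 
comparison each variant would build an ExecutionEnvironment with one 
configuration and run the graph job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of Flink): times interchangeable job variants. */
public class VariantTimer {

    /**
     * Runs the job `warmups` times untimed, then `runs` times timed, and
     * returns the median duration in milliseconds (upper middle for even counts).
     */
    public static long medianMillis(Callable<?> job, int warmups, int runs) throws Exception {
        if (runs < 1) {
            throw new IllegalArgumentException("runs must be >= 1");
        }
        for (int i = 0; i < warmups; i++) {
            job.call(); // untimed runs so JIT compilation does not skew the samples
        }
        List<Long> samples = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime(); // monotonic clock
            job.call();
            samples.add((System.nanoTime() - start) / 1_000_000);
        }
        Collections.sort(samples);
        return samples.get(samples.size() / 2);
    }

    /** Stand-in CPU-bound task (assumption: replaces the real Flink job). */
    public static long busyWork(int n) {
        long sum = 0;
        for (int i = 0; i < n; i++) {
            sum += i % 7;
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder variants; the labels mirror the settings discussed in this thread.
        Map<String, Callable<?>> variants = new LinkedHashMap<>();
        variants.put("default", () -> busyWork(2_000_000));
        variants.put("mode=legacy", () -> busyWork(1_000_000));

        for (Map.Entry<String, Callable<?>> e : variants.entrySet()) {
            long median = medianMillis(e.getValue(), 1, 5);
            System.out.println(e.getKey() + ": " + median + " ms (median of 5 runs)");
        }
    }
}
```

Reporting the median instead of a single measurement keeps one slow run (for 
example a GC pause) from dominating the result.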

Now the big question: what about newer versions, where the legacy mode is no 
longer supported? I checked Flink 1.8.2 and the "mode: legacy" setting does 
not work there.

Is there any way to make the new mode as performant as the "legacy" one in 
standalone scenarios? Alternatively, can we expect improvements in this area 
in upcoming releases?

Best,

Jakub

On 2019/10/30 14:11:19, Piotr Nowojski <pi...@ververica.com> wrote: 
> Hi,
> 
> In Flink 1.5 there were three big changes that could affect performance:
> 1. FLIP-6 changes (as Yang and Fabian mentioned previously)
> 2. Credit-based flow control (especially if you are using SSL)
> 3. Low-latency network changes
> 
> I would suspect them in that order. The first and second can be disabled via 
> configuration switches [1] and [2] respectively.
> 
> [1] "mode: legacy"
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> [2] "taskmanager.network.credit-model: false"
> 
> Could you try disabling them?
> 
> Piotrek
> 
> > On 28 Oct 2019, at 14:10, Jakub Danilewicz <jdanilew...@alto-analytics.com> 
> > wrote:
> > 
> > Thanks for your replies.
> > 
> > We use Flink from within a standalone Java 8 application (no Hadoop, no 
> > clustering), so it basically boils down to running simple code like 
> > this:
> > 
> > import java.util.*;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.graph.*;
> > import org.apache.flink.graph.library.CommunityDetection;
> > 
> > public class FlinkTester {
> >    final Random random = new Random(1);
> >    final float density = 3.0F;
> > 
> >    public static void main(String[] args) throws Exception {
> >        new FlinkTester().execute(1000000, 4);
> >    }
> > 
> >    private void execute(int numEdges, int parallelism) throws Exception {
> >        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
> >        final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> > 
> >        final long start = System.currentTimeMillis();
> >        List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
> >        System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis()-start)/1000 + " s");
> >    }
> > 
> >    private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
> >        System.out.println("Creating new graph of " + numEdges + " edges...");
> > 
> >        final int maxNumVertices = (int)(numEdges/density);
> >        final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
> >        final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);
> > 
> >        while (edgeMap.size() < numEdges) {
> >            long sourceId = random.nextInt(maxNumVertices) + 1;
> >            long targetId = sourceId;
> >            while (targetId == sourceId)
> >                targetId = random.nextInt(maxNumVertices) + 1;
> > 
> >            final String edgeKey = sourceId + "#" + targetId;
> >            if (!edgeMap.containsKey(edgeKey)) {
> >                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
> >                if (!vertexMap.containsKey(sourceId))
> >                    vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
> >                if (!vertexMap.containsKey(targetId))
> >                    vertexMap.put(targetId, new Vertex<>(targetId, targetId));
> >            }
> >        }
> > 
> >        System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
> >        return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
> >    }
> > }
> > 
> > No matter which graph algorithm you pick for benchmarking (above it's 
> > CommunityDetection), the bigger the graph, the wider the performance gap 
> > (and the higher the CPU/memory consumption) you observe when comparing 
> > execution times between the old engine (<= Flink 1.4.2) and the new one 
> > (checked on 1.5.6, 1.8.2 and 1.9.1).
> > 
> > Just run the code yourselves (you can vary the number of edges and 
> > parallel threads).
> > 
> > Best,
> > 
> > Jakub
> > 
> 
> 
