Dear community,

I have a problem which I hope you'll be able to help with.
I apologize in advance for the verbosity of the post.
I am running a Flink standalone cluster (not even writing to the
filesystem) with 2 Docker containers.

The Dockerfile builds an image for Flink 1.1.2, which is the same
version used by the main class in the .jar.
The Docker image is configured to use Java 8, which the project's
pom.xml requires as well.
I have also edited the TaskManager's conf/flink-conf.yaml to set the
following values:

....
taskmanager.heap.mb: 7512
....
taskmanager.network.numberOfBuffers: 16048
....
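
For context, these are the managed-memory settings that interact with the heap size (the values shown are the documented defaults; please correct me if the key names differ in 1.1.2):

```yaml
# Fraction of the TaskManager heap reserved for managed memory
# (hash tables, sorting, caching); the rest is left for user code.
taskmanager.memory.fraction: 0.7
# Absolute managed memory size in MB; -1 means "use the fraction above".
taskmanager.memory.size: -1
```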


Properties of this host/docker setup:
- the host machine has 256 GB of RAM
- the job manager container runs with the default Flink config
- the task manager has 7.5 GB of memory available
- the task manager's number of buffers is 16048, which is very generous
compared to the default value
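
As a sanity check on "generous", a quick back-of-the-envelope calculation (assuming the default 32 KB network buffer size, which I have not overridden):

```python
# Back-of-the-envelope: memory consumed by the configured network buffers.
# Assumes the default buffer (memory segment) size of 32 KB.
buffer_size = 32 * 1024          # bytes per network buffer
num_buffers = 16048              # taskmanager.network.numberOfBuffers
total_bytes = buffer_size * num_buffers
print(total_bytes / (1024 * 1024))  # 501.5 MB out of the 7.5 GB heap
```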

I am testing on the SNAP DBLP dataset:
https://snap.stanford.edu/data/com-DBLP.html
It has:

 317080 nodes
1049866 edges

These are the relevant parts of the project's pom.xml:
*(note: the project executes without error when run locally, outside the
cluster)*

....
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.1.2</flink.version>
</properties>
....
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

I am running (what I believe to be) a simple Gelly application, performing
the ConnectedComponents algorithm with 30 iterations:

public static void main(String[] args) {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    final String dataPath = args[0];

    final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
        .fieldDelimiter("\t") // node IDs are separated by tabs
        .ignoreComments("#")  // comments start with "#"
        .types(Long.class, Long.class);

    try {
        System.out.println("Tuple size: " + edgeTuples.count());
    } catch (Exception e1) {
        e1.printStackTrace();
    }

    /*
     * @param <K> the key type for edge and vertex identifiers
     * @param <VV> the value type for vertices
     * @param <EV> the value type for edges
     * public class Graph<K, VV, EV>
     */
    final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
        edgeTuples,
        new MapFunction<Long, Long>() {
            private static final long serialVersionUID = 8713516577419451509L;
            public Long map(Long value) {
                return value;
            }
        },
        env
    );

    try {
        /*
         * @param <K> key type
         * @param <VV> vertex value type
         * @param <EV> edge value type
         * @param <T> the return type
         * class ConnectedComponents<K, VV extends Comparable<VV>, EV>
         *     implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>>
         */
        DataSet<Vertex<Long, Long>> verticesWithComponents =
            graph.run(new ConnectedComponents<Long, Long, NullValue>(30));
        System.out.println("Component count: " + verticesWithComponents.count());
    } catch (Exception e) {
        e.printStackTrace();
    }
}


However, executing the job from the host machine as follows produces this output:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
flink run -m 3de7625b8e28:6123 -c flink.graph.example.App
/home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
/home/myuser/com-dblp.ungraph.txt

Cluster configuration: Standalone cluster with JobManager at /
172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for
job completion.
Connected to JobManager at Actor[akka.tcp://
flink@172.19.0.2:6123/user/jobmanager#-658812967]

11/08/2016 21:22:44     DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
SCHEDULED
11/08/2016 21:22:44     DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
DEPLOYING
11/08/2016 21:22:44     DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to SCHEDULED
11/08/2016 21:22:44     DataSink (count())(1/1) switched to DEPLOYING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to RUNNING
11/08/2016 21:22:44     DataSink (count())(1/1) switched to FINISHED
11/08/2016 21:22:44     DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
FINISHED
11/08/2016 21:22:44     Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting for
job completion.
Connected to JobManager at Actor[akka.tcp://
flink@172.19.0.2:6123/user/jobmanager#-658812967]
11/08/2016 21:22:45     Job execution switched to status RUNNING.
11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED

11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING

11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at
fromDataSet(Graph.java:216)) -> Combine(Distinct at
fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
SCHEDULED
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at
fromDataSet(Graph.java:216)) -> Combine(Distinct at
fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
DEPLOYING
11/08/2016 21:22:45     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN FlatMap (FlatMap at
fromDataSet(Graph.java:216)) -> Combine(Distinct at
fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CoGroup (Messaging)(1/1) switched to RUNNING
11/08/2016 21:22:45     CHAIN Reduce (Distinct at
fromDataSet(Graph.java:216)) -> Map (Map at
fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/08/2016 21:22:45     CHAIN Reduce (Distinct at
fromDataSet(Graph.java:216)) -> Map (Map at
fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/08/2016 21:22:45     CHAIN Reduce (Distinct at
fromDataSet(Graph.java:216)) -> Map (Map at
fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/08/2016 21:22:47     CHAIN Map (Map at mapEdges(Graph.java:596)) ->
FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/08/2016 21:22:47     CHAIN FlatMap (FlatMap at
fromDataSet(Graph.java:216)) -> Combine(Distinct at
fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/08/2016 21:22:48     CHAIN Reduce (Distinct at
fromDataSet(Graph.java:216)) -> Map (Map at
fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
switched to SCHEDULED
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
switched to DEPLOYING
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
switched to RUNNING
11/08/2016 21:22:48     IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
switched to FAILED
java.lang.IllegalArgumentException: Too few memory segments provided. Hash
Table needs at least 33 memory segments.
        at
org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
        at
org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
        at
org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
        at
org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
        at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

11/08/2016 21:22:48     Job execution switched to status FAILING.
java.lang.IllegalArgumentException: Too few memory segments provided. Hash
Table needs at least 33 memory segments.
        at
org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
        at
org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
        at
org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
        at
org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
        at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
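
For what it's worth, the failing check itself is tiny: 33 memory segments of the default 32 KB is only about 1 MB, which seems far below what the configured heap should provide. A rough sketch of the arithmetic, under the assumption of the default segment size and the default 0.7 managed-memory fraction (not exact Flink internals):

```python
# Rough sketch: managed memory the configured heap should yield, versus
# the minimum the compacting hash table asked for. Assumes the default
# 32 KB segment size and 0.7 managed-memory fraction.
segment_size = 32 * 1024                # bytes per memory segment
heap_bytes = 7512 * 1024 * 1024         # taskmanager.heap.mb: 7512
managed_segments = int(heap_bytes * 0.7) // segment_size
print(managed_segments)                 # on the order of 168,000 segments

min_needed = 33 * segment_size          # the failing requirement
print(min_needed)                       # 1,081,344 bytes, about 1 MB
```

Given that gap, I wonder whether the TaskManager container is actually picking up my edited flink-conf.yaml at all.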

The results I found online so far have not been enough to solve this,
and I am not sure of the best way to proceed.

If anyone can help diagnose and correct this issue, I would be very
thankful.

Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra
