Dear community,

I have a problem which I hope you'll be able to help with. I apologize in advance for the verbosity of the post.

I am running a Flink standalone cluster (not even storing to the filesystem) with 2 Docker containers.
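For context, this is roughly how I bring up the two containers. The image name, network name and Flink install path below are placeholders rather than my exact setup, but the layout is the same: one JobManager container with the default configuration and one TaskManager container with the edited configuration described below.

....
docker network create flink-net

# JobManager container, default Flink configuration
docker run -dit --name jobmanager --network flink-net my-flink:1.1.2 bash
docker exec jobmanager /usr/local/flink/bin/jobmanager.sh start cluster jobmanager 8081

# TaskManager container, using the edited conf/flink-conf.yaml shown below
docker run -dit --name taskmanager --network flink-net my-flink:1.1.2 bash
docker exec taskmanager /usr/local/flink/bin/taskmanager.sh start
....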
The Docker image is built from a Dockerfile for Flink 1.1.2, which is the same version used by the main class in the .jar. The image is configured to use Java 8, which is what the project's pom.xml requires as well.

I have also edited the TaskManager's conf/flink-conf.yaml to have the following values:

....
taskmanager.heap.mb: 7512
....
taskmanager.network.numberOfBuffers: 16048
....

Properties of this host/Docker setup:
- the host machine has *256 GB* of RAM
- the job manager container is running with the default Flink config
- the task manager container has *7.5 GB* of memory available
- the task manager's number of network buffers is *16048*, which is very generous compared to the default value

I am testing on the SNAP DBLP dataset:
https://snap.stanford.edu/data/com-DBLP.html

It has:
317080 nodes
1049866 edges

These are the relevant parts of the pom.xml of the project *(note: the project executes without error in local executions, without the cluster)*:

....
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.1.2</flink.version>
</properties>
.....
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
....

I am running (what I believe to be) a simple Gelly application, performing the ConnectedComponents algorithm with 30 iterations:

public static void main(String[] args) {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final String dataPath = args[0];

    final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t")   // node IDs are separated by tabs
            .ignoreComments("#")    // comment lines start with "#"
            .types(Long.class, Long.class);

    try {
        System.out.println("Tuple size: " + edgeTuples.count());
    } catch (Exception e1) {
        e1.printStackTrace();
    }

    /*
     * @param <K> the key type for edge and vertex identifiers
     * @param <VV> the value type for vertices
     * @param <EV> the value type for edges
     *
     * public class Graph<K, VV, EV>
     */
    final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;
                public Long map(Long value) {
                    return value;
                }
            },
            env);

    try {
        /*
         * @param <K> key type
         * @param <VV> vertex value type
         * @param <EV> edge value type
         * @param <T> the return type
         *
         * class ConnectedComponents<K, VV extends Comparable<VV>, EV>
         *     implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>>
         */
        DataSet<Vertex<Long, Long>> verticesWithComponents =
                graph.run(new ConnectedComponents<Long, Long, NullValue>(30));
        System.out.println("Component count: " + verticesWithComponents.count());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

However, the following is output on the host machine on execution:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 3de7625b8e28:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt

Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]
11/08/2016 21:22:44   DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/08/2016 21:22:44   DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/08/2016 21:22:44   DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/08/2016 21:22:44   DataSink (count())(1/1) switched to SCHEDULED
11/08/2016 21:22:44   DataSink (count())(1/1) switched to DEPLOYING
11/08/2016 21:22:44   DataSink (count())(1/1) switched to RUNNING
11/08/2016 21:22:44   DataSink (count())(1/1) switched to FINISHED
11/08/2016 21:22:44   DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
11/08/2016 21:22:44   Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]
11/08/2016 21:22:45   Job execution switched to status RUNNING.
11/08/2016 21:22:45   CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/08/2016 21:22:45   CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/08/2016 21:22:45   CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/08/2016 21:22:45   CHAIN DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/08/2016 21:22:45   CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/08/2016 21:22:45   CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/08/2016 21:22:45   CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/08/2016 21:22:45   CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/08/2016 21:22:45   CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/08/2016 21:22:45   CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/08/2016 21:22:45   CoGroup (Messaging)(1/1) switched to SCHEDULED
11/08/2016 21:22:45   CoGroup (Messaging)(1/1) switched to DEPLOYING
11/08/2016 21:22:45   CoGroup (Messaging)(1/1) switched to RUNNING
11/08/2016 21:22:45   CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
11/08/2016 21:22:45   CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
11/08/2016 21:22:45   CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to RUNNING
11/08/2016 21:22:47   CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
11/08/2016 21:22:47   CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to FINISHED
11/08/2016 21:22:48   CHAIN Reduce (Distinct at fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) switched to FINISHED
11/08/2016 21:22:48   IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to SCHEDULED
11/08/2016 21:22:48   IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to DEPLOYING
11/08/2016 21:22:48   IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to RUNNING
11/08/2016 21:22:48   IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1) switched to FAILED
java.lang.IllegalArgumentException: Too few memory segments provided. Hash Table needs at least 33 memory segments.
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
11/08/2016 21:22:48   Job execution switched to status FAILING.
java.lang.IllegalArgumentException: Too few memory segments provided. Hash Table needs at least 33 memory segments.
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

The results I have found online so far were not enough, and I am not sure of the best way to solve this. If anyone can help diagnose and correct this issue, I would be very thankful.

Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra
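P.S. In case it helps with the diagnosis: as far as I understand, the memory segments used by the iteration's hash table come out of the TaskManager's managed memory pool, so these are the configuration keys I was thinking of experimenting with next. I have not confirmed that any of them actually address the problem, and the values below are only illustrative:

....
# fraction of the TaskManager heap given to the managed memory pool (default 0.7)
taskmanager.memory.fraction: 0.7
# alternatively, an absolute amount of managed memory in MB (overrides the fraction when set)
# taskmanager.memory.size: 4096
# managed memory is divided among the task slots, so fewer slots means more memory per slot
taskmanager.numberOfTaskSlots: 1
....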