What do the TaskManager logs say about the allocation of managed memory?

Something like:

Limiting managed memory to ... of the currently free heap space ..., memory 
will be allocated lazily.

What else did you configure in flink-conf?
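
For reference, in Flink 1.1 the size of the managed memory pool is mainly 
controlled by the following keys (defaults shown; this is just a checklist of 
what to look at, not a suggestion of values to set):

taskmanager.memory.size: -1            # managed memory in MB; -1 means "use the fraction"
taskmanager.memory.fraction: 0.7       # fraction of the free heap (after network buffers) used as managed memory
taskmanager.memory.preallocate: false  # false = allocate lazily, as in the log line above
taskmanager.memory.segment-size: 32768 # page size in bytes
taskmanager.numberOfTaskSlots: 1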

Looping in Greg and Vasia, who maintain Gelly and are most familiar with the 
internals.
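
As a rough back-of-the-envelope check (a sketch based only on the numbers in 
your mail and the 1.1 defaults above, not on your actual logs):

// Rough estimate of the managed-memory pages on the described TaskManager.
// Assumes the Flink 1.1 defaults (fraction 0.7, 32 KB pages); the
// authoritative numbers are in the TaskManager logs.
public class ManagedMemoryEstimate {

    public static void main(String[] args) {
        long heapMb = 7512;          // taskmanager.heap.mb from your config
        long networkBuffers = 16048; // taskmanager.network.numberOfBuffers
        long pageSize = 32 * 1024;   // taskmanager.memory.segment-size (default)
        double fraction = 0.7;       // taskmanager.memory.fraction (default)

        long networkMb = networkBuffers * pageSize / (1024 * 1024);  // ~501 MB
        long managedMb = (long) ((heapMb - networkMb) * fraction);   // ~4907 MB
        long pages = managedMb * 1024L * 1024L / pageSize;           // ~157,000 pages

        System.out.println("managed memory ~" + managedMb + " MB, ~" + pages + " pages");
        // With ~157k pages in total, failing the 33-page minimum of the
        // CompactingHashTable suggests the iteration head was assigned only a
        // tiny share of the pool, which is why the rest of the configuration
        // (in particular the number of task slots) matters.
    }
}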

– Ufuk


On 8 November 2016 at 22:35:22, Miguel Coimbra (miguel.e.coim...@gmail.com) 
wrote:
> Dear community,
>  
> I have a problem which I hope you'll be able to help with.
> I apologize in advance for the verbosity of the post.
> I am running a Flink standalone cluster (not even writing to the
> filesystem) with 2 Docker containers.
>  
> I built the Docker image for Flink 1.1.2, which is the same version used by
> the main class in the .jar.
> The Docker image was configured to use Java 8, which is what the project's
> pom.xml requires as well.
> I have also edited the TaskManager's conf/flink-conf.yaml to have the
> following values:
>  
> ....
> taskmanager.heap.mb: 7512
> ....
> taskmanager.network.numberOfBuffers: 16048
> ....
>  
>  
> Properties of this host/docker setup:
> - host machine has *256 GB* of RAM
> - job manager container is running with the default Flink config
> - task manager has *7.5 GB* of memory available
> - task manager number of buffers is *16048*, which is very generous
> compared to the default value
>  
> I am testing on the SNAP DBLP dataset:
> https://snap.stanford.edu/data/com-DBLP.html
> It has:
>  
> 317080 nodes
> 1049866 edges
>  
> These are the relevant parts of the pom.xml of the project:
> *(note: the project executes without error when run locally, without the
> cluster)*
>  
> ....
> <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>     <maven.compiler.source>1.8</maven.compiler.source>
>     <maven.compiler.target>1.8</maven.compiler.target>
>     <flink.version>1.1.2</flink.version>
> </properties>
> .....
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-core</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-gelly_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>junit</groupId>
>         <artifactId>junit</artifactId>
>         <version>3.8.1</version>
>         <scope>test</scope>
>     </dependency>
> </dependencies>
>  
>  
> I am running (what I believe to be) a simple Gelly application, performing
> the ConnectedComponents algorithm with 30 iterations:
>  
> public static void main(String[] args) {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>  
>  
> final String dataPath = args[0];
>  
> final DataSet<Tuple2<Long, Long>> edgeTuples =
> env.readCsvFile(dataPath)
> .fieldDelimiter("\t") // node IDs are separated by tabs
> .ignoreComments("#") // comment lines start with "#"
> .types(Long.class, Long.class);
>  
> try {
> System.out.println("Tuple size: " + edgeTuples.count());
> } catch (Exception e1) {
> e1.printStackTrace();
> }
>  
> /*
> * @param <K> the key type for edge and vertex identifiers
> * @param <VV> the value type for vertices
> * @param <EV> the value type for edges
> * public class Graph<K, VV, EV>
> */
>  
>  
> final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
> edgeTuples,
> new MapFunction<Long, Long>() {
> private static final long serialVersionUID =
> 8713516577419451509L;
> public Long map(Long value) {
> return value;
> }
> },
> env
> );
>  
>  
> try {
> /**
> * @param <K> key type
> * @param <VV> vertex value type
> * @param <EV> edge value type
> * @param <T> the return type
>  
> class ConnectedComponents<K, EV>
> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>>
> */
>  
> DataSet<Vertex<Long, Long>> verticesWithComponents =
> graph.run(new ConnectedComponents<Long, NullValue>(30));
> System.out.println("Component count: " +
> verticesWithComponents.count());
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>  
>  
> However, the following output appears on the host machine upon execution:
>  
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
> flink run -m 3de7625b8e28:6123 -c flink.graph.example.App
> /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
> /home/myuser/com-dblp.ungraph.txt
>  
> Cluster configuration: Standalone cluster with JobManager at /
> 172.19.0.2:6123
> Using address 172.19.0.2:6123 to connect to JobManager.
> JobManager web interface address http://172.19.0.2:8081
> Starting execution of program
> Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://
> flink@172.19.0.2:6123/user/jobmanager#-658812967]
>  
> 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> SCHEDULED
> 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> DEPLOYING
> 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING  
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to FINISHED
> 11/08/2016 21:22:44 DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
> FINISHED
> 11/08/2016 21:22:44 Job execution switched to status FINISHED.
> Tuple size: 1049866
> Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://
> flink@172.19.0.2:6123/user/jobmanager#-658812967]
> 11/08/2016 21:22:45 Job execution switched to status RUNNING.
> 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
>  
> 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
>  
> 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
> 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25)
> (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at
> fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
> 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
> 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> SCHEDULED
> 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
> 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to
> DEPLOYING
> 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
> 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to RUNNING
> 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to SCHEDULED
> 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to DEPLOYING
> 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to RUNNING
> 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED
> 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING
> 11/08/2016 21:22:45 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to RUNNING
> 11/08/2016 21:22:47 CHAIN Map (Map at mapEdges(Graph.java:596)) ->
> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to FINISHED
> 11/08/2016 21:22:47 CHAIN FlatMap (FlatMap at
> fromDataSet(Graph.java:216)) -> Combine(Distinct at
> fromDataSet(Graph.java:216))(1/1) switched to FINISHED
> 11/08/2016 21:22:48 CHAIN Reduce (Distinct at
> fromDataSet(Graph.java:216)) -> Map (Map at
> fromDataSet(Graph.java:217))(1/1) switched to FINISHED
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
>   
> switched to SCHEDULED
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
>   
> switched to DEPLOYING
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
>   
> switched to RUNNING
> 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration
> (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 |
> org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a))(1/1)
>   
> switched to FAILED
> java.lang.IllegalArgumentException: Too few memory segments provided. Hash
> Table needs at least 33 memory segments.
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
>   
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
>   
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
>   
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
>   
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)  
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>  
> 11/08/2016 21:22:48 Job execution switched to status FAILING.
> java.lang.IllegalArgumentException: Too few memory segments provided. Hash
> Table needs at least 33 memory segments.
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206)
>   
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191)
>   
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175)
>   
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272)
>   
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)  
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>  
> The results I found online so far have not been enough, and I am not sure
> of the best way to solve this.
>  
> If anyone can help diagnose and correct this issue, I would be very
> thankful.
>  
> Best regards,
>  
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com  
> Skype: miguel.e.coimbra
>  
