Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.
Hi Miguel,

I'm sorry for the late reply; this e-mail got stuck in my spam folder. I'm glad that you've found a solution :) I've never used Flink with Docker, so I'm probably not the best person to advise you on this. However, if I understand correctly, you're changing the configuration before submitting the job, but while the Flink cluster is already running. I don't know whether Docker is supposed to behave differently, but once a Flink cluster has been started, its nodes won't reload any changes you make to flink-conf.yaml. You'll either have to make your changes before starting the cluster, or restart it afterwards.

Cheers,
-Vasia.

On 14 November 2016 at 18:33, Miguel Coimbra wrote:
> Hello,
>
> I believe I have figured this out.
>
> First, I tried Andrey Melentyev's suggestion of executing with Apache
> Flink 1.1.3, both with default conf/flink-conf.yaml parameters as well as
> with some changes to provide additional memory. However, the same error
> happened.
>
> Note: I changed my project's pom.xml and generated the .jar again using
> Maven. I also copied the new .jar to both Docker instances.
>
> The test machine has 256 GB RAM and the scenario uses two Docker
> containers. I attach the relevant parts of the JobManager and TaskManager
> logs. Regarding memory in the TaskManager log, I was looking at a couple
> of executions and noticed something strange:
>
> 2016-11-14 15:48:45,256 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
> 2016-11-14 15:48:45,413 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.7 of the currently free heap space (310 MB), memory will be allocated lazily.
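As an aside on the two log lines just quoted: the numbers make the failure mode easy to check arithmetically. The following is a minimal plain-Java sketch (not Flink code); the figures come from the log above, while the number of competing memory consumers is a hypothetical illustration:

```java
public class ManagedMemorySketch {
    public static void main(String[] args) {
        long freeHeapBytes = 310L * 1024 * 1024; // "currently free heap space (310 MB)" from the log
        int segmentSize = 32 * 1024;             // 32768 bytes per segment, as logged

        // "Limiting managed memory to 0.7 of the currently free heap space"
        // (computed in integer arithmetic to avoid rounding surprises)
        long managedBytes = freeHeapBytes * 7 / 10;
        long totalSegments = managedBytes / segmentSize;
        System.out.println("managed segments available in total: " + totalSegments); // 6944

        // Those segments are split across all memory-consuming operators of a
        // job. If an operator's share drops below 33, the hash table fails with
        // "Hash Table needs at least 33 memory segments".
        long consumers = 250; // hypothetical number of competing consumers
        System.out.println("segments per consumer: " + (totalSegments / consumers)); // 27, below the 33 minimum
    }
}
```

With the intended taskmanager.heap.mb values, the total segment count grows proportionally and each operator's share clears the 33-segment minimum.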
> After that, I looked at the start of the TaskManager log and found this:
>
> 2016-11-14 15:48:38,843 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> 2016-11-14 15:48:38,843 INFO org.apache.flink.runtime.taskmanager.TaskManager - Current user: flink
> 2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 512 MiBytes
> 2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.7.2
> 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
> 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC
> 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms512M
> 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx512M
> 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
>
> It seems it is running with only 512 MB, which is the default, despite my
> having edited the flink-conf.yaml file before invoking the program for the
> cluster. The JobManager log shows the same thing: it was using the default
> 256 MB instead of my 1024 MB.
>
> To recap, I built the Docker Flink image with (Dockerfile attached):
>
> cd docker-flink-image-builder/
> ls
> Dockerfile Dockerfile~ README.md README.md~
> bluemix-docker-compose.sh* build.sh* docker-compose-bluemix.yml
> ./build.sh
>
> The only file I changed from those is the Dockerfile.
> This set of files was obtained from the Flink repository.
> I used docker-compose up to start the standalone cluster:
>
> screen
> cd docker-flink-image-builder/
> ls
> Dockerfile Dockerfile~ README.md README.md~
> bluemix-docker-compose.sh* build.sh* docker-compose-bluemix.yml
> docker-compose.yml docker-entrypoint.sh*
> docker-compose up
>
> Then I accessed each Docker instance:
>
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
> docker exec -it $(docker ps --filter name=dockerflinkimagebuilder_taskmanager_1 --format={{.ID}}) /bin/sh
>
> While inside each of those, I started a bash shell and edited the config
> file like so:
>
> bash
> cd /home/myuser/docker-image-build-context/flink-1.1.3/conf
> vi flink-conf.yaml
>
> I have edited (on both the JobManager and the TaskManager) the following
> settings:
>
> # The heap size for the JobManager JVM
> jobmanager.heap.mb: 1024
>
> # The heap size for the TaskManager JVM
> taskmanager.heap.mb:
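A quick, Flink-independent way to double-check which heap limit a container's JVM actually received (the diagnosis in the message above) is to query the Runtime API from any small class run inside the container. This is a generic JVM sketch, not part of the original thread:

```java
public class HeapCheck {
    public static void main(String[] args) {
        // maxMemory() reports the JVM's effective -Xmx in bytes. If the
        // flink-conf.yaml changes had been picked up, a TaskManager JVM
        // would report roughly taskmanager.heap.mb here instead of the
        // 512 MiB default seen in the logs.
        long maxBytes = Runtime.getRuntime().maxMemory();
        System.out.println("Max heap: " + (maxBytes / (1024 * 1024)) + " MiB");
    }
}
```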
Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.
What do the TaskManager logs say w.r.t. the allocation of managed memory? Something like:

Limiting managed memory to ... of the currently free heap space ..., memory will be allocated lazily.

What else did you configure in flink-conf.yaml? Looping in Greg and Vasia, who maintain Gelly and are most familiar with the internals.

– Ufuk

On 8 November 2016 at 22:35:22, Miguel Coimbra (miguel.e.coim...@gmail.com) wrote:
> Dear community,
>
> I have a problem which I hope you'll be able to help with.
> I apologize in advance for the verbosity of the post.
> I am running the Flink standalone cluster (not even storing to the
> filesystem) with 2 Docker containers.
>
> I set the image of the Dockerfile to Flink 1.1.2, the same version as the
> main class in the .jar.
> The Docker image was configured to use Java 8, which is what the project's
> pom.xml requires as well.
> I have also edited the TaskManager conf/flink-conf.yaml to have the
> following values:
>
> taskmanager.heap.mb: 7512
> taskmanager.network.numberOfBuffers: 16048
>
> Properties of this host/docker setup:
> - host machine has 256 GB of RAM
> - job manager container is running with the default Flink config
> - task manager has 7.5 GB of memory available
> - task manager number of buffers is 16048, which is very generous compared
>   to the default value
>
> I am testing on the SNAP DBLP dataset:
> https://snap.stanford.edu/data/com-DBLP.html
> It has:
>
> 317080 nodes
> 1049866 edges
>
> These are the relevant parts of the pom.xml of the project
> (note: the project executes without error for local executions without the
> cluster):
>
> UTF-8
> UTF-8
> 1.8
> 1.8
> 1.1.2
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-java</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-core</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_2.10</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-clients_2.10</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-gelly_2.10</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>junit</groupId>
>   <artifactId>junit</artifactId>
>   <version>3.8.1</version>
>   <scope>test</scope>
> </dependency>
>
> I am running (what I believe to be) a simple Gelly application, performing
> the ConnectedComponents algorithm with 30 iterations:
>
> public static void main(String[] args) {
>     final ExecutionEnvironment env =
>         ExecutionEnvironment.getExecutionEnvironment();
>
>     final String dataPath = args[0];
>
>     final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
>         .fieldDelimiter("\t")  // node IDs are separated by tabs
>         .ignoreComments("#")   // comments start with "#"
>         .types(Long.class, Long.class);
>
>     try {
>         System.out.println("Tuple size: " + edgeTuples.count());
>     } catch (Exception e1) {
>         e1.printStackTrace();
>     }
>
>     /*
>      * @param <K> the key type for edge and vertex identifiers
>      * @param <VV> the value type for vertices
>      * @param <EV> the value type for edges
>      * public class Graph<K, VV, EV>
>      */
>     final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
>         edgeTuples,
>         new MapFunction<Long, Long>() {
>             private static final long serialVersionUID = 8713516577419451509L;
>             public Long map(Long value) {
>                 return value;
>             }
>         },
>         env
>     );
>
>     try {
>         /*
>          * @param <K> key type
>          * @param <VV> vertex value type
>          * @param <EV> edge value type
>          *
>          * class ConnectedComponents<K, VV extends Comparable<VV>, EV>
>          *     implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>>
>          */
>         DataSet<Vertex<Long, Long>> verticesWithComponents =
>             graph.run(new ConnectedComponents<Long, Long, NullValue>(30));
>         System.out.println("Component count: " +
>             verticesWithComponents.count());
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> }
>
> However, the following is output on the host machine on execution:
>
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 3de7625b8e28:6123 -c flink.graph.example.App
> /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
> /home/myuser/com-dblp.ungraph.txt
>
> Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
> Using address 172.19.0.2:6123 to connect to JobManager.
> JobManager web interface address http://172.19.0.2:8081
> Starting execution of program
> Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]
>
> 11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
> 11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
> 11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to FINISHED
> 11/08/2016
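As a back-of-the-envelope check on the configuration in the message above: assuming the default 32 KiB buffer size that the logs later confirm, the 16048 network buffers by themselves pin roughly half a gigabyte of memory that is then unavailable to operators. A plain-Java sketch of the arithmetic:

```java
public class NetworkBufferSketch {
    public static void main(String[] args) {
        int numberOfBuffers = 16048; // taskmanager.network.numberOfBuffers from the message above
        int bufferSize = 32 * 1024;  // 32 KiB per buffer, matching "bytes per segment: 32768" in the logs

        long totalBytes = (long) numberOfBuffers * bufferSize;
        // 16048 * 32768 = 525,860,864 bytes, i.e. 501.5 MiB reserved for
        // network buffers alone.
        System.out.println("network buffer memory: "
                + (totalBytes / (1024.0 * 1024.0)) + " MiB");
    }
}
```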
Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.
Hi Miguel,

I tried to reproduce the problem in a similar setup using Flink in Docker with 2 workers:

CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                      NAMES
3fbaf5876e31   melentye/flink:latest   "docker-entrypoint.sh"   3 minutes ago   Up 3 minutes   6121-6122/tcp                                              flinkdocker_flink-worker_2
bd87bfa6c03d   melentye/flink:latest   "docker-entrypoint.sh"   4 minutes ago   Up 3 minutes   6121-6122/tcp                                              flinkdocker_flink-worker_1
16c7607b3ec2   melentye/flink:latest   "docker-entrypoint.sh"   4 minutes ago   Up 4 minutes   0.0.0.0:8081->8081/tcp, 6123/tcp, 0.0.0.0:9010->9010/tcp   flinkdocker_flink-master_1

for i in $(docker ps --filter name=flink --format={{.ID}}); do
  docker cp ~/Downloads/com-dblp.ungraph.txt $i:/com-dblp.ungraph.txt
done

for i in $(docker ps --filter name=flink-master --format={{.ID}}); do
  docker cp ~/Dev/flink-playground/target/flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar $i:/flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar
done

docker exec -it $(docker ps --filter name=flink-master --format={{.ID}}) flink run -c com.github.melentye.flink.gelly.DblpConnectedComponents /flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar /com-dblp.ungraph.txt

where flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar contains your code example. It seems to work better for me:

Cluster configuration: Standalone cluster with JobManager at /172.20.0.2:6123
Using address 172.20.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.20.0.2:8081
Starting execution of program
Submitting job with JobID: 3146a443dd208c57f5b795767da0b720. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.20.0.2:6123/user/jobmanager#73377916]
11/10/2016 20:10:26 Job execution switched to status RUNNING.
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/10/2016 20:10:26 DataSink (count())(1/1) switched to SCHEDULED
11/10/2016 20:10:26 DataSink (count())(1/1) switched to DEPLOYING
11/10/2016 20:10:26 DataSink (count())(1/1) switched to RUNNING
11/10/2016 20:10:27 DataSink (count())(1/1) switched to FINISHED
11/10/2016 20:10:27 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
11/10/2016 20:10:27 Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: 76040d6474efa39a8f0a03f420e8746c. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.20.0.2:6123/user/jobmanager#73377916]
11/10/2016 20:10:27 Job execution switched to status RUNNING.
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/10/2016 20:10:27 CoGroup (Messaging)(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CoGroup (Messaging)(1/1) switched to DEPLOYING
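For readers without a Flink cluster at hand, the semantics of the connected-components job discussed in this thread can be illustrated with a tiny self-contained union-find over an edge list. This is plain Java with purely illustrative data; it is not Gelly's iterative implementation:

```java
import java.util.HashSet;
import java.util.Set;

public class TinyConnectedComponents {
    static int[] parent;

    // Find the representative of x's component, with path halving.
    static int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    static void union(int a, int b) {
        parent[find(a)] = find(b);
    }

    public static void main(String[] args) {
        int n = 6; // vertices 0..5
        parent = new int[n];
        for (int i = 0; i < n; i++) parent[i] = i;

        // undirected edge list, analogous to the Tuple2<Long, Long> input
        int[][] edges = {{0, 1}, {1, 2}, {3, 4}};
        for (int[] e : edges) union(e[0], e[1]);

        // count distinct component representatives: {0,1,2}, {3,4}, {5}
        Set<Integer> roots = new HashSet<>();
        for (int i = 0; i < n; i++) roots.add(find(i));
        System.out.println("Component count: " + roots.size()); // 3
    }
}
```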