Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.

2016-11-15 Thread Vasiliki Kalavri
Hi Miguel,

I'm sorry for the late reply; this e-mail got stuck in my spam folder. I'm
glad that you've found a solution :)

I've never used flink with docker, so I'm probably not the best person to
advise you on this. However, if I understand correctly, you're changing the
configuration before submitting the job, but while the Flink cluster is
already running. I don't know whether Docker is supposed to handle this
differently, but once a Flink cluster has been started, the nodes won't
reload any changes you make to flink-conf.yaml. You'll have to either make
your changes before starting the cluster or restart it afterwards.
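
For a standalone setup the sequence is roughly the following (a sketch
assuming the standard scripts shipped with the binary distribution; inside
your Docker containers the paths may of course differ):

./bin/stop-cluster.sh       # stop the JobManager and TaskManagers
vi conf/flink-conf.yaml     # e.g. raise taskmanager.heap.mb here
./bin/start-cluster.sh      # new values are only read on startup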

Cheers,
-Vasia.

On 14 November 2016 at 18:33, Miguel Coimbra wrote:

> Hello,
>
> I believe I have figured this out.
>
> First, I tried Andrey Melentyev's suggestion of executing with Apache
> Flink 1.1.3, both with default conf/flink-conf.yaml parameters as well as
> with some changes to provide additional memory. However, the same error
> happened.
>
> Note: I changed my project's pom.xml and generated the .jar again using
> Maven.
> I also copied the new .jar to both Docker instances.
>
> The test machine has 256 GB of RAM, and the scenario consists of two
> Docker containers.
> I have attached the relevant parts of the logs of the JobManager and of
> the TaskManager.
> Regarding memory in the TaskManager log, I looked at a couple of
> executions and noticed something strange:
>
> 2016-11-14 15:48:45,256 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
> 2016-11-14 15:48:45,413 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Limiting managed memory to 0.7 of the currently free heap space (310 MB), memory will be allocated lazily.
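>
> Taken together, these two lines only add up if the heap is around 512 MB
> rather than what I configured: the network buffer pool takes 64 MB, and
> 0.7 of the remaining free heap comes out close to the 310 MB reported
> above. A back-of-the-envelope check (assuming the default
> taskmanager.memory.fraction of 0.7 and the default 32 KB segments):
>
> echo $(( 2048 * 32768 / 1024 / 1024 ))   # 64 MB taken by network buffers
> echo $(( (512 - 64) * 7 / 10 ))          # ~313 MB, close to the 310 MB logged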
>
> After that, I looked at the start of the TaskManager log and found this:
>
> 2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Starting TaskManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> 2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Current user: flink
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap size: 512 MiBytes
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop version: 2.7.2
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -XX:+UseG1GC
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - *-Xms512M*
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - *-Xmx512M*
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -XX:MaxDirectMemorySize=8388607T
>
> It seems it is running with only 512 MB, which is the default,
> in spite of my having edited the flink-conf.yaml file before
> invoking the program for the cluster.
> I looked at the log of the JobManager and the same thing happened: it was
> using the default 256 MB instead of my 1024 MB.
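>
> A quick way to confirm which heap settings a container is actually running
> with (a sketch; it assumes ps is available inside the container and simply
> greps the JVM arguments of the running TaskManager):
>
> docker exec -it $(docker ps --filter name=taskmanager --format={{.ID}}) \
>     ps | grep -o '\-Xmx[^ ]*'   # prints the effective max heap, e.g. -Xmx512M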
>
> - To recap, I built the Docker Flink image as follows (the Dockerfile is
> attached):
>
> cd docker-flink-image-builder/
> ls
> Dockerfile  Dockerfile~  README.md  README.md~
> bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml
> ./build.sh
>
> The only file I changed from those is the Dockerfile.
> This set of files was obtained from the Flink repository.
> I used docker-compose up to start the standalone cluster:
>
> screen
> cd docker-flink-image-builder/
> ls
> Dockerfile  Dockerfile~  README.md  README.md~
> bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml
> docker-compose.yml  docker-entrypoint.sh*
> docker-compose up
>
> Then I accessed each Docker instance:
>
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
> docker exec -it $(docker ps --filter name=dockerflinkimagebuilder_taskmanager_1 --format={{.ID}}) /bin/sh
>
> While inside each container, I started a bash shell and changed the config
> file like so:
>
> bash
> cd /home/myuser/docker-image-build-context/flink-1.1.3/conf
> vi flink-conf.yaml
>
> I have edited (on both the JobManager and the TaskManager) the following
> settings:
>
> # The heap size for the JobManager JVM
> jobmanager.heap.mb: 1024
>
> # The heap size for the TaskManager JVM
> taskmanager.heap.mb: 

Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.

2016-11-14 Thread Ufuk Celebi
What do the TaskManager logs say regarding the allocation of managed memory?

Something like:

Limiting managed memory to ... of the currently free heap space ..., memory will be allocated lazily.

What else did you configure in flink-conf?
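
For reference, the keys that control this in 1.1.x should be the managed
memory settings, e.g. (the fraction shown is the default; treat the
absolute size as a made-up example):

taskmanager.memory.fraction: 0.7
# or, alternatively, an absolute amount in MB:
taskmanager.memory.size: 2048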

Looping in Greg and Vasia, who maintain Gelly and are most familiar with the
internals.

– Ufuk


On 8 November 2016 at 22:35:22, Miguel Coimbra (miguel.e.coim...@gmail.com) wrote:
> Dear community,
>  
> I have a problem which I hope you'll be able to help with.
> I apologize in advance for the verbosity of the post.
> I am running the Flink standalone cluster (not even storing to the
> filesystem) with 2 Docker containers.
>  
> I set the Dockerfile's image to Flink 1.1.2, the same version used by the
> main class in the .jar.
> The Docker image was configured to use Java 8, which is what the project's
> pom.xml requires as well.
> I have also edited the TaskManager's conf/flink-conf.yaml to have the
> following values:
>  
> 
> taskmanager.heap.mb: 7512
> 
> taskmanager.network.numberOfBuffers: 16048
> 
>  
>  
> Properties of this host/docker setup:
> - host machine has *256 GB* of RAM
> - job manager container is running with the default flink config
> - task manager has *7.5 GB* of memory available
> - task manager number of buffers is *16048*, which is very generous compared
> to the default value
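>
> As a side note, those 16048 buffers by themselves pin a sizeable slice of
> the TaskManager heap. A rough sketch (assuming the default
> taskmanager.memory.segment-size of 32768 bytes):
>
> echo $(( 16048 * 32768 / 1024 / 1024 ))   # ~501 MB reserved for network buffers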
>  
> I am testing on the SNAP DBLP dataset:
> https://snap.stanford.edu/data/com-DBLP.html
> It has:
>  
> 317080 nodes
> 1049866 edges
>  
> These are the relevant parts of the pom.xml of the project:
> *(note: the project executes without error when run locally, without the
> cluster)*
>  
> <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>     <maven.compiler.source>1.8</maven.compiler.source>
>     <maven.compiler.target>1.8</maven.compiler.target>
>     <flink.version>1.1.2</flink.version>
> </properties>
>
> ...
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-core</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-gelly_2.10</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>junit</groupId>
>         <artifactId>junit</artifactId>
>         <version>3.8.1</version>
>         <scope>test</scope>
>     </dependency>
> </dependencies>
>  
> I am running (what I believe to be) a simple Gelly application, performing
> the ConnectedComponents algorithm with 30 iterations:
>  
> // (imports and class wrapper restored here for completeness)
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.graph.Graph;
> import org.apache.flink.graph.Vertex;
> import org.apache.flink.graph.library.ConnectedComponents;
> import org.apache.flink.types.NullValue;
>
> public class App {
>
>     public static void main(String[] args) {
>         final ExecutionEnvironment env =
>                 ExecutionEnvironment.getExecutionEnvironment();
>
>         final String dataPath = args[0];
>
>         final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
>                 .fieldDelimiter("\t")   // node IDs are separated by tabs
>                 .ignoreComments("#")    // comment lines start with "#"
>                 .types(Long.class, Long.class);
>
>         try {
>             System.out.println("Tuple size: " + edgeTuples.count());
>         } catch (Exception e1) {
>             e1.printStackTrace();
>         }
>
>         /*
>          * @param <K> the key type for edge and vertex identifiers
>          * @param <VV> the value type for vertices
>          * @param <EV> the value type for edges
>          * public class Graph<K, VV, EV>
>          */
>         final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
>                 edgeTuples,
>                 new MapFunction<Long, Long>() {
>                     private static final long serialVersionUID =
>                             8713516577419451509L;
>                     public Long map(Long value) {
>                         return value; // each vertex starts with its own ID as value
>                     }
>                 },
>                 env
>         );
>
>         try {
>             /*
>              * @param <K> key type
>              * @param <VV> vertex value type
>              * @param <EV> edge value type
>              * @param <T> the return type
>              *
>              * class ConnectedComponents<K, VV extends Comparable<VV>, EV>
>              * implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>>
>              */
>             DataSet<Vertex<Long, Long>> verticesWithComponents =
>                     graph.run(new ConnectedComponents<>(30));
>             System.out.println("Component count: " +
>                     verticesWithComponents.count());
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> }
>  
>  
> However, the following is output on the host machine on execution:
>  
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink run -m 3de7625b8e28:6123 -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar /home/myuser/com-dblp.ungraph.txt
>  
> Cluster configuration: Standalone cluster with JobManager at /172.19.0.2:6123
> Using address 172.19.0.2:6123 to connect to JobManager.
> JobManager web interface address http://172.19.0.2:8081
> Starting execution of program
> Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#-658812967]
>  
> 11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
> 11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
> 11/08/2016 21:22:44 DataSource (at main(App.java:25) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING
> 11/08/2016 21:22:44 DataSink (count())(1/1) switched to FINISHED
> 11/08/2016 

Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.

2016-11-10 Thread Andrey Melentyev
Hi Miguel,

I tried to reproduce the problem in a similar setup using Flink in Docker
with 2 workers:

CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                      NAMES
3fbaf5876e31   melentye/flink:latest   "docker-entrypoint.sh"   3 minutes ago   Up 3 minutes   6121-6122/tcp                                              flinkdocker_flink-worker_2
bd87bfa6c03d   melentye/flink:latest   "docker-entrypoint.sh"   4 minutes ago   Up 3 minutes   6121-6122/tcp                                              flinkdocker_flink-worker_1
16c7607b3ec2   melentye/flink:latest   "docker-entrypoint.sh"   4 minutes ago   Up 4 minutes   0.0.0.0:8081->8081/tcp, 6123/tcp, 0.0.0.0:9010->9010/tcp   flinkdocker_flink-master_1

for i in $(docker ps --filter name=flink --format={{.ID}}); do
  docker cp ~/Downloads/com-dblp.ungraph.txt $i:/com-dblp.ungraph.txt
done

for i in $(docker ps --filter name=flink-master --format={{.ID}}); do
  docker cp ~/Dev/flink-playground/target/flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar $i:/flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar
done

docker exec -it $(docker ps --filter name=flink-master --format={{.ID}}) flink run -c com.github.melentye.flink.gelly.DblpConnectedComponents /flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar /com-dblp.ungraph.txt

where flink-playground-1.0-SNAPSHOT-ConnectedComponents.jar contains your
code example. It seems to work better for me:

Cluster configuration: Standalone cluster with JobManager at /172.20.0.2:6123
Using address 172.20.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.20.0.2:8081
Starting execution of program
Submitting job with JobID: 3146a443dd208c57f5b795767da0b720. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.20.0.2:6123/user/jobmanager#73377916]
11/10/2016 20:10:26 Job execution switched to status RUNNING.
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
11/10/2016 20:10:26 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/10/2016 20:10:26 DataSink (count())(1/1) switched to SCHEDULED
11/10/2016 20:10:26 DataSink (count())(1/1) switched to DEPLOYING
11/10/2016 20:10:26 DataSink (count())(1/1) switched to RUNNING
11/10/2016 20:10:27 DataSink (count())(1/1) switched to FINISHED
11/10/2016 20:10:27 DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to FINISHED
11/10/2016 20:10:27 Job execution switched to status FINISHED.
Tuple size: 1049866
Submitting job with JobID: 76040d6474efa39a8f0a03f420e8746c. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@172.20.0.2:6123/user/jobmanager#73377916]
11/10/2016 20:10:27 Job execution switched to status RUNNING.
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
11/10/2016 20:10:27 CHAIN DataSource (at main(DblpConnectedComponents.java:23) (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to DEPLOYING
11/10/2016 20:10:27 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:216)) -> Combine(Distinct at fromDataSet(Graph.java:216))(1/1) switched to RUNNING
11/10/2016 20:10:27 CHAIN Map (Map at mapEdges(Graph.java:596)) -> FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to RUNNING
11/10/2016 20:10:27 CoGroup (Messaging)(1/1) switched to SCHEDULED
11/10/2016 20:10:27 CoGroup (Messaging)(1/1) switched to DEPLOYING