Re: Basic questions about Giraph internals
On Fri, Feb 7, 2014 at 2:30 PM, Claudio Martella claudio.marte...@gmail.com wrote: On Fri, Feb 7, 2014 at 9:44 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Thank you, I will try to do this. As I understood I should set number of threads manually through Giraph API. BTW, what is conceptual difference between running multiple workers on the TaskTracker and running single worker and multiple threads? In terms of vertex fetching, memory sharing etc. Basically, better usage of resources: one single JVM, no duplication of core data structures, less netty threads and communication points, more locality (less messages over the network), less actors accessing zookeeper etc. Also I would like to ask how message transfer between vertices is implemented in terms of Hadoop primitives? Source code reference will be enough. Communication does not happen via Hadoop primitives, but ad-hoc via netty. Ok. It seams that Hadoop has minimalistic influence on Giraph application execution after graph is loaded into memory (that is mapping is done).
Re: Basic questions about Giraph internals
Yes, Giraph hijacks mapper tasks, and then does everything else on its own. On Fri, Feb 7, 2014 at 12:39 PM, Alexander Frolov alexndr.fro...@gmail.comwrote: On Fri, Feb 7, 2014 at 2:30 PM, Claudio Martella claudio.marte...@gmail.com wrote: On Fri, Feb 7, 2014 at 9:44 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Thank you, I will try to do this. As I understood I should set number of threads manually through Giraph API. BTW, what is conceptual difference between running multiple workers on the TaskTracker and running single worker and multiple threads? In terms of vertex fetching, memory sharing etc. Basically, better usage of resources: one single JVM, no duplication of core data structures, less netty threads and communication points, more locality (less messages over the network), less actors accessing zookeeper etc. Also I would like to ask how message transfer between vertices is implemented in terms of Hadoop primitives? Source code reference will be enough. Communication does not happen via Hadoop primitives, but ad-hoc via netty. Ok. It seams that Hadoop has minimalistic influence on Giraph application execution after graph is loaded into memory (that is mapping is done). -- Claudio Martella
Re: Basic questions about Giraph internals
I tried the setup with one multithreaded worker per machine for the first time a few minutes ago on a cluster of 25 machines, and my job (closeness centrality estimation on a billion edge graph) ran twice as fast! On 02/07/2014 12:21 PM, Claudio Martella wrote: Yes, I think this is the best setup if you have control over your cluster. And yes, I have already tried that. On Fri, Feb 7, 2014 at 11:39 AM, Sundara Raghavan Sankaran sun...@crayondata.com wrote: On Fri, Feb 7, 2014 at 4:00 PM, Claudio Martella claudio.marte...@gmail.com wrote: On Fri, Feb 7, 2014 at 9:44 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Thank you, I will try to do this. As I understood I should set number of threads manually through Giraph API. BTW, what is conceptual difference between running multiple workers on the TaskTracker and running single worker and multiple threads? In terms of vertex fetching, memory sharing etc. Basically, better usage of resources: one single JVM, no duplication of core data structures, less netty threads and communication points, more locality (less messages over the network), less actors accessing zookeeper etc. So, is it better to have one worker per machine with the number of threads as per the core of the machines? Suppose if I have 8 machines with 6 cores each, then instead of running 47 Workers (1 thread per Worker) + 1 Master, it's better to run 8 Workers (6 threads per Worker) + 1 Master? Have you tried this already? Also I would like to ask how message transfer between vertices is implemented in terms of Hadoop primitives? Source code reference will be enough. Communication does not happen via Hadoop primitives, but ad-hoc via netty. -- Claudio Martella -- *Sundara Raghavan Sankaran* -- http://crayondata.com/?utm_source=emailsig https://www.facebook.com/crayondatahttps://twitter.com/CrayonBigDatahttp://www.linkedin.com/company/crayon-datahttps://plus.google.com/+Crayondata1http://www.youtube.com/user/crayonbigdata www.crayondata.com http://crayondata.com/?utm_source=emailsig http://bigdata-madesimple.com/?utm_source=emailsig www.bigdata-madesimple.comhttp://bigdata-madesimple.com/?utm_source=emailsig -- Finalisthttp://www.code-n.org/fileadmin/user_upload/pdf/131210_List_Top_50_EN.pdf at the Code_N 2014 Contest http://www.code-n.org/cebit/award/ at CEBIThttp://www.cebit.com/, Hanover - the only big data company from Asia. This email and its contents are confidential, and meant only for you. Views or opinions, presented in this email, are solely of the author and may not necessarily represent Crayon Data.
Re: Basic questions about Giraph internals
Hi Alex, answers are inline. On Thu, Feb 6, 2014 at 11:22 AM, Alexander Frolov alexndr.fro...@gmail.comwrote: Hi, folks! I have started small research of Giraph framework and I have not much experience with Giraph and Hadoop :-(. I would like to ask several questions about how things are working in Giraph which are not straightforward for me. I am trying to use the sources but sometimes it is not too easy ;-) So here they are: 1) How Workers are assigned to TaskTrackers? Each worker is a mapper, and mapper tasks are assigned to tasktrackers by the jobtracker. There's no control by Giraph there, and because Giraph doesn't need data-locality like Mapreduce does, basically nothing is done. 2) How vertices are assigned to Workers? Does it depend on distribution of input file on DataNodes? Is there available any choice of distribution politics or no? In the default scheme, vertices are assigned through modulo hash partitioning. Given k workers, vertex v is assigned to worker i according to hash(v) % k = i. 3) How Workers and Map tasks are related to each other? (1:1)? (n:1)? (1:n)? It's 1:1. Each worker is implemented by a mapper task. The master is usually (but does not need to) implemented by an additional mapper. 4) Can Workers migrate from one TaskTracker to the other? Workers does not migrate. A Giraph computation is not dynamic wrt to assignment and size of the tasks. 5) What is the best way to monitor Giraph app execution (progress, worker assignment, load balancing etc.)? Just like you would for a standard Mapreduce job. Go to the job page on the jobtracker http page. I think this is all for the moment. Thank you. Testbed description: Hardware: 8 node dual-CPU cluster with IB FDR. Giraph: release-1.0.0-RC2-152-g585511f Hadoop: hadoop-0.20.203.0, hadoop-rdma-0.9.8 Best, Alex -- Claudio Martella
Re: Basic questions about Giraph internals
On Thu, Feb 6, 2014 at 11:56 AM, Alexander Frolov alexndr.fro...@gmail.comwrote: Hi Claudio, thank you. If I understood correctly, mapper and mapper task is the same thing. More or less. A mapper is a functional element of the programming model, while the mapper task is the task that executes the mapper function on the records. On Thu, Feb 6, 2014 at 2:28 PM, Claudio Martella claudio.marte...@gmail.com wrote: Hi Alex, answers are inline. On Thu, Feb 6, 2014 at 11:22 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi, folks! I have started small research of Giraph framework and I have not much experience with Giraph and Hadoop :-(. I would like to ask several questions about how things are working in Giraph which are not straightforward for me. I am trying to use the sources but sometimes it is not too easy ;-) So here they are: 1) How Workers are assigned to TaskTrackers? Each worker is a mapper, and mapper tasks are assigned to tasktrackers by the jobtracker. That is each Worker is created at the beginning of superstep and then dies. In the next superstep all Workers are created again. Is it correct? Nope. The workers are created at the beginning of the computation, and destroyed at the end of the computation. A computation is persistent throughout the computation. There's no control by Giraph there, and because Giraph doesn't need data-locality like Mapreduce does, basically nothing is done. This is important for me. So Giraph Worker (a.k.a Hadoop mapper) fetches vertex with corresponding index from the HDFS and perform computation. What does it do next with it? As I understood Giraph is fully in-memory framework and in the next superstep this vertex should be fetched from the memory by the same Worker. Where the vertices are stored between supersteps? In HDFS or in memory? As I said, the workers are persistent (in-memory) between supersteps, so they keep everything in memory. 2) How vertices are assigned to Workers? Does it depend on distribution of input file on DataNodes? Is there available any choice of distribution politics or no? In the default scheme, vertices are assigned through modulo hash partitioning. Given k workers, vertex v is assigned to worker i according to hash(v) % k = i. 3) How Workers and Map tasks are related to each other? (1:1)? (n:1)? (1:n)? It's 1:1. Each worker is implemented by a mapper task. The master is usually (but does not need to) implemented by an additional mapper . 4) Can Workers migrate from one TaskTracker to the other? Workers does not migrate. A Giraph computation is not dynamic wrt to assignment and size of the tasks. 5) What is the best way to monitor Giraph app execution (progress, worker assignment, load balancing etc.)? Just like you would for a standard Mapreduce job. Go to the job page on the jobtracker http page. I think this is all for the moment. Thank you. Testbed description: Hardware: 8 node dual-CPU cluster with IB FDR. Giraph: release-1.0.0-RC2-152-g585511f Hadoop: hadoop-0.20.203.0, hadoop-rdma-0.9.8 Best, Alex -- Claudio Martella -- Claudio Martella
Re: Basic questions about Giraph internals
On Thu, Feb 6, 2014 at 3:00 PM, Claudio Martella claudio.marte...@gmail.com wrote: On Thu, Feb 6, 2014 at 11:56 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi Claudio, thank you. If I understood correctly, mapper and mapper task is the same thing. More or less. A mapper is a functional element of the programming model, while the mapper task is the task that executes the mapper function on the records. Ok, I see. Then mapred.tasktracker.map.tasks.maximum is a maximum number of Workers [or Workers + Master] which will be created at the same node. That is if I have 8 node cluster with mapred.tasktracker.map.tasks.maximum=4, then I can run up to 31 Workers + 1 Master. Is it correct? On Thu, Feb 6, 2014 at 2:28 PM, Claudio Martella claudio.marte...@gmail.com wrote: Hi Alex, answers are inline. On Thu, Feb 6, 2014 at 11:22 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi, folks! I have started small research of Giraph framework and I have not much experience with Giraph and Hadoop :-(. I would like to ask several questions about how things are working in Giraph which are not straightforward for me. I am trying to use the sources but sometimes it is not too easy ;-) So here they are: 1) How Workers are assigned to TaskTrackers? Each worker is a mapper, and mapper tasks are assigned to tasktrackers by the jobtracker. That is each Worker is created at the beginning of superstep and then dies. In the next superstep all Workers are created again. Is it correct? Nope. The workers are created at the beginning of the computation, and destroyed at the end of the computation. A computation is persistent throughout the computation. There's no control by Giraph there, and because Giraph doesn't need data-locality like Mapreduce does, basically nothing is done. This is important for me. So Giraph Worker (a.k.a Hadoop mapper) fetches vertex with corresponding index from the HDFS and perform computation. What does it do next with it? As I understood Giraph is fully in-memory framework and in the next superstep this vertex should be fetched from the memory by the same Worker. Where the vertices are stored between supersteps? In HDFS or in memory? As I said, the workers are persistent (in-memory) between supersteps, so they keep everything in memory. Ok. Is there any means to see assignment of Workers to TaskTrackers during or after the computation? And is there any means to see assignment of vertices to Workers (as distribution function, histogram etc.)? 2) How vertices are assigned to Workers? Does it depend on distribution of input file on DataNodes? Is there available any choice of distribution politics or no? In the default scheme, vertices are assigned through modulo hash partitioning. Given k workers, vertex v is assigned to worker i according to hash(v) % k = i. 3) How Workers and Map tasks are related to each other? (1:1)? (n:1)? (1:n)? It's 1:1. Each worker is implemented by a mapper task. The master is usually (but does not need to) implemented by an additional mapper . 4) Can Workers migrate from one TaskTracker to the other? Workers does not migrate. A Giraph computation is not dynamic wrt to assignment and size of the tasks. 5) What is the best way to monitor Giraph app execution (progress, worker assignment, load balancing etc.)? Just like you would for a standard Mapreduce job. Go to the job page on the jobtracker http page. I think this is all for the moment. Thank you. Testbed description: Hardware: 8 node dual-CPU cluster with IB FDR. Giraph: release-1.0.0-RC2-152-g585511f Hadoop: hadoop-0.20.203.0, hadoop-rdma-0.9.8 Best, Alex -- Claudio Martella -- Claudio Martella
Re: Basic questions about Giraph internals
Yes, this is correct. On 02/06/2014 12:15 PM, Alexander Frolov wrote: On Thu, Feb 6, 2014 at 3:00 PM, Claudio Martella claudio.marte...@gmail.com wrote: On Thu, Feb 6, 2014 at 11:56 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi Claudio, thank you. If I understood correctly, mapper and mapper task is the same thing. More or less. A mapper is a functional element of the programming model, while the mapper task is the task that executes the mapper function on the records. Ok, I see. Then mapred.tasktracker.map.tasks.maximum is a maximum number of Workers [or Workers + Master] which will be created at the same node. That is if I have 8 node cluster with mapred.tasktracker.map.tasks.maximum=4, then I can run up to 31 Workers + 1 Master. Is it correct? On Thu, Feb 6, 2014 at 2:28 PM, Claudio Martella claudio.marte...@gmail.com wrote: Hi Alex, answers are inline. On Thu, Feb 6, 2014 at 11:22 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi, folks! I have started small research of Giraph framework and I have not much experience with Giraph and Hadoop :-(. I would like to ask several questions about how things are working in Giraph which are not straightforward for me. I am trying to use the sources but sometimes it is not too easy ;-) So here they are: 1) How Workers are assigned to TaskTrackers? Each worker is a mapper, and mapper tasks are assigned to tasktrackers by the jobtracker. That is each Worker is created at the beginning of superstep and then dies. In the next superstep all Workers are created again. Is it correct? Nope. The workers are created at the beginning of the computation, and destroyed at the end of the computation. A computation is persistent throughout the computation. There's no control by Giraph there, and because Giraph doesn't need data-locality like Mapreduce does, basically nothing is done. This is important for me. So Giraph Worker (a.k.a Hadoop mapper) fetches vertex with corresponding index from the HDFS and perform computation. What does it do next with it? As I understood Giraph is fully in-memory framework and in the next superstep this vertex should be fetched from the memory by the same Worker. Where the vertices are stored between supersteps? In HDFS or in memory? As I said, the workers are persistent (in-memory) between supersteps, so they keep everything in memory. Ok. Is there any means to see assignment of Workers to TaskTrackers during or after the computation? And is there any means to see assignment of vertices to Workers (as distribution function, histogram etc.)? 2) How vertices are assigned to Workers? Does it depend on distribution of input file on DataNodes? Is there available any choice of distribution politics or no? In the default scheme, vertices are assigned through modulo hash partitioning. Given k workers, vertex v is assigned to worker i according to hash(v) % k = i. 3) How Workers and Map tasks are related to each other? (1:1)? (n:1)? (1:n)? It's 1:1. Each worker is implemented by a mapper task. The master is usually (but does not need to) implemented by an additional mapper . 4) Can Workers migrate from one TaskTracker to the other? Workers does not migrate. A Giraph computation is not dynamic wrt to assignment and size of the tasks. 5) What is the best way to monitor Giraph app execution (progress, worker assignment, load balancing etc.)? Just like you would for a standard Mapreduce job. Go to the job page on the jobtracker http page. I think this is all for the moment. Thank you. Testbed description: Hardware: 8 node dual-CPU cluster with IB FDR. Giraph: release-1.0.0-RC2-152-g585511f Hadoop: hadoop-0.20.203.0, hadoop-rdma-0.9.8 Best, Alex -- Claudio Martella -- Claudio Martella
Re: Basic questions about Giraph internals
On Thu, Feb 6, 2014 at 12:15 PM, Alexander Frolov alexndr.fro...@gmail.comwrote: On Thu, Feb 6, 2014 at 3:00 PM, Claudio Martella claudio.marte...@gmail.com wrote: On Thu, Feb 6, 2014 at 11:56 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi Claudio, thank you. If I understood correctly, mapper and mapper task is the same thing. More or less. A mapper is a functional element of the programming model, while the mapper task is the task that executes the mapper function on the records. Ok, I see. Then mapred.tasktracker.map.tasks.maximum is a maximum number of Workers [or Workers + Master] which will be created at the same node. That is if I have 8 node cluster with mapred.tasktracker.map.tasks.maximum=4, then I can run up to 31 Workers + 1 Master. Is it correct? That is correct. However, if you have total control over your cluster, you may want to run one worker per node (hence setting the max number of map tasks per machine to 1), and use multiple threads (input, compute, output). This is going to make better use of resources. On Thu, Feb 6, 2014 at 2:28 PM, Claudio Martella claudio.marte...@gmail.com wrote: Hi Alex, answers are inline. On Thu, Feb 6, 2014 at 11:22 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi, folks! I have started small research of Giraph framework and I have not much experience with Giraph and Hadoop :-(. I would like to ask several questions about how things are working in Giraph which are not straightforward for me. I am trying to use the sources but sometimes it is not too easy ;-) So here they are: 1) How Workers are assigned to TaskTrackers? Each worker is a mapper, and mapper tasks are assigned to tasktrackers by the jobtracker. That is each Worker is created at the beginning of superstep and then dies. In the next superstep all Workers are created again. Is it correct? Nope. The workers are created at the beginning of the computation, and destroyed at the end of the computation. A computation is persistent throughout the computation. There's no control by Giraph there, and because Giraph doesn't need data-locality like Mapreduce does, basically nothing is done. This is important for me. So Giraph Worker (a.k.a Hadoop mapper) fetches vertex with corresponding index from the HDFS and perform computation. What does it do next with it? As I understood Giraph is fully in-memory framework and in the next superstep this vertex should be fetched from the memory by the same Worker. Where the vertices are stored between supersteps? In HDFS or in memory? As I said, the workers are persistent (in-memory) between supersteps, so they keep everything in memory. Ok. Is there any means to see assignment of Workers to TaskTrackers during or after the computation? The jobtracker http interface will show you the mapper running, hence i'd check there And is there any means to see assignment of vertices to Workers (as distribution function, histogram etc.)? You can check the worker logs, I think the information should be there. 2) How vertices are assigned to Workers? Does it depend on distribution of input file on DataNodes? Is there available any choice of distribution politics or no? In the default scheme, vertices are assigned through modulo hash partitioning. Given k workers, vertex v is assigned to worker i according to hash(v) % k = i. 3) How Workers and Map tasks are related to each other? (1:1)? (n:1)? (1:n)? It's 1:1. Each worker is implemented by a mapper task. The master is usually (but does not need to) implemented by an additional mapper . 4) Can Workers migrate from one TaskTracker to the other? Workers does not migrate. A Giraph computation is not dynamic wrt to assignment and size of the tasks. 5) What is the best way to monitor Giraph app execution (progress, worker assignment, load balancing etc.)? Just like you would for a standard Mapreduce job. Go to the job page on the jobtracker http page. I think this is all for the moment. Thank you. Testbed description: Hardware: 8 node dual-CPU cluster with IB FDR. Giraph: release-1.0.0-RC2-152-g585511f Hadoop: hadoop-0.20.203.0, hadoop-rdma-0.9.8 Best, Alex -- Claudio Martella -- Claudio Martella -- Claudio Martella
Re: Basic questions about Giraph internals
On Thu, Feb 6, 2014 at 3:04 PM, Alexander Frolov alexndr.fro...@gmail.comwrote: Claudio, thank you very much for your help. On Thu, Feb 6, 2014 at 4:06 PM, Claudio Martella claudio.marte...@gmail.com wrote: On Thu, Feb 6, 2014 at 12:15 PM, Alexander Frolov alexndr.fro...@gmail.com wrote: On Thu, Feb 6, 2014 at 3:00 PM, Claudio Martella claudio.marte...@gmail.com wrote: On Thu, Feb 6, 2014 at 11:56 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi Claudio, thank you. If I understood correctly, mapper and mapper task is the same thing. More or less. A mapper is a functional element of the programming model, while the mapper task is the task that executes the mapper function on the records. Ok, I see. Then mapred.tasktracker.map.tasks.maximum is a maximum number of Workers [or Workers + Master] which will be created at the same node. That is if I have 8 node cluster with mapred.tasktracker.map.tasks.maximum=4, then I can run up to 31 Workers + 1 Master. Is it correct? That is correct. However, if you have total control over your cluster, you may want to run one worker per node (hence setting the max number of map tasks per machine to 1), and use multiple threads (input, compute, output). This is going to make better use of resources. Should I explicitly force Giraph to use multiple threads for input, compute, output? Only three threads, I suppose? But I have 12 cores available in each node (24 if HT is enabled). You're right, I was not clear. I suggest you use N threads for each of those three classes, where N is something close to the number of processing units (e.g. cores) you have available on each machine. Consider that Giraph has a number of other threads running in the background, for example to handle communication etc. I suggest you try different setups through benchmarking. On Thu, Feb 6, 2014 at 2:28 PM, Claudio Martella claudio.marte...@gmail.com wrote: Hi Alex, answers are inline. On Thu, Feb 6, 2014 at 11:22 AM, Alexander Frolov alexndr.fro...@gmail.com wrote: Hi, folks! I have started small research of Giraph framework and I have not much experience with Giraph and Hadoop :-(. I would like to ask several questions about how things are working in Giraph which are not straightforward for me. I am trying to use the sources but sometimes it is not too easy ;-) So here they are: 1) How Workers are assigned to TaskTrackers? Each worker is a mapper, and mapper tasks are assigned to tasktrackers by the jobtracker. That is each Worker is created at the beginning of superstep and then dies. In the next superstep all Workers are created again. Is it correct? Nope. The workers are created at the beginning of the computation, and destroyed at the end of the computation. A computation is persistent throughout the computation. There's no control by Giraph there, and because Giraph doesn't need data-locality like Mapreduce does, basically nothing is done. This is important for me. So Giraph Worker (a.k.a Hadoop mapper) fetches vertex with corresponding index from the HDFS and perform computation. What does it do next with it? As I understood Giraph is fully in-memory framework and in the next superstep this vertex should be fetched from the memory by the same Worker. Where the vertices are stored between supersteps? In HDFS or in memory? As I said, the workers are persistent (in-memory) between supersteps, so they keep everything in memory. Ok. Is there any means to see assignment of Workers to TaskTrackers during or after the computation? The jobtracker http interface will show you the mapper running, hence i'd check there And is there any means to see assignment of vertices to Workers (as distribution function, histogram etc.)? You can check the worker logs, I think the information should be there. 2) How vertices are assigned to Workers? Does it depend on distribution of input file on DataNodes? Is there available any choice of distribution politics or no? In the default scheme, vertices are assigned through modulo hash partitioning. Given k workers, vertex v is assigned to worker i according to hash(v) % k = i. 3) How Workers and Map tasks are related to each other? (1:1)? (n:1)? (1:n)? It's 1:1. Each worker is implemented by a mapper task. The master is usually (but does not need to) implemented by an additional mapper . 4) Can Workers migrate from one TaskTracker to the other? Workers does not migrate. A Giraph computation is not dynamic wrt to assignment and size of the tasks. 5) What is the best way to monitor Giraph app execution (progress, worker assignment, load balancing etc.)? Just like you would for a standard Mapreduce job. Go to the job page on the jobtracker http page. I think this is all for the moment. Thank you. Testbed description: Hardware: 8 node dual-CPU cluster with IB FDR.