[ https://issues.apache.org/jira/browse/GIRAPH-127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eugene Koontz updated GIRAPH-127:
---------------------------------

    Fix Version/s: 0.2.0

Extending the API with a master.compute() function.
---------------------------------------------------

    Key: GIRAPH-127
    URL: https://issues.apache.org/jira/browse/GIRAPH-127
    Project: Giraph
    Issue Type: New Feature
    Components: bsp, examples, graph
    Reporter: Semih Salihoglu
    Assignee: Semih Salihoglu
    Fix For: 0.2.0

    Attachments: GIRAPH-127-v1.patch

First of all, sorry for the long explanation of this feature. I want to extend the Giraph API with a new function called master.compute(), which would be called on the master before each superstep. I will try to explain the purpose it would serve with an example.

Let's say we want to implement the following simplified version of the k-means clustering algorithm. Pseudocode below:

    Input: G(V, E), k, numEdgesThreshold, maxIterations
    Algorithm:
    int numEdgesCrossingClusters = Integer.MAX_VALUE;
    int iterationNo = 0;
    while (numEdgesCrossingClusters > numEdgesThreshold && iterationNo < maxIterations) {
        iterationNo++;
        int[] clusterCenters = pickKClusterCenters(k, G);
        findClusterCenters(G, clusterCenters);
        numEdgesCrossingClusters = countNumEdgesCrossingClusters();
    }

The algorithm repeats the following steps:

1) Pick k random initial cluster centers.
2) Assign each vertex to the cluster center that it's closest to (in Giraph, this can be implemented with message passing, similar to how ShortestPaths is implemented).
3) Count the number of edges crossing clusters.
4) Go back to step 1 if there are many edges crossing clusters and we haven't exceeded the maximum number of iterations yet.

In an algorithm like this, steps 2 and 3 are where most of the work happens, and both have very neat message-passing implementations. I'll try to give an overview without going into the details.
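As a single-machine reference point for the pseudocode above, here is a minimal plain-Java sketch of the same loop. It assumes an unweighted, undirected graph stored as adjacency lists (each edge listed at both endpoints) and uses hop count as the distance. The class name is hypothetical, and the helper bodies are assumptions made for illustration: a partial Fisher-Yates shuffle picks the centers, and a multi-source BFS stands in for the assignment step.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Random;

// Hypothetical sequential sketch of the simplified k-means loop (not Giraph code).
final class SequentialKMeans {

  // Step 1: pick k distinct random vertex ids with a partial Fisher-Yates shuffle.
  static int[] pickKClusterCenters(int k, int numVertices, Random rnd) {
    int[] ids = new int[numVertices];
    for (int i = 0; i < numVertices; i++) ids[i] = i;
    for (int i = 0; i < k; i++) {
      int j = i + rnd.nextInt(numVertices - i);
      int tmp = ids[i]; ids[i] = ids[j]; ids[j] = tmp;
    }
    return Arrays.copyOf(ids, k);
  }

  // Step 2: assign every vertex to its nearest center (in hops) with a
  // multi-source BFS; ties go to whichever center's frontier arrives first.
  // Vertices unreachable from any center keep cluster -1.
  static int[] assignToClusters(List<int[]> adj, int[] centers) {
    int[] cluster = new int[adj.size()];
    Arrays.fill(cluster, -1);
    Deque<Integer> queue = new ArrayDeque<>();
    for (int c : centers) { cluster[c] = c; queue.add(c); }
    while (!queue.isEmpty()) {
      int v = queue.poll();
      for (int u : adj.get(v)) {
        if (cluster[u] == -1) { cluster[u] = cluster[v]; queue.add(u); }
      }
    }
    return cluster;
  }

  // Step 3: count edges whose endpoints lie in different clusters. Each
  // undirected edge appears in two adjacency lists, so the total is halved.
  static int countNumEdgesCrossingClusters(List<int[]> adj, int[] cluster) {
    int count = 0;
    for (int v = 0; v < adj.size(); v++) {
      for (int u : adj.get(v)) if (cluster[u] != cluster[v]) count++;
    }
    return count / 2;
  }

  // Steps 1-4: the driver loop from the pseudocode. Returns the last
  // assignment, or null if maxIterations <= 0.
  static int[] run(List<int[]> adj, int k, int numEdgesThreshold, int maxIterations, long seed) {
    Random rnd = new Random(seed);
    int numEdgesCrossingClusters = Integer.MAX_VALUE;
    int iterationNo = 0;
    int[] cluster = null;
    while (numEdgesCrossingClusters > numEdgesThreshold && iterationNo < maxIterations) {
      iterationNo++;
      int[] centers = pickKClusterCenters(k, adj.size(), rnd);
      cluster = assignToClusters(adj, centers);
      numEdgesCrossingClusters = countNumEdgesCrossingClusters(adj, cluster);
    }
    return cluster;
  }
}
```

For example, on the path 0-1-2-3 with centers 0 and 3, assignToClusters returns [0, 0, 3, 3] and exactly one edge (1-2) crosses clusters. In Giraph, steps 2 and 3 would instead be spread over supersteps, which is what the rest of this description is about.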
Let's say we define a Vertex in Giraph that holds a custom Writable value consisting of 2 integers and sends messages with up to 2 integer values.

Step 2 is very similar to the ShortestPaths algorithm and has two stages. In the first stage, each vertex checks whether it is one of the cluster centers. If so, it assigns itself the value (id, 0); otherwise it assigns itself (null, null). In the second stage, the vertices assign themselves to the closest cluster center by looking at their neighbors' (cluster center, distance) values (received as messages carrying 2 integers) and their own current values, and changing their values if they find a closer cluster center. This repeats over as many supersteps as it takes for every vertex to converge.

Step 3, counting the number of edges crossing clusters, is also very easy to implement in Giraph. Once each vertex has a cluster center, the number of edges crossing clusters can be counted by an aggregator, let's say called "num-edges-crossing". It again has two stages. In the first stage, every vertex sends its cluster id to all its neighbors. In the second stage, every vertex looks at its neighbors' cluster ids in the messages, and for each cluster id that is not equal to its own, it increments "num-edges-crossing" by 1.

The other 2 steps, steps 1 and 4, are very simple sequential computations. Step 1 just picks k random vertex ids and puts them into an aggregator. Step 4 just compares "num-edges-crossing" against a threshold and also checks whether the algorithm has exceeded maxIterations (iterations of going through steps 1-4, not supersteps). With the current API, it's not clear where to do these computations.
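To illustrate the two vertex-centric stages just described, here is a toy simulation of synchronous supersteps in plain Java. It is not Giraph's Vertex API: the class and method names are hypothetical, per-vertex inboxes stand in for message delivery, a plain long stands in for the "num-edges-crossing" sum aggregator, and NONE (-1) stands in for the (null, null) value. The graph is assumed to be undirected and stored as adjacency lists with each edge at both endpoints.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical BSP-style simulation of steps 2 and 3 (not Giraph's API).
final class VertexCentricKMeans {
  static final int NONE = -1; // stands in for the (null, null) value

  // Step 2: returns {clusterId[], distance[]} once a superstep sends no messages.
  static int[][] assignClusters(List<int[]> adj, boolean[] isCenter) {
    int n = adj.size();
    int[] clusterId = new int[n];
    int[] distance = new int[n];
    // Each message is a (clusterId, distance) pair delivered in the next superstep.
    List<List<int[]>> inbox = newInboxes(n);
    // Stage 1 (superstep 0): centers take (id, 0), all other vertices (null, null).
    for (int v = 0; v < n; v++) {
      clusterId[v] = isCenter[v] ? v : NONE;
      distance[v] = isCenter[v] ? 0 : Integer.MAX_VALUE;
      if (isCenter[v]) for (int u : adj.get(v)) inbox.get(u).add(new int[] {v, 1});
    }
    // Stage 2: each vertex keeps the closest center it has heard of and tells
    // its neighbors only when its value improves.
    boolean messagesSent = true;
    while (messagesSent) {
      messagesSent = false;
      List<List<int[]>> next = newInboxes(n);
      for (int v = 0; v < n; v++) {
        int bestCluster = clusterId[v];
        int bestDist = distance[v];
        for (int[] m : inbox.get(v)) {
          if (m[1] < bestDist) { bestCluster = m[0]; bestDist = m[1]; }
        }
        if (bestDist < distance[v]) {
          clusterId[v] = bestCluster;
          distance[v] = bestDist;
          for (int u : adj.get(v)) next.get(u).add(new int[] {bestCluster, bestDist + 1});
          messagesSent = true;
        }
      }
      inbox = next;
    }
    return new int[][] {clusterId, distance};
  }

  // Step 3: one superstep sends cluster ids, the next compares them and adds 1
  // to the aggregator per mismatching id. Because each undirected edge is seen
  // from both endpoints, every crossing edge contributes 2.
  static long numEdgesCrossing(List<int[]> adj, int[] clusterId) {
    int n = adj.size();
    List<List<int[]>> inbox = newInboxes(n);
    for (int v = 0; v < n; v++) {
      for (int u : adj.get(v)) inbox.get(u).add(new int[] {clusterId[v]});
    }
    long aggregator = 0; // stand-in for a sum aggregator
    for (int v = 0; v < n; v++) {
      for (int[] m : inbox.get(v)) if (m[0] != clusterId[v]) aggregator++;
    }
    return aggregator;
  }

  private static List<List<int[]>> newInboxes(int n) {
    List<List<int[]>> boxes = new ArrayList<>(n);
    for (int i = 0; i < n; i++) boxes.add(new ArrayList<>());
    return boxes;
  }
}
```

On the path 0-1-2-3 with centers 0 and 3, stage 2 ends with cluster ids [0, 0, 3, 3] and distances [0, 1, 1, 0], and the aggregator ends at 2 for the single crossing edge 1-2; halve it if each undirected edge should count once.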
There is a per-worker function, preSuperstep(), that can be implemented, but if we decide to pick a special worker, let's say worker 1, to pick the k vertices, then we'd waste an entire superstep in which only worker 1 does any work (picking k vertices in preSuperstep() and putting them into an aggregator) while all other workers sit idle. Trying to do this on worker 1 in postSuperstep() would not work either, because worker 1 needs to know that all the vertices have converged to decide that it's time to pick k vertices or to do the check in step 4, and that information would only be available to it at the beginning of the next superstep.

A master.compute() extension would run on the master before each superstep and could modify the aggregator holding the k vertices before the aggregators are broadcast to the workers. Since these are all very short sequential computations, this would not waste resources the way a preSuperstep() or postSuperstep() approach would. It would also make it possible to run new algorithms like k-means that are composed of vertex-centric computations glued together by small sequential ones. In short, it would add sequential computation to Giraph in a non-wasteful way.

I am a PhD student at Stanford, and I have been working on my own BSP/Pregel implementation, called GPS, since last year. I haven't distributed it, mainly because in September I learned about Giraph and decided to slow down on working on it :). We have basically been using GPS as our own research platform. The source code for GPS is here if anyone is interested (https://subversion.assembla.com/svn/phd-projects/gps/trunk/). We have the master.compute() feature in GPS, and here's an example of a k-means implementation in GPS with master.compute(): (https://subversion.assembla.com/svn/phd-projects/gps/trunk/src/java/gps/examples/kmeans/). (Aggregators are called GlobalObjects in GPS.)
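To make the control flow of the proposal concrete, here is a minimal single-process sketch of what a k-means master.compute() could look like. Everything in it is hypothetical: the class name, the Phase enum telling workers which stage to run next, the previousStageFinished flag (standing in for the master learning that all vertices have converged), and the Map standing in for aggregators that would be broadcast to workers. Steps 1 and 4 run here, sequentially, between supersteps; steps 2 and 3 would run in vertex.compute() on the workers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical k-means master: runs before every superstep (not Giraph's API).
final class KMeansMaster {
  // Which vertex-centric stage the workers should run in the coming superstep.
  enum Phase { ASSIGN, COUNT, DONE }

  // Stand-in for the aggregators that would be broadcast to all workers.
  final Map<String, Object> aggregators = new HashMap<>();

  private final int k;
  private final int numVertices;
  private final long numEdgesThreshold;
  private final int maxIterations;
  private final Random rnd;
  private int iterationNo = 0;
  private Phase phase;

  KMeansMaster(int k, int numVertices, long numEdgesThreshold, int maxIterations, long seed) {
    if (k > numVertices) throw new IllegalArgumentException("k > numVertices");
    this.k = k;
    this.numVertices = numVertices;
    this.numEdgesThreshold = numEdgesThreshold;
    this.maxIterations = maxIterations;
    this.rnd = new Random(seed);
    startIteration();
  }

  int iterationNo() { return iterationNo; }

  // Step 1: pick k distinct random vertex ids and publish them via an aggregator.
  private void startIteration() {
    iterationNo++;
    Set<Integer> picked = new LinkedHashSet<>();
    while (picked.size() < k) picked.add(rnd.nextInt(numVertices));
    aggregators.put("cluster-centers", new ArrayList<>(picked));
    aggregators.put("num-edges-crossing", 0L); // reset the sum aggregator
    phase = Phase.ASSIGN;
  }

  // Called before each superstep with information from the previous one.
  Phase compute(boolean previousStageFinished) {
    if (phase == Phase.DONE || !previousStageFinished) return phase; // keep going
    if (phase == Phase.ASSIGN) {
      phase = Phase.COUNT; // step 2 converged: start counting crossing edges
      return phase;
    }
    // The COUNT stage has finished, so do step 4: iterate again or stop.
    long crossing = (Long) aggregators.get("num-edges-crossing");
    if (crossing > numEdgesThreshold && iterationNo < maxIterations) {
      startIteration();
    } else {
      phase = Phase.DONE;
    }
    return phase;
  }
}
```

The point of keeping this state on the master is that no worker has to spend a whole superstep alone on steps 1 and 4, and the master already knows, before the next superstep starts, whether the previous stage has converged.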
There is another example (https://subversion.assembla.com/svn/phd-projects/gps/trunk/src/java/gps/examples/randomgraphcoarsening/), which I'll skip explaining because it's very detailed and would make points similar to the ones I am trying to make with k-means. In general, master.compute() would make it possible to glue together any graph algorithm that is composed of multiple stages, each with different message types and computations that are well suited to vertex.compute(). There are many examples of such algorithms: recursive partitioning, triangle counting, and even much simpler things like finding shortest paths to 100 vertices in pieces (first to 5 vertices, then to another 5, then to another 5, etc.). Doing it in pieces helps because finding shortest paths to all 100 vertices at once requires very large messages (each message would need to store 100 integers).

If the Giraph team approves, I would like to implement this feature in Giraph with an approach similar to the one I used in GPS. Overall:

1) Add a Master.java to org.apache.giraph.graph as the default master, with a compute function that by default aggregates all aggregators and checks whether the computation has ended (by comparing numVertices with numFinishedVertices). As far as I can see, this would be a refactoring of the org.apache.giraph.graph.BspServiceMaster class.
2) Extend GiraphJob with a setMaster() method to set a master class (by default, the default master above).
3) The rest would be sending the custom master class to (probably) all workers, with only the master instantiating it via reflection. I need to learn more about how to do this; I am not yet familiar with that part of the Giraph code base.
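The plan above could look roughly like the following minimal sketch. All names are hypothetical: Master mirrors the proposed default master, MaxSuperstepsMaster is an invented user-supplied example, and JobConfSketch stands in for GiraphJob. The master class travels as a class name (the way a job configuration property would reach every task), and only the master process would turn it into an instance via reflection. The "aggregate all aggregators" part of the default compute is omitted; only the numVertices/numFinishedVertices check is shown.

```java
// Hypothetical default master, as proposed for org.apache.giraph.graph.Master.
class Master {
  // Returns true when the computation has ended: by default, once every
  // vertex has finished. Subclasses can add their own stopping logic.
  boolean compute(long superstep, long numVertices, long numFinishedVertices) {
    return numFinishedVertices == numVertices;
  }
}

// Invented example of a user-supplied master: also stops after 3 supersteps.
class MaxSuperstepsMaster extends Master {
  @Override
  boolean compute(long superstep, long numVertices, long numFinishedVertices) {
    return superstep >= 3 || super.compute(superstep, numVertices, numFinishedVertices);
  }
}

// Stand-in for GiraphJob with the proposed setMaster() method.
final class JobConfSketch {
  // Stored as a string so it can be shipped to every worker with the job.
  private String masterClassName = Master.class.getName();

  void setMaster(Class<? extends Master> masterClass) {
    masterClassName = masterClass.getName();
  }

  // Only the master would call this; workers just carry the class name.
  Master instantiateMaster() {
    try {
      return Class.forName(masterClassName)
          .asSubclass(Master.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + masterClassName, e);
    }
  }
}
```

Passing a class name rather than an object keeps workers from needing any master state; the custom class only needs a no-argument constructor for the reflective instantiation to succeed.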