Hi Denis,

Thanks for trying out the new OOC design. As you mentioned, with a message combiner we keep only one message per vertex at any time. That means the storage needed for messages is #ofVertices * (SizeOfVertexId + SizeOfOneMessage). We studied several applications that use a message combiner and noticed that their message type is rather simple (usually a double, a pair of doubles, or a similar type). The vertex ID is also usually simple (a long value in most cases). That means we keep only about 16-20 bytes per vertex for its message. We could offload messages when a message combiner is used (in fact, it would be very simple to do), but we noticed that we achieve much better performance if we do not. In other words, not offloading messages when a message combiner is used was intentional. Message flow control, however, is still in effect (and much needed) even with message combiners.
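
To make that estimate concrete, here is a minimal Java sketch of the formula. The class and method names are hypothetical, and the vertex count is an assumed figure (2 billion vertices split evenly across 2 workers); only the 10000 MB heap is taken from your config. It counts raw payload bytes only and ignores JVM object and map overhead, so the real footprint would be larger.

```java
// Hypothetical helper, not part of Giraph: estimates the raw payload held by
// a one-message-per-vertex store as #ofVertices * (SizeOfVertexId + SizeOfOneMessage).
final class MessageStoreEstimate {

    /** Raw payload bytes for one combined message per vertex. */
    static long estimateBytes(long numVertices, int vertexIdBytes, int messageBytes) {
        // multiplyExact throws instead of silently overflowing on huge inputs.
        return Math.multiplyExact(numVertices, (long) (vertexIdBytes + messageBytes));
    }

    public static void main(String[] args) {
        // Assumption: 2 billion vertices spread evenly over 2 workers.
        long verticesPerWorker = 1_000_000_000L;
        int vertexIdBytes = Long.BYTES;    // long vertex ID, 8 bytes
        int messageBytes = Double.BYTES;   // double message, 8 bytes

        long payloadBytes = estimateBytes(verticesPerWorker, vertexIdBytes, messageBytes);
        // Heap size from giraph.yarn.task.heap.mb=10000 in the quoted config.
        long heapBytes = 10_000L * 1024 * 1024;

        System.out.println("Message payload per worker (bytes): " + payloadBytes);
        System.out.println("Worker heap (bytes): " + heapBytes);
        System.out.println("Payload fits in heap: " + (payloadBytes < heapBytes));
    }
}
```

Under these assumed numbers the combined messages alone would need more than the whole heap, which is the case where one message per vertex no longer fits in RAM.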

I do not suggest disabling the message combiner, as that would reduce performance further. Rather, *I suggest you increase the number of partitions per machine*. If you still see the issue (meaning that the RAM is too small to hold even one message per vertex), you can create a JIRA, state the exact numbers for your case (e.g., size of message, size of vertex ID, size of RAM, so that there is a justification for the feature), and assign it to me, so that I can add an option to offload messages even when combiners are used. Also, I encourage you not to use the "isStaticGraph" option until it is completely fixed and tested.

Best,
Hassan

On Mon, Nov 14, 2016 at 9:44 AM, Denis Dudinski <[email protected]> wrote:

> Hello,
>
> We are using OutOfCore functionality to perform computations over a
> huge graph (billions of nodes).
>
> Recently we have faced a situation where all our workers got stuck doing
> nothing except performing System.gc() triggered from Giraph's
> ThresholdBasedOracle. The intriguing point was that no memory was
> freed at all at each gc. At the same time our memory consumption level
> was above highMemoryPressure and all commands that Oracle could give
> to IO scheduler were STORE_MESSAGES_AND_BUFFERS and STORE_PARTITION.
> However, there were NO partitions, messages or buffers available for
> offloading.
>
> We looked into the state of the MetaPartitionManager and discovered that,
> according to the state matrix within it, all unprocessed partitions are
> already spilled to disk as well as their messages. But there was no
> data for messages stored on disk. After a little more struggle we
> discovered that our RAM space was almost entirely consumed by incoming
> messages placed in the OneMessagePerVertexStore instance. Then we looked
> into DiskBackedMessageStore and found out that it just does not offload
> any incoming message data when we use a message combiner (please see
> org.apache.giraph.ooc.data.DiskBackedMessageStore#offloadPartitionData
> and org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand).
>
> This situation can be reproduced easily using a big enough graph and two
> workers with a small amount of RAM and OOC enabled (and configured
> properly). Even with a combiner, which leaves only one message per
> vertex, the number of partitions and vertices can be too big to hold
> incoming message data entirely in memory.
>
> Can we somehow work around this limitation and NOT disable the Combiner?
>
> Our test computation config looks like this:
>
> hadoop jar /opt/giraph-1.2.0/pr-job-jar-with-dependencies.jar \
> org.apache.giraph.GiraphRunner com.prototype.di.pr.PageRankComputation \
> -mc com.prototype.di.pr.PageRankMasterCompute \
> -yj pr-job-jar-with-dependencies.jar \
> -vif com.prototype.di.pr.input.HBLongVertexInputFormat \
> -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \
> -op /user/hadoop/output/pr_test \
> -w 2 \
> -c com.prototype.di.pr.PRDoubleCombiner \
> -wc com.prototype.di.pr.PageRankWorkerContext \
> -ca hbase.rootdir=hdfs://namenode1.testcluster.com:8020/hbase \
> -ca giraph.logLevel=info \
> -ca hbase.mapreduce.inputtable=di_test \
> -ca hbase.mapreduce.scan.columns=di:n \
> -ca hbase.defaults.for.version.skip=true \
> -ca hbase.table.row.textkey=false \
> -ca giraph.yarn.task.heap.mb=10000 \
> -ca giraph.isStaticGraph=true \
> -ca giraph.SplitMasterWorker=false \
> -ca giraph.oneToAllMsgSending=true \
> -ca giraph.metrics.enable=false \
> -ca giraph.jmap.histo.enable=false \
> -ca giraph.vertexIdClass=com.prototype.di.pr.DomainPartAwareLongWritable \
> -ca giraph.outgoingMessageValueClass=org.apache.hadoop.io.DoubleWritable \
> -ca giraph.addDebugOpt=true \
> -ca giraph.useOutOfCoreGraph=true \
> -ca giraph.waitForPerWorkerRequests=true \
> -ca giraph.maxNumberOfUnsentRequests=1000 \
> -ca giraph.vertexInputFilterClass=com.prototype.di.pr.input.PagesFromSameDomainLimiter \
> -ca giraph.pr.di.maxPagesFromSameDomain=-1 \
> -ca giraph.useInputSplitLocality=true \
> -ca hbase.mapreduce.scan.cachedrows=1000 \
> -ca giraph.minPartitionsPerComputeThread=150 \
> -ca giraph.graphPartitionerFactoryClass=com.prototype.di.pr.DomainAwareGraphPartitionerFactory \
> -ca giraph.numInputThreads=1 \
> -ca giraph.inputSplitSamplePercent=1 \
> -ca giraph.pr.maxNeighborsPerVertex=256 \
> -ca giraph.partitionClass=org.apache.giraph.partition.ByteArrayPartition \
> -ca giraph.vertexClass=org.apache.giraph.graph.ByteValueVertex \
> -ca giraph.inputOutEdgesClass=org.apache.giraph.edge.LongNullArrayEdges \
> -ca giraph.numComputeThreads=2 \
> -ca giraph.memory.failPressure=0.6 \
> -ca giraph.memory.emergencyPressure=0.575 \
> -ca giraph.memory.highPressure=0.55 \
> -ca giraph.memory.optimalPressure=0.525 \
> -ca giraph.memory.lowPressure=0.5
>
> Thank you in advance.
>
> Best Regards,
> Denis Dudinski
