Hello,

We are using the out-of-core (OOC) functionality to perform computations over a huge graph (billions of nodes).
Recently we faced a situation where all our workers were stuck doing nothing except performing System.gc() triggered by Giraph's ThresholdBasedOracle. The intriguing point was that no memory was freed at all by any of these GC runs. At the same time our memory consumption was above highMemoryPressure, and the only commands the oracle could issue to the IO scheduler were STORE_MESSAGES_AND_BUFFERS and STORE_PARTITION. However, there were NO partitions, messages or buffers available for offloading.

We looked into the state of the MetaPartitionManager and discovered that, according to its state matrix, all unprocessed partitions had already been spilled to disk, as had their messages. But no message data was actually stored on disk. After a bit more digging we found that our RAM was almost entirely consumed by incoming messages held in a OneMessagePerVertexStore instance. We then looked into DiskBackedMessageStore and found that it simply does not offload any incoming message data when a message combiner is used (please see org.apache.giraph.ooc.data.DiskBackedMessageStore#offloadPartitionData and org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand).

This situation can be reproduced easily with a big enough graph and two workers with a small amount of RAM and OOC enabled (and configured properly). Even with a combiner, which leaves only one message per vertex, the number of partitions and vertices can be too big to hold the incoming message data entirely in memory.

Can we somehow work around this limitation and NOT disable the combiner? (A sketch of our combiner and of the combiner-less fallback we would rather avoid follows after the config below.)

Our test computation config looks like this:

hadoop jar /opt/giraph-1.2.0/pr-job-jar-with-dependencies.jar org.apache.giraph.GiraphRunner com.prototype.di.pr.PageRankComputation \
  -mc com.prototype.di.pr.PageRankMasterCompute \
  -yj pr-job-jar-with-dependencies.jar \
  -vif com.prototype.di.pr.input.HBLongVertexInputFormat \
  -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \
  -op /user/hadoop/output/pr_test \
  -w 2 \
  -c com.prototype.di.pr.PRDoubleCombiner \
  -wc com.prototype.di.pr.PageRankWorkerContext \
  -ca hbase.rootdir=hdfs://namenode1.testcluster.com:8020/hbase \
  -ca giraph.logLevel=info \
  -ca hbase.mapreduce.inputtable=di_test \
  -ca hbase.mapreduce.scan.columns=di:n \
  -ca hbase.defaults.for.version.skip=true \
  -ca hbase.table.row.textkey=false \
  -ca giraph.yarn.task.heap.mb=10000 \
  -ca giraph.isStaticGraph=true \
  -ca giraph.SplitMasterWorker=false \
  -ca giraph.oneToAllMsgSending=true \
  -ca giraph.metrics.enable=false \
  -ca giraph.jmap.histo.enable=false \
  -ca giraph.vertexIdClass=com.prototype.di.pr.DomainPartAwareLongWritable \
  -ca giraph.outgoingMessageValueClass=org.apache.hadoop.io.DoubleWritable \
  -ca giraph.addDebugOpt=true \
  -ca giraph.useOutOfCoreGraph=true \
  -ca giraph.waitForPerWorkerRequests=true \
  -ca giraph.maxNumberOfUnsentRequests=1000 \
  -ca giraph.vertexInputFilterClass=com.prototype.di.pr.input.PagesFromSameDomainLimiter \
  -ca giraph.pr.di.maxPagesFromSameDomain=-1 \
  -ca giraph.useInputSplitLocality=true \
  -ca hbase.mapreduce.scan.cachedrows=1000 \
  -ca giraph.minPartitionsPerComputeThread=150 \
  -ca giraph.graphPartitionerFactoryClass=com.prototype.di.pr.DomainAwareGraphPartitionerFactory \
  -ca giraph.numInputThreads=1 \
  -ca giraph.inputSplitSamplePercent=1 \
  -ca giraph.pr.maxNeighborsPerVertex=256 \
  -ca giraph.partitionClass=org.apache.giraph.partition.ByteArrayPartition \
  -ca giraph.vertexClass=org.apache.giraph.graph.ByteValueVertex \
  -ca giraph.inputOutEdgesClass=org.apache.giraph.edge.LongNullArrayEdges \
  -ca giraph.numComputeThreads=2 \
  -ca giraph.memory.failPressure=0.6 \
  -ca giraph.memory.emergencyPressure=0.575 \
  -ca giraph.memory.highPressure=0.55 \
  -ca giraph.memory.optimalPressure=0.525 \
  -ca giraph.memory.lowPressure=0.5
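For context on the last five options: with giraph.yarn.task.heap.mb=10000 these fractions translate to roughly 5.0 GB (lowPressure), 5.25 GB (optimalPressure), 5.5 GB (highPressure), 5.75 GB (emergencyPressure) and 6.0 GB (failPressure) of used heap, so once the in-memory message store pushes usage past the ~5.5 GB mark the oracle keeps requesting offloads and GCs even though nothing can actually be offloaded.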
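For completeness, PRDoubleCombiner is essentially a plain sum combiner along the lines of this simplified sketch (the real class is typed on our DomainPartAwareLongWritable id class rather than LongWritable):

package com.prototype.di.pr;

import org.apache.giraph.combiner.MessageCombiner;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;

/**
 * Simplified stand-in for our combiner: it keeps a single summed
 * DoubleWritable per destination vertex.
 */
public class PRDoubleCombinerSketch
    implements MessageCombiner<LongWritable, DoubleWritable> {

  @Override
  public void combine(LongWritable vertexIndex,
      DoubleWritable originalMessage, DoubleWritable messageToCombine) {
    // Fold the incoming message into the one already stored for this vertex.
    originalMessage.set(originalMessage.get() + messageToCombine.get());
  }

  @Override
  public DoubleWritable createInitialMessage() {
    // Identity element for the summation.
    return new DoubleWritable(0d);
  }
}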
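The only fallback we see so far is to drop -c entirely and sum the messages ourselves inside compute(), roughly as in the sketch below (again simplified to plain LongWritable ids and hypothetical class names). That way the message store holds plain per-edge messages the OOC engine can offload, but we lose the network-side reduction the combiner gives us, which is exactly what we would like to avoid:

package com.prototype.di.pr;

import java.io.IOException;

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

/**
 * Sketch of a combiner-less PageRank step: incoming messages are summed
 * inside compute() instead of being combined on the sending side.
 */
public class CombinerlessPageRankSketch extends
    BasicComputation<LongWritable, DoubleWritable, NullWritable, DoubleWritable> {

  private static final int MAX_SUPERSTEPS = 30;
  private static final double DAMPING = 0.85d;

  @Override
  public void compute(
      Vertex<LongWritable, DoubleWritable, NullWritable> vertex,
      Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() > 0) {
      // This loop does what the message combiner used to do for us.
      double sum = 0d;
      for (DoubleWritable msg : messages) {
        sum += msg.get();
      }
      vertex.getValue().set(
          (1d - DAMPING) / getTotalNumVertices() + DAMPING * sum);
    }

    if (getSuperstep() < MAX_SUPERSTEPS && vertex.getNumEdges() > 0) {
      // Spread the current rank evenly over the out-edges.
      sendMessageToAllEdges(vertex,
          new DoubleWritable(vertex.getValue().get() / vertex.getNumEdges()));
    } else {
      vertex.voteToHalt();
    }
  }
}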
Thank you in advance.

Best Regards,
Denis Dudinski
