[ https://issues.apache.org/jira/browse/GIRAPH-1125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762637#comment-15762637 ]
ASF GitHub Bot commented on GIRAPH-1125: ---------------------------------------- Github user heslami commented on a diff in the pull request: https://github.com/apache/giraph/pull/12#discussion_r93143346 --- Diff: giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java --- @@ -0,0 +1,851 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.ooc.policy; + +import com.sun.management.GarbageCollectionNotificationInfo; +import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression; +import org.apache.giraph.comm.NetworkMetrics; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.LongConfOption; +import org.apache.giraph.edge.AbstractEdgeStore; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.command.IOCommand; +import org.apache.giraph.ooc.command.LoadPartitionIOCommand; +import org.apache.giraph.ooc.command.WaitIOCommand; +import org.apache.giraph.worker.EdgeInputSplitsCallable; +import org.apache.giraph.worker.VertexInputSplitsCallable; +import org.apache.giraph.worker.WorkerProgress; +import org.apache.log4j.Logger; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryUsage; +import java.util.List; +import java.util.Map; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Implementation of {@link OutOfCoreOracle} that uses a linear regression model + * to estimate actual memory usage based on the current state of computation. + * The model takes into consideration 5 parameters: + * + * y = c0 + c1*x1 + c2*x2 + c3*x3 + c4*x4 + c5*x5 + * + * y: memory usage + * x1: edges loaded + * x2: vertices loaded + * x3: vertices processed + * x4: bytes received due to messages + * x5: bytes loaded/stored from/to disk due to OOC. 
+ * + */ +public class MemoryEstimatorOracle implements OutOfCoreOracle { + /** Memory check interval in msec */ + public static final LongConfOption CHECK_MEMORY_INTERVAL = + new LongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000, + "The interval where memory checker thread wakes up and " + + "monitors memory footprint (in milliseconds)"); + /** + * If mem-usage is above this threshold and no Full GC has been called, + * we call it manually + */ + public static final FloatConfOption MANUAL_GC_MEMORY_PRESSURE = + new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f, + "The threshold above which GC is called manually if Full GC has not " + + "happened in a while"); + /** Used to detect a high memory pressure situation */ + public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION = + new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f, + "Minimum percentage of memory we expect to be reclaimed after a Full " + + "GC. If less than this amount is reclaimed, it is sage to say " + + "we are in a high memory situation and the estimation mechanism " + + "has not recognized it yet!"); + /** If mem-usage is above this threshold, active threads are set to 0 */ + public static final FloatConfOption AM_HIGH_THRESHOLD = + new FloatConfOption("giraph.amHighThreshold", 0.95f, + "If mem-usage is above this threshold, all active threads " + + "(compute/input) are paused."); + /** If mem-usage is below this threshold, active threads are set to max */ + public static final FloatConfOption AM_LOW_THRESHOLD = + new FloatConfOption("giraph.amLowThreshold", 0.90f, + "If mem-usage is below this threshold, all active threads " + + "(compute/input) are running."); + /** If mem-usage is above this threshold, credit is set to 0 */ + public static final FloatConfOption CREDIT_HIGH_THRESHOLD = + new FloatConfOption("giraph.creditHighThreshold", 0.95f, + "If mem-usage is above this threshold, credit is set to 0"); + /** If mem-usage is 
below this threshold, credit is set to max */ + public static final FloatConfOption CREDIT_LOW_THRESHOLD = + new FloatConfOption("giraph.creditLowThreshold", 0.90f, + "If mem-usage is below this threshold, credit is set to max"); + /** OOC starts if mem-usage is above this threshold */ + public static final FloatConfOption OOC_THRESHOLD = + new FloatConfOption("giraph.oocThreshold", 0.90f, + "If mem-usage is above this threshold, out of core threads starts " + + "writing data to disk"); + + /** Logger */ + private static final Logger LOG = + Logger.getLogger(MemoryEstimatorOracle.class); + + /** Cached value for {@link #MANUAL_GC_MEMORY_PRESSURE} */ + private final float manualGCMemoryPressure; + /** Cached value for {@link #GC_MINIMUM_RECLAIM_FRACTION} */ + private final float gcReclaimFraction; + /** Cached value for {@link #AM_HIGH_THRESHOLD} */ + private final float amHighThreshold; + /** Cached value for {@link #AM_LOW_THRESHOLD} */ + private final float amLowThreshold; + /** Cached value for {@link #CREDIT_HIGH_THRESHOLD} */ + private final float creditHighThreshold; + /** Cached value for {@link #CREDIT_LOW_THRESHOLD} */ + private final float creditLowThreshold; + /** Cached value for {@link #OOC_THRESHOLD} */ + private final float oocThreshold; + + /** Reference to running OOC engine */ + private final OutOfCoreEngine oocEngine; + /** Memory estimator instance */ + private final MemoryEstimator memoryEstimator; + /** Keeps track of the number of bytes stored/loaded by OOC */ + private final AtomicLong oocBytesInjected = new AtomicLong(0); + /** How many bytes to offload */ + private final AtomicLong numBytesToOffload = new AtomicLong(0); + /** Current state of the OOC */ + private volatile State state = State.STABLE; + /** Timestamp of the last major GC */ + private volatile long lastMajorGCTime = 0; + + /** + * Different states the OOC can be in. 
+ */ + private enum State { + /** No offloading */ + STABLE, + /** Current offloading */ + OFFLOADING, + } + + /** + * Constructor. + * @param conf Configuration + * @param oocEngine OOC engine.:w + * + */ + public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf, + final OutOfCoreEngine oocEngine) { + this.oocEngine = oocEngine; + this.memoryEstimator = new MemoryEstimator(this.oocBytesInjected, + oocEngine.getNetworkMetrics()); + + this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf); + this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf); + this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf); + this.amLowThreshold = AM_LOW_THRESHOLD.get(conf); + this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf); + this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf); + this.oocThreshold = OOC_THRESHOLD.get(conf); + + final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + long oldGenUsageEstimate = memoryEstimator.getUsageEstimate(); + MemoryUsage usage = getOldGenUsed(); + if (oldGenUsageEstimate > 0) { + updateRates(oldGenUsageEstimate, usage.getMax()); + } else { + long time = System.currentTimeMillis(); + if (time - lastMajorGCTime >= 10000) { + double used = (double) usage.getUsed() / usage.getMax(); + if (used > manualGCMemoryPressure) { + if (LOG.isInfoEnabled()) { + LOG.info( + "High memory pressure with no full GC from the JVM. " + + "Calling GC manually. Used fraction of old-gen is " + + String.format("%.2f", used) + "."); + } + System.gc(); + time = System.currentTimeMillis() - time; + usage = getOldGenUsed(); + used = (double) usage.getUsed() / usage.getMax(); + if (LOG.isInfoEnabled()) { + LOG.info("Manual GC done. It took " + + String.format("%.2f", time / 1000.0) + + " seconds. 
Used fraction of old-gen is " + + String.format("%.2f", used) + "."); + } + } + } + } + try { + Thread.sleep(checkMemoryInterval); + } catch (InterruptedException e) { + LOG.warn("run: exception occurred!", e); + return; + } + } + } + }); + thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker() + .getGraphTaskManager().createUncaughtExceptionHandler()); + thread.setName("ooc-memory-checker"); + thread.setDaemon(true); + thread.start(); + } + + /** + * Resets all the counters used in the memory estimation. This is called at + * the beginning of a new superstep. + * <p> + * The number of vertices to compute in the next superstep gets reset in + * {@link org.apache.giraph.graph.GraphTaskManager#processGraphPartitions} + * right before + * {@link org.apache.giraph.partition.PartitionStore#startIteration()} gets + * called. + */ + @Override + public void startIteration() { + oocBytesInjected.set(0); + memoryEstimator.clear(); + memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep()); + oocEngine.updateRequestsCreditFraction(1); + oocEngine.updateActiveThreadsFraction(1); + } + + + @Override + public IOAction[] getNextIOActions() { + if (state == State.OFFLOADING) { + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION}; + } + long oldGenUsage = memoryEstimator.getUsageEstimate(); + MemoryUsage usage = getOldGenUsed(); + if (oldGenUsage > 0) { + double usageEstimate = (double) oldGenUsage / usage.getMax(); + if (usageEstimate > oocThreshold) { + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PARTITION}; + } else { + return new IOAction[]{IOAction.LOAD_PARTITION}; + } + } else { + return new IOAction[]{IOAction.LOAD_PARTITION}; + } + } + + @Override + public boolean approve(IOCommand command) { + return true; + } + + @Override + public void commandCompleted(IOCommand command) { + if (command instanceof LoadPartitionIOCommand) { + oocBytesInjected.getAndAdd(command.bytesTransferred()); + if 
(state == State.OFFLOADING) { + numBytesToOffload.getAndAdd(command.bytesTransferred()); + } + } else if (!(command instanceof WaitIOCommand)) { + oocBytesInjected.getAndAdd(0 - command.bytesTransferred()); + if (state == State.OFFLOADING) { + numBytesToOffload.getAndAdd(0 - command.bytesTransferred()); + } + } + + if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) { + numBytesToOffload.set(0); + state = State.STABLE; + updateRates(-1, 1); + } + } + + /** + * When a new GC has completed, we can get an accurate measurement of the + * memory usage. We use this to update the linear regression model. + * + * @param gcInfo GC information + */ + @Override + public synchronized void gcCompleted( + GarbageCollectionNotificationInfo gcInfo) { + String action = gcInfo.getGcAction().toLowerCase(); + String cause = gcInfo.getGcCause().toLowerCase(); + if (action.contains("major") && + (cause.contains("ergo") || cause.contains("system"))) { + lastMajorGCTime = System.currentTimeMillis(); + MemoryUsage before = null; + MemoryUsage after = null; + + for (Map.Entry<String, MemoryUsage> entry : + gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) { + String poolName = entry.getKey(); + if (poolName.toLowerCase().contains("old")) { + before = entry.getValue(); + after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName); + break; + } + } + if (after == null) { + throw new IllegalStateException("Missing Memory Usage After GC info"); + } + if (before == null) { + throw new IllegalStateException("Missing Memory Usage Before GC info"); + } + + // Compare the estimation with the actual value + long usedMemoryEstimate = memoryEstimator.getUsageEstimate(); + long usedMemoryReal = after.getUsed(); + if (usedMemoryEstimate >= 0) { + if (LOG.isInfoEnabled()) { + LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " real=" + + usedMemoryReal + " error=" + + ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) / + usedMemoryReal * 100)); + } + } + + // Number of 
edges loaded so far (if in input superstep) + long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 : + EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count(); + // Number of vertices loaded so far (if in input superstep) + long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 : + VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count(); + // Number of vertices computed (if either in compute or store phase) + long verticesComputed = WorkerProgress.get().getVerticesComputed() + + WorkerProgress.get().getVerticesStored() + + AbstractEdgeStore.PROGRESS_COUNTER.getProgress(); + // Number of bytes received + long receivedBytes = + oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep(); + // Number of OOC bytes + long oocBytes = oocBytesInjected.get(); + + memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded, + verticesLoaded, verticesComputed, receivedBytes, oocBytes); + + long garbage = before.getUsed() - after.getUsed(); + long maxMem = after.getMax(); + long memUsed = after.getUsed(); + boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem && + garbage < gcReclaimFraction * maxMem; + boolean predictionExist = memoryEstimator.getUsageEstimate() > 0; + if (isTight && !predictionExist) { + if (LOG.isInfoEnabled()) { + LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" + + memUsed + " maxMem=" + maxMem); + } + numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) - + (maxMem - memUsed)); + if (LOG.isInfoEnabled()) { + LOG.info("gcCompleted: tight memory usage. Starting to offload " + + "until " + numBytesToOffload.get() + " bytes are offloaded"); + } + state = State.OFFLOADING; + updateRates(1, 1); + } + } + } + + /** + * Given an estimate for the current memory usage and the maximum available + * memory, it updates the active threads and flow control credit in the + * OOC engine. + * + * @param usageEstimateMem Estimate of memory usage. + * @param maxMemory Maximum memory. 
+ */ + private void updateRates(long usageEstimateMem, long maxMemory) { + double usageEstimate = (double) usageEstimateMem / maxMemory; + if (usageEstimate > 0) { + if (usageEstimate >= amHighThreshold) { + oocEngine.updateActiveThreadsFraction(0); + } else if (usageEstimate < amLowThreshold) { + oocEngine.updateActiveThreadsFraction(1); + } else { + oocEngine.updateActiveThreadsFraction(1 - + (usageEstimate - amLowThreshold) / + (amHighThreshold - amLowThreshold)); + } + + if (usageEstimate >= creditHighThreshold) { + oocEngine.updateRequestsCreditFraction(0); + } else if (usageEstimate < creditLowThreshold) { + oocEngine.updateRequestsCreditFraction(1); + } else { + oocEngine.updateRequestsCreditFraction(1 - + (usageEstimate - creditLowThreshold) / + (creditHighThreshold - creditLowThreshold)); + } + } else { + oocEngine.updateActiveThreadsFraction(1); + oocEngine.updateRequestsCreditFraction(1); + } + } + + /** + * Returns statistics about the old gen pool. + * @return {@link MemoryUsage}. + */ + private MemoryUsage getOldGenUsed() { + List<MemoryPoolMXBean> memoryPoolList = + ManagementFactory.getMemoryPoolMXBeans(); + for (MemoryPoolMXBean pool : memoryPoolList) { + String normalName = pool.getName().toLowerCase(); + if (normalName.contains("old") || normalName.contains("tenured")) { + return pool.getUsage(); + } + } + throw new IllegalStateException("Bad Memory Pool"); + } + + /** + * Maintains statistics about the current state and progress of the + * computation and produces estimates of memory usage using a technique + * based on linear regression. + * + * Upon a GC events, it gets updated with the most recent statistics through + * the {@link #addRecord} method. 
+ */ + private static class MemoryEstimator { + /** Stores the (x1,x2,...,x5) arrays of data samples, one for each sample */ + private Vector<double[]> dataSamples = new Vector<>(); + /** Stores the y memory usage dataSamples, one for each sample */ + private Vector<Double> memorySamples = new Vector<>(); --- End diff -- `DoubleArrayList` from fastutil is useful when we have a lot of entries in the list. The number of entries in our case is very small (the number of full GCs in one superstep, which is in the range of 1-100, getting closer to 1000 in the very extreme cases). So, the choice of either `Vector` or `ArrayList` is more natural. I agree that we are not using the synchronized guarantees of `Vector`, hence using `ArrayList` is an even more natural choice. I'll change these to `ArrayList`. Conversion from `ArrayList` (or even `Vector`) to a primitive array can be done through the `toArray` method in either of the collections. I'll fix that too. > Add memory estimation mechanism to out-of-core > ---------------------------------------------- > > Key: GIRAPH-1125 > URL: https://issues.apache.org/jira/browse/GIRAPH-1125 > Project: Giraph > Issue Type: Improvement > Reporter: Hassan Eslami > Assignee: Hassan Eslami > > The new out-of-core mechanism is designed with the adaptivity goal in mind, > meaning that we wanted the out-of-core mechanism to kick in only when it is > necessary. In other words, when the amount of data (graph, messages, and > mutations) all fit in memory, we want to take advantage of the entire memory. > And, when in a stage the memory is short, only enough (minimal) amount of > data goes out of core (to disk). This ensures a good performance for the > out-of-core mechanism. > To satisfy the adaptiveness goal, we need to know how much memory is used at > each point of time. The default out-of-core mechanism (ThresholdBasedOracle) > gets memory information based on JVM's internal methods (Runtime's > freeMemory()). 
This method is inaccurate (and pessimistic), meaning that it > does not account for garbage data that has not been purged by GC. Using JVM's > default methods, OOC behaves pessimistically and moves data out of core even > if it is not necessary. For instance, consider the case where there is a lot > of garbage on the heap, but GC has not happened for a while. In this case, > the default OOC pushes data to disk and immediately after a major GC it > brings back the data to memory. This causes inefficiency in the default out > of core mechanism. If out-of-core is used but the data can entirely fit in > memory, the job goes out of core even though going out of core is not > necessary. > To address this issue, we need to have a mechanism to more accurately know > how much of the heap is filled with non-garbage data. Consequently, we need to > change the Oracle (OOC policy) to take advantage of a more accurate memory > usage estimation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)