[
https://issues.apache.org/jira/browse/GIRAPH-1125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15755842#comment-15755842
]
ASF GitHub Bot commented on GIRAPH-1125:
----------------------------------------
Github user edunov commented on a diff in the pull request:
https://github.com/apache/giraph/pull/12#discussion_r92905726
--- Diff:
giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
---
@@ -0,0 +1,851 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.ooc.policy;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
+import org.apache.giraph.comm.NetworkMetrics;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.edge.AbstractEdgeStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.WaitIOCommand;
+import org.apache.giraph.worker.EdgeInputSplitsCallable;
+import org.apache.giraph.worker.VertexInputSplitsCallable;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.log4j.Logger;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of {@link OutOfCoreOracle} that uses a linear regression
+ * model to estimate actual memory usage based on the current state of
+ * computation.
+ * The model takes into consideration 5 parameters:
+ *
+ * y = c0 + c1*x1 + c2*x2 + c3*x3 + c4*x4 + c5*x5
+ *
+ * y: memory usage
+ * x1: edges loaded
+ * x2: vertices loaded
+ * x3: vertices processed
+ * x4: bytes received due to messages
+ * x5: bytes loaded/stored from/to disk due to OOC.
+ *
+ */
+public class MemoryEstimatorOracle implements OutOfCoreOracle {
+  /** Memory check interval in msec */
+  public static final LongConfOption CHECK_MEMORY_INTERVAL =
+      new LongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000,
+          "The interval where memory checker thread wakes up and " +
+          "monitors memory footprint (in milliseconds)");
+  /**
+   * If mem-usage is above this threshold and no Full GC has been called,
+   * we call it manually
+   */
+  public static final FloatConfOption MANUAL_GC_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f,
+          "The threshold above which GC is called manually if Full GC has not " +
+          "happened in a while");
+  /**
+   * Used to detect a high memory pressure situation: the minimum fraction of
+   * memory a Full GC is expected to reclaim.
+   */
+  public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION =
+      new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f,
+          "Minimum percentage of memory we expect to be reclaimed after a Full " +
+          "GC. If less than this amount is reclaimed, it is safe to say " +
+          "we are in a high memory situation and the estimation mechanism " +
+          "has not recognized it yet!");
+  /** If mem-usage is above this threshold, active threads are set to 0 */
+  public static final FloatConfOption AM_HIGH_THRESHOLD =
+      new FloatConfOption("giraph.amHighThreshold", 0.95f,
+          "If mem-usage is above this threshold, all active threads " +
+          "(compute/input) are paused.");
+  /** If mem-usage is below this threshold, active threads are set to max */
+  public static final FloatConfOption AM_LOW_THRESHOLD =
+      new FloatConfOption("giraph.amLowThreshold", 0.90f,
+          "If mem-usage is below this threshold, all active threads " +
+          "(compute/input) are running.");
+  /** If mem-usage is above this threshold, credit is set to 0 */
+  public static final FloatConfOption CREDIT_HIGH_THRESHOLD =
+      new FloatConfOption("giraph.creditHighThreshold", 0.95f,
+          "If mem-usage is above this threshold, credit is set to 0");
+  /** If mem-usage is below this threshold, credit is set to max */
+  public static final FloatConfOption CREDIT_LOW_THRESHOLD =
+      new FloatConfOption("giraph.creditLowThreshold", 0.90f,
+          "If mem-usage is below this threshold, credit is set to max");
+  /** OOC starts if mem-usage is above this threshold */
+  public static final FloatConfOption OOC_THRESHOLD =
+      new FloatConfOption("giraph.oocThreshold", 0.90f,
+          "If mem-usage is above this threshold, out of core threads starts " +
+          "writing data to disk");
+
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(MemoryEstimatorOracle.class);
+
+ /** Cached value for {@link #MANUAL_GC_MEMORY_PRESSURE} */
+ private final float manualGCMemoryPressure;
+ /** Cached value for {@link #GC_MINIMUM_RECLAIM_FRACTION} */
+ private final float gcReclaimFraction;
+ /** Cached value for {@link #AM_HIGH_THRESHOLD} */
+ private final float amHighThreshold;
+ /** Cached value for {@link #AM_LOW_THRESHOLD} */
+ private final float amLowThreshold;
+ /** Cached value for {@link #CREDIT_HIGH_THRESHOLD} */
+ private final float creditHighThreshold;
+ /** Cached value for {@link #CREDIT_LOW_THRESHOLD} */
+ private final float creditLowThreshold;
+ /** Cached value for {@link #OOC_THRESHOLD} */
+ private final float oocThreshold;
+
+ /** Reference to running OOC engine */
+ private final OutOfCoreEngine oocEngine;
+ /** Memory estimator instance */
+ private final MemoryEstimator memoryEstimator;
+ /**
+ * Keeps track of the net number of bytes stored/loaded by OOC: increased
+ * when a partition load completes, decreased when any other non-wait
+ * command completes, and reset to 0 in {@link #startIteration()}.
+ */
+ private final AtomicLong oocBytesInjected = new AtomicLong(0);
+ /**
+ * How many bytes to offload. Set in {@link #gcCompleted} when memory is
+ * found to be tight, and adjusted in {@link #commandCompleted} while the
+ * state is OFFLOADING.
+ */
+ private final AtomicLong numBytesToOffload = new AtomicLong(0);
+ /**
+ * Current state of the OOC. Switched to OFFLOADING in {@link #gcCompleted}
+ * and back to STABLE in {@link #commandCompleted}.
+ */
+ private volatile State state = State.STABLE;
+ /**
+ * Timestamp (msec) of the last major GC seen by {@link #gcCompleted};
+ * 0 until the first one. Read by the memory-checker thread to decide
+ * whether a manual GC is due.
+ */
+ private volatile long lastMajorGCTime = 0;
+
+ /**
+ * Different states the OOC can be in.
+ */
+ private enum State {
+ /** No offloading is in progress */
+ STABLE,
+ /**
+ * Currently offloading: {@link #getNextIOActions()} only returns store
+ * actions while in this state
+ */
+ OFFLOADING,
+ }
+
+  /**
+   * Constructor. Caches the configured thresholds and starts the
+   * "ooc-memory-checker" daemon thread. On every wake-up that thread either
+   * updates the active-thread/credit fractions from the current usage
+   * estimate or, when no estimate is available, calls a manual GC if the
+   * old-gen is under high pressure and no major GC was seen recently.
+   *
+   * @param conf Configuration
+   * @param oocEngine OOC engine
+   */
+  public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
+                               final OutOfCoreEngine oocEngine) {
+    this.oocEngine = oocEngine;
+    this.memoryEstimator = new MemoryEstimator(this.oocBytesInjected,
+        oocEngine.getNetworkMetrics());
+
+    this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf);
+    this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf);
+    this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf);
+    this.amLowThreshold = AM_LOW_THRESHOLD.get(conf);
+    this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf);
+    this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf);
+    this.oocThreshold = OOC_THRESHOLD.get(conf);
+
+    final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
+    // Minimum time (msec) since the last major GC before we force one
+    final long minManualGCIntervalMsec = 10000;
+
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          long oldGenUsageEstimate = memoryEstimator.getUsageEstimate();
+          MemoryUsage usage = getOldGenUsed();
+          if (oldGenUsageEstimate > 0) {
+            updateRates(oldGenUsageEstimate, usage.getMax());
+          } else {
+            // No estimate available: fall back to the real old-gen usage
+            long checkTime = System.currentTimeMillis();
+            if (checkTime - lastMajorGCTime >= minManualGCIntervalMsec) {
+              double used = (double) usage.getUsed() / usage.getMax();
+              if (used > manualGCMemoryPressure) {
+                if (LOG.isInfoEnabled()) {
+                  LOG.info(
+                      "High memory pressure with no full GC from the JVM. " +
+                      "Calling GC manually. Used fraction of old-gen is " +
+                      String.format("%.2f", used) + ".");
+                }
+                System.gc();
+                long gcDurationMsec = System.currentTimeMillis() - checkTime;
+                usage = getOldGenUsed();
+                used = (double) usage.getUsed() / usage.getMax();
+                if (LOG.isInfoEnabled()) {
+                  LOG.info("Manual GC done. It took " +
+                      String.format("%.2f", gcDurationMsec / 1000.0) +
+                      " seconds. Used fraction of old-gen is " +
+                      String.format("%.2f", used) + ".");
+                }
+              }
+            }
+          }
+          try {
+            Thread.sleep(checkMemoryInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("run: exception occurred!", e);
+            // Restore the interrupt status before leaving the loop
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+    });
+    thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
+        .getGraphTaskManager().createUncaughtExceptionHandler());
+    thread.setName("ooc-memory-checker");
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+ /**
+ * Resets all the counters used in the memory estimation. This is called
+ * at the beginning of a new superstep.
+ * <p>
+ * The number of vertices to compute in the next superstep gets reset in
+ * {@link org.apache.giraph.graph.GraphTaskManager#processGraphPartitions}
+ * right before
+ * {@link org.apache.giraph.partition.PartitionStore#startIteration()}
+ * gets called.
+ */
+ @Override
+ public void startIteration() {
+ // Restart the OOC byte accounting and the estimator for this superstep
+ oocBytesInjected.set(0);
+ memoryEstimator.clear();
+ memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep());
+ // Start unthrottled: credit and active threads both at fraction 1
+ oocEngine.updateRequestsCreditFraction(1);
+ oocEngine.updateActiveThreadsFraction(1);
+ }
+
+
+  /**
+   * Decides which IO actions the OOC engine should try next. Store actions
+   * are requested while offloading, or when the estimated fraction of
+   * old-gen in use exceeds the OOC threshold; otherwise a partition load is
+   * requested.
+   *
+   * @return IO actions to try
+   */
+  @Override
+  public IOAction[] getNextIOActions() {
+    if (state != State.OFFLOADING) {
+      long estimatedBytes = memoryEstimator.getUsageEstimate();
+      MemoryUsage oldGen = getOldGenUsed();
+      // Without a positive estimate there is nothing to compare against
+      boolean aboveThreshold = estimatedBytes > 0 &&
+          (double) estimatedBytes / oldGen.getMax() > oocThreshold;
+      if (!aboveThreshold) {
+        return new IOAction[]{IOAction.LOAD_PARTITION};
+      }
+    }
+    return new IOAction[]{
+      IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION};
+  }
+
+  /**
+   * Approves every IO command unconditionally.
+   *
+   * @param command IO command to approve
+   * @return always true
+   */
+  @Override
+  public boolean approve(IOCommand command) {
+    return true;
+  }
+
+  /**
+   * Accounts for the bytes moved by a finished IO command. Loads add to the
+   * injected-bytes counter, every other command except a wait subtracts
+   * from it. While offloading, the same adjustment is applied to the
+   * number of bytes still to offload. Once that number is no longer
+   * positive, the oracle returns to the stable state and lifts throttling.
+   *
+   * @param command IO command that completed
+   */
+  @Override
+  public void commandCompleted(IOCommand command) {
+    if (command instanceof LoadPartitionIOCommand) {
+      oocBytesInjected.addAndGet(command.bytesTransferred());
+      if (state == State.OFFLOADING) {
+        numBytesToOffload.addAndGet(command.bytesTransferred());
+      }
+    } else if (!(command instanceof WaitIOCommand)) {
+      oocBytesInjected.addAndGet(-command.bytesTransferred());
+      if (state == State.OFFLOADING) {
+        numBytesToOffload.addAndGet(-command.bytesTransferred());
+      }
+    }
+
+    if (state != State.OFFLOADING || numBytesToOffload.get() > 0) {
+      return;
+    }
+    // Offloading target reached: go back to stable and run unthrottled
+    numBytesToOffload.set(0);
+    state = State.STABLE;
+    updateRates(-1, 1);
+  }
+
+ /**
+ * When a new GC has completed, we can get an accurate measurement of the
+ * memory usage. We use this to update the linear regression model.
+ *
+ * @param gcInfo GC information
+ */
+ @Override
+ public synchronized void gcCompleted(
+ GarbageCollectionNotificationInfo gcInfo) {
+ String action = gcInfo.getGcAction().toLowerCase();
+ String cause = gcInfo.getGcCause().toLowerCase();
+ if (action.contains("major") &&
+ (cause.contains("ergo") || cause.contains("system"))) {
+ lastMajorGCTime = System.currentTimeMillis();
+ MemoryUsage before = null;
+ MemoryUsage after = null;
+
+ for (Map.Entry<String, MemoryUsage> entry :
+ gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) {
+ String poolName = entry.getKey();
+ if (poolName.toLowerCase().contains("old")) {
+ before = entry.getValue();
+ after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName);
+ break;
+ }
+ }
+ if (after == null) {
+ throw new IllegalStateException("Missing Memory Usage After GC
info");
+ }
+ if (before == null) {
+ throw new IllegalStateException("Missing Memory Usage Before GC
info");
+ }
+
+ // Compare the estimation with the actual value
+ long usedMemoryEstimate = memoryEstimator.getUsageEstimate();
+ long usedMemoryReal = after.getUsed();
+ if (usedMemoryEstimate >= 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + "
real=" +
+ usedMemoryReal + " error=" +
+ ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) /
+ usedMemoryReal * 100));
+ }
+ }
+
+ // Number of edges loaded so far (if in input superstep)
+ long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
+ EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
+ // Number of vertices loaded so far (if in input superstep)
+ long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
+ VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
+ // Number of vertices computed (if either in compute or store phase)
+ long verticesComputed = WorkerProgress.get().getVerticesComputed() +
+ WorkerProgress.get().getVerticesStored() +
+ AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
+ // Number of bytes received
+ long receivedBytes =
+ oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep();
+ // Number of OOC bytes
+ long oocBytes = oocBytesInjected.get();
+
+ memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded,
+ verticesLoaded, verticesComputed, receivedBytes, oocBytes);
+
+ long garbage = before.getUsed() - after.getUsed();
+ long maxMem = after.getMax();
+ long memUsed = after.getUsed();
+ boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction *
maxMem &&
+ garbage < gcReclaimFraction * maxMem;
+ boolean predictionExist = memoryEstimator.getUsageEstimate() > 0;
+ if (isTight && !predictionExist) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" +
+ memUsed + " maxMem=" + maxMem);
+ }
+ numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) -
+ (maxMem - memUsed));
+ if (LOG.isInfoEnabled()) {
+ LOG.info("gcCompleted: tight memory usage. Starting to offload "
+
+ "until " + numBytesToOffload.get() + " bytes are offloaded");
+ }
+ state = State.OFFLOADING;
+ updateRates(1, 1);
+ }
+ }
+ }
+
+ /**
+ * Given an estimate for the current memory usage and the maximum
available
+ * memory, it updates the active threads and flow control credit in the
+ * OOC engine.
+ *
+ * @param usageEstimateMem Estimate of memory usage.
+ * @param maxMemory Maximum memory.
+ */
+ private void updateRates(long usageEstimateMem, long maxMemory) {
+ double usageEstimate = (double) usageEstimateMem / maxMemory;
+ if (usageEstimate > 0) {
+ if (usageEstimate >= amHighThreshold) {
+ oocEngine.updateActiveThreadsFraction(0);
+ } else if (usageEstimate < amLowThreshold) {
+ oocEngine.updateActiveThreadsFraction(1);
+ } else {
+ oocEngine.updateActiveThreadsFraction(1 -
+ (usageEstimate - amLowThreshold) /
+ (amHighThreshold - amLowThreshold));
+ }
+
+ if (usageEstimate >= creditHighThreshold) {
+ oocEngine.updateRequestsCreditFraction(0);
+ } else if (usageEstimate < creditLowThreshold) {
+ oocEngine.updateRequestsCreditFraction(1);
+ } else {
+ oocEngine.updateRequestsCreditFraction(1 -
+ (usageEstimate - creditLowThreshold) /
+ (creditHighThreshold - creditLowThreshold));
+ }
+ } else {
+ oocEngine.updateActiveThreadsFraction(1);
+ oocEngine.updateRequestsCreditFraction(1);
+ }
+ }
+
+  /**
+   * Looks up the current usage of the old generation, i.e. the first memory
+   * pool whose lower-cased name contains "old" or "tenured".
+   *
+   * @return {@link MemoryUsage} of that pool.
+   * @throws IllegalStateException if no such pool exists
+   */
+  private MemoryUsage getOldGenUsed() {
+    for (MemoryPoolMXBean candidate :
+        ManagementFactory.getMemoryPoolMXBeans()) {
+      String lowered = candidate.getName().toLowerCase();
+      boolean isOldGen =
+          lowered.contains("old") || lowered.contains("tenured");
+      if (!isOldGen) {
+        continue;
+      }
+      return candidate.getUsage();
+    }
+    throw new IllegalStateException("Bad Memory Pool");
+  }
+
+ /**
+ * Maintains statistics about the current state and progress of the
+ * computation and produces estimates of memory usage using a technique
+ * based on linear regression.
+ *
+ * Upon a GC event, it gets updated with the most recent statistics
+ * through the {@link #addRecord} method.
+ */
+ private static class MemoryEstimator {
+ /** Stores the (x1,x2,...,x5) arrays of data samples, one for each
sample */
+ private Vector<double[]> dataSamples = new Vector<>();
+ /** Stores the y memory usage dataSamples, one for each sample */
+ private Vector<Double> memorySamples = new Vector<>();
--- End diff --
What is the point of using Vector everywhere in the class? Is it possible
to use DoubleArrayList instead? I see a few pieces that can be simplified (e.g.
conversion to primitive array)
> Add memory estimation mechanism to out-of-core
> ----------------------------------------------
>
> Key: GIRAPH-1125
> URL: https://issues.apache.org/jira/browse/GIRAPH-1125
> Project: Giraph
> Issue Type: Improvement
> Reporter: Hassan Eslami
> Assignee: Hassan Eslami
>
> The new out-of-core mechanism is designed with adaptivity in mind,
> meaning that we want the out-of-core mechanism to kick in only when it is
> necessary. In other words, when all of the data (graph, messages, and
> mutations) fits in memory, we want to take advantage of the entire memory.
> And when memory runs short at some stage, only the minimal necessary amount
> of data should go out of core (to disk). This ensures good performance for
> the out-of-core mechanism.
> To satisfy the adaptivity goal, we need to know how much memory is used at
> each point in time. The default out-of-core mechanism (ThresholdBasedOracle)
> gets memory information from the JVM's internal methods (Runtime's
> freeMemory()). This method is inaccurate (and pessimistic), meaning that it
> does not account for garbage data that has not yet been purged by GC. Using
> the JVM's default methods, OOC behaves pessimistically and moves data out of
> core even if it is not necessary. For instance, consider the case where there
> is a lot of garbage on the heap, but GC has not happened for a while. In this
> case, the default OOC pushes data to disk and, immediately after a major GC,
> brings the data back into memory. This causes inefficiency in the default
> out-of-core mechanism. If out-of-core is used but the data can entirely fit
> in memory, the job goes out of core even though going out of core is not
> necessary.
> To address this issue, we need a mechanism to know more accurately
> how much of the heap is filled with non-garbage data. Consequently, we need
> to change the Oracle (OOC policy) to take advantage of a more accurate memory
> usage estimate.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)