[ 
https://issues.apache.org/jira/browse/GIRAPH-1125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762637#comment-15762637
 ] 

ASF GitHub Bot commented on GIRAPH-1125:
----------------------------------------

Github user heslami commented on a diff in the pull request:

    https://github.com/apache/giraph/pull/12#discussion_r93143346
  
    --- Diff: 
giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
 ---
    @@ -0,0 +1,851 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.giraph.ooc.policy;
    +
    +import com.sun.management.GarbageCollectionNotificationInfo;
    +import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
    +import org.apache.giraph.comm.NetworkMetrics;
    +import org.apache.giraph.conf.FloatConfOption;
    +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
    +import org.apache.giraph.conf.LongConfOption;
    +import org.apache.giraph.edge.AbstractEdgeStore;
    +import org.apache.giraph.ooc.OutOfCoreEngine;
    +import org.apache.giraph.ooc.command.IOCommand;
    +import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
    +import org.apache.giraph.ooc.command.WaitIOCommand;
    +import org.apache.giraph.worker.EdgeInputSplitsCallable;
    +import org.apache.giraph.worker.VertexInputSplitsCallable;
    +import org.apache.giraph.worker.WorkerProgress;
    +import org.apache.log4j.Logger;
    +
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.MemoryPoolMXBean;
    +import java.lang.management.MemoryUsage;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Vector;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static com.google.common.base.Preconditions.checkState;
    +
    +/**
    + * Implementation of {@link OutOfCoreOracle} that uses a linear regression model
    + * to estimate actual memory usage based on the current state of computation.
    + * The model takes into consideration 5 parameters:
    + *
    + * y = c0 + c1*x1 + c2*x2 + c3*x3 + c4*x4 + c5*x5
    + *
    + * y: memory usage
    + * x1: edges loaded
    + * x2: vertices loaded
    + * x3: vertices processed
    + * x4: bytes received due to messages
    + * x5: bytes loaded/stored from/to disk due to OOC.
    + *
    + */
    +public class MemoryEstimatorOracle implements OutOfCoreOracle {
    +  /** Memory check interval in msec */
    +  public static final LongConfOption CHECK_MEMORY_INTERVAL =
    +    new LongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000,
    +        "The interval where memory checker thread wakes up and " +
    +            "monitors memory footprint (in milliseconds)");
    +  /**
    +   * If mem-usage is above this threshold and no Full GC has been called,
    +   * we call it manually
    +   */
    +  public static final FloatConfOption MANUAL_GC_MEMORY_PRESSURE =
    +    new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f,
    +        "The threshold above which GC is called manually if Full GC has not " +
    +            "happened in a while");
    +  /**
    +   * Used to detect a high memory pressure situation. Expressed as a
    +   * fraction (0-1) of the maximum old-gen size.
    +   */
    +  public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION =
    +    new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f,
    +        "Minimum fraction of memory we expect to be reclaimed after a Full " +
    +            "GC. If less than this amount is reclaimed, it is safe to say " +
    +            "we are in a high memory situation and the estimation mechanism " +
    +            "has not recognized it yet!");
    +  /**
    +   * If mem-usage is at or above this threshold, active threads are set to 0.
    +   * Between {@link #AM_LOW_THRESHOLD} and this value the fraction of active
    +   * threads decreases linearly.
    +   */
    +  public static final FloatConfOption AM_HIGH_THRESHOLD =
    +    new FloatConfOption("giraph.amHighThreshold", 0.95f,
    +        "If mem-usage is above this threshold, all active threads " +
    +            "(compute/input) are paused.");
    +  /** If mem-usage is below this threshold, active threads are set to max */
    +  public static final FloatConfOption AM_LOW_THRESHOLD =
    +    new FloatConfOption("giraph.amLowThreshold", 0.90f,
    +        "If mem-usage is below this threshold, all active threads " +
    +            "(compute/input) are running.");
    +  /**
    +   * If mem-usage is at or above this threshold, credit is set to 0.
    +   * Between {@link #CREDIT_LOW_THRESHOLD} and this value the credit
    +   * decreases linearly.
    +   */
    +  public static final FloatConfOption CREDIT_HIGH_THRESHOLD =
    +    new FloatConfOption("giraph.creditHighThreshold", 0.95f,
    +        "If mem-usage is above this threshold, credit is set to 0");
    +  /** If mem-usage is below this threshold, credit is set to max */
    +  public static final FloatConfOption CREDIT_LOW_THRESHOLD =
    +    new FloatConfOption("giraph.creditLowThreshold", 0.90f,
    +        "If mem-usage is below this threshold, credit is set to max");
    +  /** OOC starts if mem-usage is above this threshold */
    +  public static final FloatConfOption OOC_THRESHOLD =
    +    new FloatConfOption("giraph.oocThreshold", 0.90f,
    +        "If mem-usage is above this threshold, out of core threads start " +
    +            "writing data to disk");
    +
    +  /** Logger */
    +  private static final Logger LOG =
    +    Logger.getLogger(MemoryEstimatorOracle.class);
    +
    +  /** Cached value for {@link #MANUAL_GC_MEMORY_PRESSURE} */
    +  private final float manualGCMemoryPressure;
    +  /** Cached value for {@link #GC_MINIMUM_RECLAIM_FRACTION} */
    +  private final float gcReclaimFraction;
    +  /** Cached value for {@link #AM_HIGH_THRESHOLD} */
    +  private final float amHighThreshold;
    +  /** Cached value for {@link #AM_LOW_THRESHOLD} */
    +  private final float amLowThreshold;
    +  /** Cached value for {@link #CREDIT_HIGH_THRESHOLD} */
    +  private final float creditHighThreshold;
    +  /** Cached value for {@link #CREDIT_LOW_THRESHOLD} */
    +  private final float creditLowThreshold;
    +  /** Cached value for {@link #OOC_THRESHOLD} */
    +  private final float oocThreshold;
    +
    +  /** Reference to running OOC engine */
    +  private final OutOfCoreEngine oocEngine;
    +  /** Memory estimator instance (fed with samples on qualifying major GCs) */
    +  private final MemoryEstimator memoryEstimator;
    +  /**
    +   * Net number of bytes moved by OOC in the current superstep: bytes
    +   * loaded by {@link LoadPartitionIOCommand}s minus bytes transferred by
    +   * the other (non-wait) IO commands. Reset to 0 in startIteration().
    +   */
    +  private final AtomicLong oocBytesInjected = new AtomicLong(0);
    +  /**
    +   * Remaining number of bytes to offload while in the OFFLOADING state;
    +   * once it drops to 0 or below, the state goes back to STABLE.
    +   */
    +  private final AtomicLong numBytesToOffload = new AtomicLong(0);
    +  /** Current state of the OOC */
    +  private volatile State state = State.STABLE;
    +  /** Timestamp (System.currentTimeMillis()) of the last major GC */
    +  private volatile long lastMajorGCTime = 0;
    +
    +  /**
    +   * Different states the OOC can be in.
    +   */
    +  private enum State {
    +    /** No offloading */
    +    STABLE,
    +    /** Current offloading */
    +    OFFLOADING,
    +  }
    +
    +  /**
    +   * Constructor. Caches the configured thresholds and starts a daemon
    +   * thread named "ooc-memory-checker" that wakes up every
    +   * {@link #CHECK_MEMORY_INTERVAL} milliseconds. If the memory estimator
    +   * has a positive estimate, the thread updates the active-threads fraction
    +   * and the request credit from it. Otherwise, if the old-gen usage
    +   * fraction is above {@link #MANUAL_GC_MEMORY_PRESSURE} and no major GC
    +   * has been recorded for at least 10 seconds, it calls
    +   * {@code System.gc()}.
    +   *
    +   * @param conf Configuration
    +   * @param oocEngine OOC engine
    +   */
    +  public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
    +                               final OutOfCoreEngine oocEngine) {
    +    this.oocEngine = oocEngine;
    +    this.memoryEstimator = new MemoryEstimator(this.oocBytesInjected,
    +      oocEngine.getNetworkMetrics());
    +
    +    this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf);
    +    this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf);
    +    this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf);
    +    this.amLowThreshold = AM_LOW_THRESHOLD.get(conf);
    +    this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf);
    +    this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf);
    +    this.oocThreshold = OOC_THRESHOLD.get(conf);
    +
    +    final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
    +    // Minimum time (msec) since the last recorded major GC before a manual
    +    // GC may be triggered.
    +    final long minMillisSinceMajorGC = 10000;
    +
    +    Thread thread = new Thread(new Runnable() {
    +      @Override
    +      public void run() {
    +        while (true) {
    +          long oldGenUsageEstimate = memoryEstimator.getUsageEstimate();
    +          MemoryUsage usage = getOldGenUsed();
    +          if (oldGenUsageEstimate > 0) {
    +            updateRates(oldGenUsageEstimate, usage.getMax());
    +          } else {
    +            long now = System.currentTimeMillis();
    +            if (now - lastMajorGCTime >= minMillisSinceMajorGC) {
    +              double used = (double) usage.getUsed() / usage.getMax();
    +              if (used > manualGCMemoryPressure) {
    +                if (LOG.isInfoEnabled()) {
    +                  LOG.info(
    +                    "High memory pressure with no full GC from the JVM. " +
    +                      "Calling GC manually. Used fraction of old-gen is " +
    +                      String.format("%.2f", used) + ".");
    +                }
    +                System.gc();
    +                long gcEnd = System.currentTimeMillis();
    +                // Record the manual GC here as well. gcCompleted() only
    +                // records major GCs whose cause it recognizes; if this GC
    +                // is not recorded there, we would otherwise call
    +                // System.gc() again on every check interval.
    +                lastMajorGCTime = gcEnd;
    +                usage = getOldGenUsed();
    +                used = (double) usage.getUsed() / usage.getMax();
    +                if (LOG.isInfoEnabled()) {
    +                  LOG.info("Manual GC done. It took " +
    +                    String.format("%.2f", (gcEnd - now) / 1000.0) +
    +                    " seconds. Used fraction of old-gen is " +
    +                    String.format("%.2f", used) + ".");
    +                }
    +              }
    +            }
    +          }
    +          try {
    +            Thread.sleep(checkMemoryInterval);
    +          } catch (InterruptedException e) {
    +            LOG.warn("run: exception occurred!", e);
    +            // Preserve the interrupt status before the thread exits.
    +            Thread.currentThread().interrupt();
    +            return;
    +          }
    +        }
    +      }
    +    });
    +    thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
    +      .getGraphTaskManager().createUncaughtExceptionHandler());
    +    thread.setName("ooc-memory-checker");
    +    thread.setDaemon(true);
    +    thread.start();
    +  }
    +
    +  /**
    +   * Resets all the counters used in the memory estimation. This is called at
    +   * the beginning of a new superstep. It zeroes the OOC byte counter, clears
    +   * the memory estimator and tells it the current superstep, and resets both
    +   * the request credit fraction and the active threads fraction to 1.
    +   * <p>
    +   * The number of vertices to compute in the next superstep gets reset in
    +   * {@link org.apache.giraph.graph.GraphTaskManager#processGraphPartitions}
    +   * right before
    +   * {@link org.apache.giraph.partition.PartitionStore#startIteration()} gets
    +   * called.
    +   */
    +  @Override
    +  public void startIteration() {
    +    oocBytesInjected.set(0);
    +    memoryEstimator.clear();
    +    memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep());
    +    oocEngine.updateRequestsCreditFraction(1);
    +    oocEngine.updateActiveThreadsFraction(1);
    +  }
    +
    +
    +  /**
    +   * Decides which IO actions the OOC engine should perform next. Data is
    +   * stored to disk while offloading, or when the estimated old-gen usage
    +   * fraction exceeds {@link #OOC_THRESHOLD}; otherwise partitions are
    +   * loaded back into memory.
    +   *
    +   * @return IO actions to perform next
    +   */
    +  @Override
    +  public IOAction[] getNextIOActions() {
    +    boolean offload = state == State.OFFLOADING;
    +    if (!offload) {
    +      long estimatedOldGen = memoryEstimator.getUsageEstimate();
    +      MemoryUsage oldGen = getOldGenUsed();
    +      // A non-positive estimate means no prediction is available yet.
    +      offload = estimatedOldGen > 0 &&
    +        (double) estimatedOldGen / oldGen.getMax() > oocThreshold;
    +    }
    +    if (offload) {
    +      return new IOAction[]{
    +        IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION};
    +    }
    +    return new IOAction[]{IOAction.LOAD_PARTITION};
    +  }
    +
    +  /**
    +   * Approves every IO command unconditionally.
    +   *
    +   * @param command IO command awaiting approval (not inspected)
    +   * @return always {@code true}
    +   */
    +  @Override
    +  public boolean approve(IOCommand command) {
    +    return true;
    +  }
    +
    +  @Override
    +  public void commandCompleted(IOCommand command) {
    +    if (command instanceof LoadPartitionIOCommand) {
    +      oocBytesInjected.getAndAdd(command.bytesTransferred());
    +      if (state == State.OFFLOADING) {
    +        numBytesToOffload.getAndAdd(command.bytesTransferred());
    +      }
    +    } else if (!(command instanceof WaitIOCommand)) {
    +      oocBytesInjected.getAndAdd(0 - command.bytesTransferred());
    +      if (state == State.OFFLOADING) {
    +        numBytesToOffload.getAndAdd(0 - command.bytesTransferred());
    +      }
    +    }
    +
    +    if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) {
    +      numBytesToOffload.set(0);
    +      state = State.STABLE;
    +      updateRates(-1, 1);
    +    }
    +  }
    +
    +  /**
    +   * When a new GC has completed, we can get an accurate measurement of the
    +   * memory usage. We use this to update the linear regression model.
    +   *
    +   * @param gcInfo GC information
    +   */
    +  @Override
    +  public synchronized void gcCompleted(
    +    GarbageCollectionNotificationInfo gcInfo) {
    +    String action = gcInfo.getGcAction().toLowerCase();
    +    String cause = gcInfo.getGcCause().toLowerCase();
    +    if (action.contains("major") &&
    +      (cause.contains("ergo") || cause.contains("system"))) {
    +      lastMajorGCTime = System.currentTimeMillis();
    +      MemoryUsage before = null;
    +      MemoryUsage after = null;
    +
    +      for (Map.Entry<String, MemoryUsage> entry :
    +        gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) {
    +        String poolName = entry.getKey();
    +        if (poolName.toLowerCase().contains("old")) {
    +          before = entry.getValue();
    +          after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName);
    +          break;
    +        }
    +      }
    +      if (after == null) {
    +        throw new IllegalStateException("Missing Memory Usage After GC 
info");
    +      }
    +      if (before == null) {
    +        throw new IllegalStateException("Missing Memory Usage Before GC 
info");
    +      }
    +
    +      // Compare the estimation with the actual value
    +      long usedMemoryEstimate = memoryEstimator.getUsageEstimate();
    +      long usedMemoryReal = after.getUsed();
    +      if (usedMemoryEstimate >= 0) {
    +        if (LOG.isInfoEnabled()) {
    +          LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " 
real=" +
    +            usedMemoryReal + " error=" +
    +            ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) /
    +              usedMemoryReal * 100));
    +        }
    +      }
    +
    +      // Number of edges loaded so far (if in input superstep)
    +      long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
    +        EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
    +      // Number of vertices loaded so far (if in input superstep)
    +      long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
    +        VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
    +      // Number of vertices computed (if either in compute or store phase)
    +      long verticesComputed = WorkerProgress.get().getVerticesComputed() +
    +        WorkerProgress.get().getVerticesStored() +
    +        AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
    +      // Number of bytes received
    +      long receivedBytes =
    +        oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep();
    +      // Number of OOC bytes
    +      long oocBytes = oocBytesInjected.get();
    +
    +      memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded,
    +        verticesLoaded, verticesComputed, receivedBytes, oocBytes);
    +
    +      long garbage = before.getUsed() - after.getUsed();
    +      long maxMem = after.getMax();
    +      long memUsed = after.getUsed();
    +      boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * 
maxMem &&
    +        garbage < gcReclaimFraction * maxMem;
    +      boolean predictionExist = memoryEstimator.getUsageEstimate() > 0;
    +      if (isTight && !predictionExist) {
    +        if (LOG.isInfoEnabled()) {
    +          LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" +
    +            memUsed + " maxMem=" + maxMem);
    +        }
    +        numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) -
    +          (maxMem - memUsed));
    +        if (LOG.isInfoEnabled()) {
    +          LOG.info("gcCompleted: tight memory usage. Starting to offload " 
+
    +            "until " + numBytesToOffload.get() + " bytes are offloaded");
    +        }
    +        state = State.OFFLOADING;
    +        updateRates(1, 1);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Given an estimate for the current memory usage and the maximum 
available
    +   * memory, it updates the active threads and flow control credit in the
    +   * OOC engine.
    +   *
    +   * @param usageEstimateMem Estimate of memory usage.
    +   * @param maxMemory Maximum memory.
    +   */
    +  private void updateRates(long usageEstimateMem, long maxMemory) {
    +    double usageEstimate = (double) usageEstimateMem / maxMemory;
    +    if (usageEstimate > 0) {
    +      if (usageEstimate >= amHighThreshold) {
    +        oocEngine.updateActiveThreadsFraction(0);
    +      } else if (usageEstimate < amLowThreshold) {
    +        oocEngine.updateActiveThreadsFraction(1);
    +      } else {
    +        oocEngine.updateActiveThreadsFraction(1 -
    +          (usageEstimate - amLowThreshold) /
    +            (amHighThreshold - amLowThreshold));
    +      }
    +
    +      if (usageEstimate >= creditHighThreshold) {
    +        oocEngine.updateRequestsCreditFraction(0);
    +      } else if (usageEstimate < creditLowThreshold) {
    +        oocEngine.updateRequestsCreditFraction(1);
    +      } else {
    +        oocEngine.updateRequestsCreditFraction(1 -
    +          (usageEstimate - creditLowThreshold) /
    +            (creditHighThreshold - creditLowThreshold));
    +      }
    +    } else {
    +      oocEngine.updateActiveThreadsFraction(1);
    +      oocEngine.updateRequestsCreditFraction(1);
    +    }
    +  }
    +
    +  /**
    +   * Finds the old-generation memory pool, i.e. the first pool whose
    +   * lower-cased name contains "old" or "tenured", and returns its current
    +   * usage.
    +   *
    +   * @return {@link MemoryUsage} of the old-gen pool.
    +   * @throws IllegalStateException if no matching pool exists
    +   */
    +  private MemoryUsage getOldGenUsed() {
    +    for (MemoryPoolMXBean candidate :
    +      ManagementFactory.getMemoryPoolMXBeans()) {
    +      String lowerCaseName = candidate.getName().toLowerCase();
    +      boolean isOldGen = lowerCaseName.contains("old") ||
    +        lowerCaseName.contains("tenured");
    +      if (isOldGen) {
    +        return candidate.getUsage();
    +      }
    +    }
    +    throw new IllegalStateException("Bad Memory Pool");
    +  }
    +
    +  /**
    +   * Maintains statistics about the current state and progress of the
    +   * computation and produces estimates of memory usage using a technique
    +   * based on linear regression.
    +   *
    +   * Upon a GC events, it gets updated with the most recent statistics 
through
    +   * the {@link #addRecord} method.
    +   */
    +  private static class MemoryEstimator {
    +    /** Stores the (x1,x2,...,x5) arrays of data samples, one for each 
sample */
    +    private Vector<double[]> dataSamples = new Vector<>();
    +    /** Stores the y memory usage dataSamples, one for each sample */
    +    private Vector<Double> memorySamples = new Vector<>();
    --- End diff --
    
    `DoubleArrayList` from fastutil is useful when we have a lot of entries in 
the list. The number of entries in our case is very small (the number of full 
GCs in one superstep, which is in the range of 1-100, getting closer to 1000 in 
the very extreme cases). So, the choice of either `Vector` or `ArrayList` is 
more natural. I agree that we are not using the synchronized guarantees of 
`Vector`, hence using `ArrayList` is an even more natural choice. I'll change 
these to `ArrayList`. Conversion from `ArrayList` (or even `Vector`) to an 
array can be done through the `toArray` method of either collection (note that 
`toArray` yields a boxed `Double[]`, so a primitive `double[]` still needs an 
explicit copy loop). I'll fix that too.


> Add memory estimation mechanism to out-of-core
> ----------------------------------------------
>
>                 Key: GIRAPH-1125
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-1125
>             Project: Giraph
>          Issue Type: Improvement
>            Reporter: Hassan Eslami
>            Assignee: Hassan Eslami
>
> The new out-of-core mechanism is designed with the adaptivity goal in mind, 
> meaning that we want the out-of-core mechanism to kick in only when it is 
> necessary. In other words, when all the data (graph, messages, and 
> mutations) fits in memory, we want to take advantage of the entire memory. 
> And when memory is short in a stage, only the minimal necessary amount of 
> data goes out of core (to disk). This ensures good performance for the 
> out-of-core mechanism.
> To satisfy the adaptiveness goal, we need to know how much memory is used at 
> each point in time. The default out-of-core mechanism (ThresholdBasedOracle) 
> gets memory information from the JVM's internal methods (Runtime's 
> freeMemory()). This method is inaccurate (and pessimistic): it counts 
> garbage that has not yet been collected by GC as used memory. Using the JVM's 
> default methods, OOC behaves pessimistically and moves data out of core even 
> when it is not necessary. For instance, consider the case where there is a lot 
> of garbage on the heap, but GC has not happened for a while. In this case, 
> the default OOC pushes data to disk and, immediately after a major GC, 
> brings the data back to memory. This makes the default out-of-core mechanism 
> inefficient: if out-of-core is enabled but the data can entirely fit in 
> memory, the job still goes out of core even though it is not 
> necessary.
> To address this issue, we need a mechanism to know more accurately 
> how much of the heap is filled with non-garbage data. Consequently, we need 
> to change the Oracle (OOC policy) to take advantage of a more accurate memory 
> usage estimation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to