[
https://issues.apache.org/jira/browse/GIRAPH-1125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762437#comment-15762437
]
ASF GitHub Bot commented on GIRAPH-1125:
----------------------------------------
Github user heslami commented on a diff in the pull request:
https://github.com/apache/giraph/pull/12#discussion_r93129286
--- Diff:
giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java
---
@@ -0,0 +1,851 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.ooc.policy;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
+import org.apache.giraph.comm.NetworkMetrics;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.edge.AbstractEdgeStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.WaitIOCommand;
+import org.apache.giraph.worker.EdgeInputSplitsCallable;
+import org.apache.giraph.worker.VertexInputSplitsCallable;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.log4j.Logger;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of {@link OutOfCoreOracle} that uses a linear regression
model
+ * to estimate actual memory usage based on the current state of
computation.
+ * The model takes into consideration 5 parameters:
+ *
+ * y = c0 + c1*x1 + c2*x2 + c3*x3 + c4*x4 + c5*x5
+ *
+ * y: memory usage
+ * x1: edges loaded
+ * x2: vertices loaded
+ * x3: vertices processed
+ * x4: bytes received due to messages
+ * x5: bytes loaded/stored from/to disk due to OOC.
+ *
+ */
+public class MemoryEstimatorOracle implements OutOfCoreOracle {
+ /** Memory check interval in msec */
+ public static final LongConfOption CHECK_MEMORY_INTERVAL =
+ new LongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000,
+ "The interval where memory checker thread wakes up and " +
+ "monitors memory footprint (in milliseconds)");
+ /**
+ * If mem-usage is above this threshold and no Full GC has been called,
+ * we call it manually
+ */
+ public static final FloatConfOption MANUAL_GC_MEMORY_PRESSURE =
+ new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f,
+ "The threshold above which GC is called manually if Full GC has
not " +
+ "happened in a while");
+ /** Used to detect a high memory pressure situation */
+ public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION =
+ new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f,
+ "Minimum percentage of memory we expect to be reclaimed after a
Full " +
+ "GC. If less than this amount is reclaimed, it is sage to say
" +
+ "we are in a high memory situation and the estimation
mechanism " +
+ "has not recognized it yet!");
+ /** If mem-usage is above this threshold, active threads are set to 0 */
+ public static final FloatConfOption AM_HIGH_THRESHOLD =
+ new FloatConfOption("giraph.amHighThreshold", 0.95f,
+ "If mem-usage is above this threshold, all active threads " +
+ "(compute/input) are paused.");
+ /** If mem-usage is below this threshold, active threads are set to max
*/
+ public static final FloatConfOption AM_LOW_THRESHOLD =
+ new FloatConfOption("giraph.amLowThreshold", 0.90f,
+ "If mem-usage is below this threshold, all active threads " +
+ "(compute/input) are running.");
+ /** If mem-usage is above this threshold, credit is set to 0 */
+ public static final FloatConfOption CREDIT_HIGH_THRESHOLD =
+ new FloatConfOption("giraph.creditHighThreshold", 0.95f,
+ "If mem-usage is above this threshold, credit is set to 0");
+ /** If mem-usage is below this threshold, credit is set to max */
+ public static final FloatConfOption CREDIT_LOW_THRESHOLD =
+ new FloatConfOption("giraph.creditLowThreshold", 0.90f,
+ "If mem-usage is below this threshold, credit is set to max");
+ /** OOC starts if mem-usage is above this threshold */
+ public static final FloatConfOption OOC_THRESHOLD =
+ new FloatConfOption("giraph.oocThreshold", 0.90f,
+ "If mem-usage is above this threshold, out of core threads starts
" +
+ "writing data to disk");
+
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(MemoryEstimatorOracle.class);
+
+ /** Cached value for {@link #MANUAL_GC_MEMORY_PRESSURE} */
+ private final float manualGCMemoryPressure;
+ /** Cached value for {@link #GC_MINIMUM_RECLAIM_FRACTION} */
+ private final float gcReclaimFraction;
+ /** Cached value for {@link #AM_HIGH_THRESHOLD} */
+ private final float amHighThreshold;
+ /** Cached value for {@link #AM_LOW_THRESHOLD} */
+ private final float amLowThreshold;
+ /** Cached value for {@link #CREDIT_HIGH_THRESHOLD} */
+ private final float creditHighThreshold;
+ /** Cached value for {@link #CREDIT_LOW_THRESHOLD} */
+ private final float creditLowThreshold;
+ /** Cached value for {@link #OOC_THRESHOLD} */
+ private final float oocThreshold;
+
+ /** Reference to running OOC engine */
+ private final OutOfCoreEngine oocEngine;
+ /** Memory estimator instance */
+ private final MemoryEstimator memoryEstimator;
+ /** Keeps track of the number of bytes stored/loaded by OOC */
+ private final AtomicLong oocBytesInjected = new AtomicLong(0);
+ /** How many bytes to offload */
+ private final AtomicLong numBytesToOffload = new AtomicLong(0);
+ /** Current state of the OOC */
+ private volatile State state = State.STABLE;
+ /** Timestamp of the last major GC */
+ private volatile long lastMajorGCTime = 0;
+
+ /**
+ * Different states the OOC can be in.
+ */
+ private enum State {
+ /** No offloading */
+ STABLE,
+ /** Current offloading */
+ OFFLOADING,
+ }
+
+ /**
+ * Constructor.
+ * @param conf Configuration
+ * @param oocEngine OOC engine.:w
+ *
+ */
+ public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
+ final OutOfCoreEngine oocEngine) {
+ this.oocEngine = oocEngine;
+ this.memoryEstimator = new MemoryEstimator(this.oocBytesInjected,
+ oocEngine.getNetworkMetrics());
+
+ this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf);
+ this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf);
+ this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf);
+ this.amLowThreshold = AM_LOW_THRESHOLD.get(conf);
+ this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf);
+ this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf);
+ this.oocThreshold = OOC_THRESHOLD.get(conf);
+
+ final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
+
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ long oldGenUsageEstimate = memoryEstimator.getUsageEstimate();
+ MemoryUsage usage = getOldGenUsed();
+ if (oldGenUsageEstimate > 0) {
+ updateRates(oldGenUsageEstimate, usage.getMax());
+ } else {
+ long time = System.currentTimeMillis();
+ if (time - lastMajorGCTime >= 10000) {
+ double used = (double) usage.getUsed() / usage.getMax();
+ if (used > manualGCMemoryPressure) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "High memory pressure with no full GC from the JVM. " +
+ "Calling GC manually. Used fraction of old-gen is " +
+ String.format("%.2f", used) + ".");
+ }
+ System.gc();
+ time = System.currentTimeMillis() - time;
+ usage = getOldGenUsed();
+ used = (double) usage.getUsed() / usage.getMax();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Manual GC done. It took " +
+ String.format("%.2f", time / 1000.0) +
+ " seconds. Used fraction of old-gen is " +
+ String.format("%.2f", used) + ".");
+ }
+ }
+ }
+ }
+ try {
+ Thread.sleep(checkMemoryInterval);
+ } catch (InterruptedException e) {
+ LOG.warn("run: exception occurred!", e);
+ return;
+ }
+ }
+ }
+ });
+ thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
+ .getGraphTaskManager().createUncaughtExceptionHandler());
+ thread.setName("ooc-memory-checker");
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Resets all the counters used in the memory estimation. This is called
at
+ * the beginning of a new superstep.
+ * <p>
+ * The number of vertices to compute in the next superstep gets reset in
+ * {@link
org.apache.giraph.graph.GraphTaskManager#processGraphPartitions}
+ * right before
+ * {@link org.apache.giraph.partition.PartitionStore#startIteration()}
gets
+ * called.
+ */
+ @Override
+ public void startIteration() {
+ oocBytesInjected.set(0);
+ memoryEstimator.clear();
+ memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep());
+ oocEngine.updateRequestsCreditFraction(1);
+ oocEngine.updateActiveThreadsFraction(1);
+ }
+
+
+ @Override
+ public IOAction[] getNextIOActions() {
+ if (state == State.OFFLOADING) {
+ return new IOAction[]{
+ IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION};
+ }
+ long oldGenUsage = memoryEstimator.getUsageEstimate();
+ MemoryUsage usage = getOldGenUsed();
+ if (oldGenUsage > 0) {
+ double usageEstimate = (double) oldGenUsage / usage.getMax();
+ if (usageEstimate > oocThreshold) {
+ return new IOAction[]{
+ IOAction.STORE_MESSAGES_AND_BUFFERS,
+ IOAction.STORE_PARTITION};
+ } else {
+ return new IOAction[]{IOAction.LOAD_PARTITION};
+ }
+ } else {
+ return new IOAction[]{IOAction.LOAD_PARTITION};
+ }
+ }
+
+ @Override
+ public boolean approve(IOCommand command) {
+ return true;
+ }
+
+ @Override
+ public void commandCompleted(IOCommand command) {
+ if (command instanceof LoadPartitionIOCommand) {
+ oocBytesInjected.getAndAdd(command.bytesTransferred());
+ if (state == State.OFFLOADING) {
+ numBytesToOffload.getAndAdd(command.bytesTransferred());
+ }
+ } else if (!(command instanceof WaitIOCommand)) {
+ oocBytesInjected.getAndAdd(0 - command.bytesTransferred());
+ if (state == State.OFFLOADING) {
+ numBytesToOffload.getAndAdd(0 - command.bytesTransferred());
+ }
+ }
+
+ if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) {
+ numBytesToOffload.set(0);
+ state = State.STABLE;
+ updateRates(-1, 1);
+ }
+ }
+
+ /**
+ * When a new GC has completed, we can get an accurate measurement of the
+ * memory usage. We use this to update the linear regression model.
+ *
+ * @param gcInfo GC information
+ */
+ @Override
+ public synchronized void gcCompleted(
+ GarbageCollectionNotificationInfo gcInfo) {
+ String action = gcInfo.getGcAction().toLowerCase();
+ String cause = gcInfo.getGcCause().toLowerCase();
+ if (action.contains("major") &&
+ (cause.contains("ergo") || cause.contains("system"))) {
+ lastMajorGCTime = System.currentTimeMillis();
+ MemoryUsage before = null;
+ MemoryUsage after = null;
+
+ for (Map.Entry<String, MemoryUsage> entry :
+ gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) {
+ String poolName = entry.getKey();
+ if (poolName.toLowerCase().contains("old")) {
+ before = entry.getValue();
+ after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName);
+ break;
+ }
+ }
+ if (after == null) {
+ throw new IllegalStateException("Missing Memory Usage After GC
info");
+ }
+ if (before == null) {
+ throw new IllegalStateException("Missing Memory Usage Before GC
info");
+ }
+
+ // Compare the estimation with the actual value
+ long usedMemoryEstimate = memoryEstimator.getUsageEstimate();
+ long usedMemoryReal = after.getUsed();
+ if (usedMemoryEstimate >= 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + "
real=" +
+ usedMemoryReal + " error=" +
+ ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) /
+ usedMemoryReal * 100));
+ }
+ }
+
+ // Number of edges loaded so far (if in input superstep)
+ long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
+ EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
+ // Number of vertices loaded so far (if in input superstep)
+ long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
+ VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
+ // Number of vertices computed (if either in compute or store phase)
+ long verticesComputed = WorkerProgress.get().getVerticesComputed() +
+ WorkerProgress.get().getVerticesStored() +
+ AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
+ // Number of bytes received
+ long receivedBytes =
+ oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep();
+ // Number of OOC bytes
+ long oocBytes = oocBytesInjected.get();
+
+ memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded,
+ verticesLoaded, verticesComputed, receivedBytes, oocBytes);
+
+ long garbage = before.getUsed() - after.getUsed();
+ long maxMem = after.getMax();
+ long memUsed = after.getUsed();
+ boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction *
maxMem &&
+ garbage < gcReclaimFraction * maxMem;
+ boolean predictionExist = memoryEstimator.getUsageEstimate() > 0;
+ if (isTight && !predictionExist) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" +
+ memUsed + " maxMem=" + maxMem);
+ }
+ numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) -
+ (maxMem - memUsed));
+ if (LOG.isInfoEnabled()) {
+ LOG.info("gcCompleted: tight memory usage. Starting to offload "
+
+ "until " + numBytesToOffload.get() + " bytes are offloaded");
+ }
+ state = State.OFFLOADING;
+ updateRates(1, 1);
+ }
+ }
+ }
+
+ /**
+ * Given an estimate for the current memory usage and the maximum
available
+ * memory, it updates the active threads and flow control credit in the
+ * OOC engine.
+ *
+ * @param usageEstimateMem Estimate of memory usage.
+ * @param maxMemory Maximum memory.
+ */
+ private void updateRates(long usageEstimateMem, long maxMemory) {
+ double usageEstimate = (double) usageEstimateMem / maxMemory;
+ if (usageEstimate > 0) {
+ if (usageEstimate >= amHighThreshold) {
+ oocEngine.updateActiveThreadsFraction(0);
+ } else if (usageEstimate < amLowThreshold) {
+ oocEngine.updateActiveThreadsFraction(1);
+ } else {
+ oocEngine.updateActiveThreadsFraction(1 -
+ (usageEstimate - amLowThreshold) /
+ (amHighThreshold - amLowThreshold));
+ }
+
+ if (usageEstimate >= creditHighThreshold) {
+ oocEngine.updateRequestsCreditFraction(0);
+ } else if (usageEstimate < creditLowThreshold) {
+ oocEngine.updateRequestsCreditFraction(1);
+ } else {
+ oocEngine.updateRequestsCreditFraction(1 -
+ (usageEstimate - creditLowThreshold) /
+ (creditHighThreshold - creditLowThreshold));
+ }
+ } else {
+ oocEngine.updateActiveThreadsFraction(1);
+ oocEngine.updateRequestsCreditFraction(1);
+ }
+ }
+
+ /**
+ * Returns statistics about the old gen pool.
+ * @return {@link MemoryUsage}.
+ */
+ private MemoryUsage getOldGenUsed() {
+ List<MemoryPoolMXBean> memoryPoolList =
+ ManagementFactory.getMemoryPoolMXBeans();
+ for (MemoryPoolMXBean pool : memoryPoolList) {
+ String normalName = pool.getName().toLowerCase();
+ if (normalName.contains("old") || normalName.contains("tenured")) {
+ return pool.getUsage();
+ }
+ }
+ throw new IllegalStateException("Bad Memory Pool");
+ }
+
+ /**
+ * Maintains statistics about the current state and progress of the
+ * computation and produces estimates of memory usage using a technique
+ * based on linear regression.
+ *
+ * Upon a GC events, it gets updated with the most recent statistics
through
+ * the {@link #addRecord} method.
+ */
+ private static class MemoryEstimator {
+ /** Stores the (x1,x2,...,x5) arrays of data samples, one for each
sample */
+ private Vector<double[]> dataSamples = new Vector<>();
+ /** Stores the y memory usage dataSamples, one for each sample */
+ private Vector<Double> memorySamples = new Vector<>();
+ /** Stores the coefficients computed by the linear regression model */
+ private double[] coefficient = new double[6];
+ /** Stores the column indices that can be used in the regression model
*/
+ private Vector<Integer> validColumnIndices = new Vector<>();
+ /** Potentially out-of-range coefficient values */
+ private double[] extreme = new double[6];
+ /** Indicates whether current coefficients can be used for estimation
*/
+ private boolean isValid = false;
+ /** Implementation of linear regression */
+ private OLSMultipleLinearRegression mlr = new
OLSMultipleLinearRegression();
+ /** Used to synchronize access to the data samples */
+ private Lock lock = new ReentrantLock();
+ /** The estimation method depends on the current superstep. */
+ private long currentSuperstep = -1;
+ /** The estimation method depends on the bytes injected. */
+ private final AtomicLong oocBytesInjected;
+ /** Provides network statistics */
+ private final NetworkMetrics networkMetrics;
+
+ /**
+ * Constructor
+ * @param oocBytesInjected Reference to {@link AtomicLong} object
+ * maintaining the number of OOC bytes stored.
+ * @param networkMetrics Interface to get network stats.
+ */
+ public MemoryEstimator(AtomicLong oocBytesInjected,
+ NetworkMetrics networkMetrics) {
+ this.oocBytesInjected = oocBytesInjected;
+ this.networkMetrics = networkMetrics;
+ }
+
+
+ /**
+ * Clear data structure (called from single threaded program).
+ */
+ public void clear() {
+ dataSamples.clear();
+ memorySamples.clear();
+ isValid = false;
+ }
+
+ public void setCurrentSuperstep(long superstep) {
+ this.currentSuperstep = superstep;
+ }
+
+ /**
+ * Given the current state of computation (i.e. current edges loaded,
+ * vertices computed etc) and the current model (i.e. the regression
+ * coefficient), it returns a prediction about the memory usage in
bytes.
+ *
+ * @return Memory estimate in bytes.
+ */
+ public long getUsageEstimate() {
+ long usage = -1;
+ lock.lock();
+ try {
+ if (isValid) {
+ // Number of edges loaded so far (if in input superstep)
+ long edgesLoaded = currentSuperstep >= 0 ? 0 :
+ EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
+ // Number of vertices loaded so far (if in input superstep)
+ long verticesLoaded = currentSuperstep >= 0 ? 0 :
+
VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
+ // Number of vertices computed (if either in compute or store
phase)
+ long verticesComputed =
WorkerProgress.get().getVerticesComputed() +
+ WorkerProgress.get().getVerticesStored() +
+ AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
+ // Number of bytes received
+ long receivedBytes =
networkMetrics.getBytesReceivedPerSuperstep();
+ // Number of OOC bytes
+ long oocBytes = this.oocBytesInjected.get();
+
+ usage = (long) (edgesLoaded * coefficient[0] +
+ verticesLoaded * coefficient[1] +
+ verticesComputed * coefficient[2] +
+ receivedBytes * coefficient[3] +
+ oocBytes * coefficient[4] +
+ coefficient[5]);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return usage;
+ }
+
+ /**
+ * Updates the linear regression model with a new data point.
+ *
+ * @param memUsed Current real value of memory usage.
+ * @param edges Number of edges loaded.
+ * @param vertices Number of vertices loaded.
+ * @param verticesProcessed Number of vertices processed.
+ * @param bytesReceived Number of bytes received.
+ * @param oocBytesInjected Number of bytes stored/loaded due to OOC.
+ */
+ public void addRecord(long memUsed, long edges, long vertices,
+ long verticesProcessed,
+ long bytesReceived, long oocBytesInjected) {
+ checkState(memUsed > 0, "Memory Usage cannot be negative");
+ if (dataSamples.size() > 0) {
+ double[] last = dataSamples.get(dataSamples.size() - 1);
+ if (edges == last[0] && vertices == last[1] &&
+ verticesProcessed == last[2] && bytesReceived == last[3] &&
+ oocBytesInjected == last[4]) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "addRecord: avoiding to add the same entry as the last
one!");
+ }
+ return;
+ }
+ }
+ dataSamples.add(new double[] {edges, vertices, verticesProcessed,
+ bytesReceived, oocBytesInjected});
+ memorySamples.add((double) memUsed);
+
+ // Weed out the columns that are all zero
+ validColumnIndices.clear();
+ for (int i = 0; i < 5; ++i) {
+ boolean validIndex = false;
+ // Check if there is a non-zero entry in the column
+ for (double[] value : dataSamples) {
+ if (value[i] != 0) {
+ validIndex = true;
+ break;
+ }
+ }
+ if (validIndex) {
+ // check if all entries are not equal to each other
+ double firstValue = -1;
+ boolean allEqual = true;
+ for (double[] value : dataSamples) {
+ if (firstValue == -1) {
+ firstValue = value[i];
+ } else {
+ if (Math.abs((value[i] - firstValue) / firstValue) > 0.01) {
+ allEqual = false;
+ break;
+ }
+ }
+ }
+ validIndex = !allEqual;
+ if (validIndex) {
+ // Check if the column has linear dependency with another
column
+ for (int col = i + 1; col < 5; ++col) {
+ if (isLinearDependence(dataSamples, i, col)) {
+ validIndex = false;
+ break;
+ }
+ }
+ }
+ }
+
+ if (validIndex) {
+ validColumnIndices.add(i);
+ }
+ }
+
+ // If we filtered out columns in the previous step, we are going to
run
+ // the regression without those columns.
+
+ // Create the coefficient table
+ boolean setIsValid = false;
+ lock.lock();
+ try {
+ if (validColumnIndices.size() >= 1 &&
+ dataSamples.size() >= validColumnIndices.size() + 1) {
+
+ double[][] xValues = new double[dataSamples.size()][];
+ double[] yValues = new double[memorySamples.size()];
+ fillXMatrix(dataSamples, validColumnIndices, xValues);
+ copyVectorToArray(memorySamples, yValues);
+ mlr.newSampleData(yValues, xValues);
+ calculateRegression(coefficient, validColumnIndices, mlr);
+
+ // After the computation of the regression, some coefficients
may have
+ // values outside the valid value range. In this case, we set the
+ // coefficient to the minimum or maximum value allowed, and
re-run the
+ // regression.
+ boolean changed;
+ extreme[3] = -1;
+ extreme[4] = -1;
+ do {
+ changed = refineCoefficient(4, 1, 2, xValues, yValues);
+ changed |= refineCoefficient(3, 0, 2, xValues, yValues);
+ } while (changed);
+ if (extreme[3] != -1) {
+ coefficient[3] = extreme[3];
+ }
+ if (extreme[4] != -1) {
+ coefficient[4] = extreme[4];
+ }
+ setIsValid = true;
+ return; // the finally-block will execute before return
+ }
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ LOG.warn("addRecord: exception occurred!", e);
+ } finally {
+ // This inner try-finally block is necessary to ensure that the
+ // lock is always released.
+ try {
+ isValid = setIsValid;
+ printStats();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Certain coefficients need to be within a specific range. For
instance,
+ * If the coefficient is not in this range, we set it to the closest
bound
+ * and re-run the linear regression.
+ *
+ * @param coefIndex Coefficient index
+ * @param lowerBound Lower bound
+ * @param upperBound Upper bound
+ * @param xValues double[][] matrix with data samples
+ * @param yValues double[] matrix with y samples
+ * @return True if coefficients were out-of-range
+ * @throws Exception
+ */
+ private boolean refineCoefficient(int coefIndex, double lowerBound,
+ double upperBound, double[][] xValues, double[] yValues)
+ throws Exception {
+
+ boolean result = false;
+ if (coefficient[coefIndex] < lowerBound ||
+ coefficient[coefIndex] > upperBound) {
+
+ double value;
+ if (coefficient[coefIndex] < lowerBound) {
+ value = lowerBound;
+ } else {
+ value = upperBound;
+ }
+ int ptr = -1;
+ if (validColumnIndices.size() >= 1 &&
+ validColumnIndices.get(validColumnIndices.size() - 1) ==
coefIndex) {
+ ptr = validColumnIndices.size() - 1;
+ } else if (validColumnIndices.size() >= 2 &&
+ validColumnIndices.get(validColumnIndices.size() - 2) ==
coefIndex) {
+ ptr = validColumnIndices.size() - 2;
+ }
+ if (ptr != -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addRecord: coefficient at index " + coefIndex +
+ " is wrong in the regression, setting it to " + value);
+ }
+ // remove from valid column
+ validColumnIndices.remove(ptr);
+ // re-create the X matrix
+ fillXMatrix(dataSamples, validColumnIndices, xValues);
+ // adjust Y values
+ for (int i = 0; i < memorySamples.size(); ++i) {
+ yValues[i] -= value * dataSamples.get(i)[coefIndex];
+ }
+ // save new coefficient value in intermediate array
+ extreme[coefIndex] = value;
+ // re-run regression
+ mlr.newSampleData(yValues, xValues);
+ calculateRegression(coefficient, validColumnIndices, mlr);
+ result = true;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "addRecord: coefficient was not in the regression, " +
+ "setting it to the lower bound");
+ }
+ result = false;
+ }
+ coefficient[coefIndex] = value;
+ }
+ return result;
+ }
+
+ /**
+ * Calculates the regression.
+ * @param coefficient Array of coefficients
+ * @param validColumnIndices List of valid columns
+ * @param mlr {@link OLSMultipleLinearRegression} instance.
+ * @throws Exception
+ */
+ private static void calculateRegression(double[] coefficient,
+ Vector<Integer> validColumnIndices, OLSMultipleLinearRegression mlr)
+ throws Exception {
+
+ if (coefficient.length != validColumnIndices.size()) {
+ throw new Exception("There are " + coefficient.length +
+ " coefficients, but " + validColumnIndices.size() +
+ " valid columns in the regression");
+ }
+
+ double[] beta = mlr.estimateRegressionParameters();
+ for (int i = 0; i < coefficient.length; ++i) {
+ coefficient[i] = 0;
+ }
+ for (int i = 0; i < validColumnIndices.size(); ++i) {
+ coefficient[validColumnIndices.get(i)] = beta[i];
+ }
+ coefficient[5] = beta[validColumnIndices.size()];
+ }
+
+ /**
+ * Copies the values from a Vector to an array.
+ * @param source Source vector of values
+ * @param target Target array.
+ */
+ private static void copyVectorToArray(Vector<Double> source,
+ double[] target) {
+ for (int i = 0; i < source.size(); ++i) {
+ target[i] = source.get(i);
+ }
+ }
+
+ /**
+ * Copies values from a Vector of double[] to a double[][]. Takes into
+ * consideration the list of valid column indices.
+ * @param sourceValues Source Vector of double[]
+ * @param validColumnIndices Valid column indices
+ * @param xValues Target double[][] matrix.
+ */
+ private static void fillXMatrix(Vector<double[]> sourceValues,
+ Vector<Integer> validColumnIndices, double[][] xValues) {
+
+ for (int i = 0; i < sourceValues.size(); ++i) {
+ xValues[i] = new double[validColumnIndices.size() + 1];
+ for (int j = 0; j < validColumnIndices.size(); ++j) {
+ xValues[i][j] = sourceValues.get(i)[validColumnIndices.get(j)];
+ }
+ xValues[i][validColumnIndices.size()] = 1;
+ }
+ }
+
+ /**
+ * Utility function that checks whether two doubles are equals given an
+ * accuracy tolerance.
+ *
+ * @param val1 First value
+ * @param val2 Second value
+ * @return True if within a threshold
+ */
+ private static boolean equal(double val1, double val2) {
+ return Math.abs(val1 - val2) < 0.01;
+ }
+
+ /**
+ * Utility function that checks if two columns have linear dependence.
+ *
+ * @param values Matrix in the form of a Vector of double[] values.
+ * @param col1 First column index
+ * @param col2 Second column index
+ * @return True if there is linear dependence.
+ */
+ private static boolean isLinearDependence(Vector<double[]> values,
+ int col1, int col2) {
+
+ Vector<Double> factor = new Vector<>();
+ for (double[] value : values) {
+ double val1 = value[col1];
+ double val2 = value[col2];
+ if (equal(val1, 0)) {
+ if (equal(val2, 0)) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ if (equal(val2, 0)) {
+ return false;
+ }
+ factor.add(val1 / val2);
+ }
+
+ if (factor.size() < 2) {
--- End diff --
Not really. `factors.size()` is the number of non-zero elements in the two
columns of the given matrix. For instance, the following can be the matrix:
0, 0
1, 2
3, 5
0, 0
Here `values.length` is 4, but `factor.size()` is 2. Basically, the loop
above this line weed out the rows that are 0 in both columns from the
dependency check.
> Add memory estimation mechanism to out-of-core
> ----------------------------------------------
>
> Key: GIRAPH-1125
> URL: https://issues.apache.org/jira/browse/GIRAPH-1125
> Project: Giraph
> Issue Type: Improvement
> Reporter: Hassan Eslami
> Assignee: Hassan Eslami
>
> The new out-of-core mechanism is designed with the adaptivity goal in mind,
> meaning that we wanted out-of-core mechanism to kick in only when it is
> necessary. In other words, when the amount of data (graph, messages, and
> mutations) all fit in memory, we want to take advantage of the entire memory.
> And, when in a stage the memory is short, only enough (minimal) amount of
> data goes out of core (to disk). This ensures a good performance for the
> out-of-core mechanism.
> To satisfy the adaptiveness goal, we need to know how much memory is used at
> each point of time. The default out-of-core mechanism (ThresholdBasedOracle)
> get memory information based on JVM's internal methods (Runtime's
> freeMemory()). This method is inaccurate (and pessimistic), meaning that it
> does not account for garbage data that has not been purged by GC. Using JVM's
> default methods, OOC behaves pessimistically and move data out of core even
> if it is not necessary. For instance, consider the case where there are a lot
> of garbage on the heap, but GC has not happened for a while. In this case,
> the default OOC pushes data on disk and immediately after a major GC it
> brings back the data to memory. This causes inefficiency in the default out
> of core mechanism. If out-of-core is used but the data can entirely fit in
> memory, the job goes out of core even though going out of core is not
> necessary.
> To address this issue, we need to have a mechanism to more accurately know
> how much of heap is filled with non-garbage data. Consequently, we need to
> change the Oracle (OOC policy) to take advantage of a more accurate memory
> usage estimation.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)