http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java new file mode 100644 index 0000000..fa8e6bd --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java @@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc;

import com.sun.management.GarbageCollectionNotificationInfo;
import org.apache.giraph.ooc.io.IOCommand;

/**
 * Contract for an out-of-core oracle. The oracle is the decision maker of the
 * out-of-core mechanism: it decides which out-of-core actions (loads or
 * stores) should take place.
 */
public interface OutOfCoreOracle {
  /**
   * Kinds of IO actions that may move the computation towards a more
   * desirable state for the out-of-core mechanism. Which actions are issued
   * depends on the memory status (memory pressure, rate of data transfer to
   * memory, etc.).
   */
  enum IOAction {
    /**
     * Any one of the following:
     *   - store incoming messages of a partition that is currently on disk,
     *   - store the raw data buffer of incoming messages of a partition that
     *     is currently on disk,
     *   - store the raw data buffer of a partition that is currently on disk.
     */
    STORE_MESSAGES_AND_BUFFERS,
    /**
     * Store a partition that has already been *processed* in the current
     * iteration cycle. Also known as "soft store".
     */
    STORE_PROCESSED_PARTITION,
    /**
     * Store an in-memory partition on disk, preferring *processed*
     * partitions. If no *processed* partition is in memory, the store must
     * still happen, even if that means storing an *unprocessed* partition.
     * Also known as "hard store".
     */
    STORE_PARTITION,
    /**
     * Load an *unprocessed* partition from disk, but only if there are
     * *processed* partitions in memory. In effect this starts a swap.
     */
    LOAD_TO_SWAP_PARTITION,
    /**
     * Load an *unprocessed* partition from disk to memory. Also known as
     * "soft load".
     */
    LOAD_UNPROCESSED_PARTITION,
    /**
     * Load a partition from disk to memory, preferring *unprocessed* over
     * *processed* ones. Loading a *processed* partition prefetches it for the
     * next superstep. Also known as "hard load".
     */
    LOAD_PARTITION,
    /**
     * Load a partition regardless of the memory situation. An out-of-core
     * mechanism may use this action to tell IO threads that they may load a
     * partition that was specifically requested.
     */
    URGENT_LOAD_PARTITION
  }

  /**
   * Get the next set of viable IO actions that would help bring memory to a
   * more desired state.
   *
   * @return an array of viable IO actions, ordered from highest priority to
   *         lowest priority
   */
  IOAction[] getNextIOActions();

  /**
   * Decide whether a command is appropriate for bringing memory to a more
   * desired state. No command is executed without the oracle's approval.
   * This matters most when several IO threads serve the out-of-core
   * mechanism: approval lets the oracle stop all of them from running the
   * same command type when that is necessary. For example, a single thread
   * executing a given command type may be enough to reach the desired memory
   * state, leaving the other IO threads free for other command types.
   *
   * @param command the IO command that is about to execute
   * @return 'true' if the command is approved for execution, 'false' if the
   *         command should not be executed
   */
  boolean approve(IOCommand command);

  /**
   * Notification that a command has completed. The oracle may update its
   * status and commit the changes the command may have caused.
   *
   * @param command the IO command that is completed
   */
  void commandCompleted(IOCommand command);

  /**
   * Notification that a GC has completed. The oracle may base decisions on
   * the GC information (such as how long it took, how much memory it
   * reclaimed, etc.).
   *
   * @param gcInfo GC information
   */
  void gcCompleted(GarbageCollectionNotificationInfo gcInfo);

  /**
   * Shut down the out-of-core oracle. Needed in particular when the oracle
   * uses additional monitoring threads.
   */
  void shutdown();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java new file mode 100644 index 0000000..0dfc9de --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java @@ -0,0 +1,355 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc;

import com.google.common.collect.Maps;
import com.sun.management.GarbageCollectionNotificationInfo;
import org.apache.giraph.conf.FloatConfOption;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.ooc.io.IOCommand;
import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
import org.apache.giraph.ooc.io.WaitIOCommand;
import org.apache.log4j.Logger;

import java.lang.management.MemoryUsage;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Out-of-core oracle to adaptively control data kept in memory, with the goal
 * of keeping the memory state constantly at a desired state. This oracle
 * monitors GC behavior to keep track of memory pressure.
 *
 * After each GC is done, this oracle retrieves statistics about the memory
 * pressure (memory used, max memory, and how far away memory is compared to a
 * max optimal pressure). Based on the two most recent memory statistics,
 * the oracle predicts the status of the memory, and sets the rate of load/store
 * of data from/to disk. If the rate of loading data from disk is 'l', and the
 * rate of storing data to disk is 's', the rate of data injection to memory
 * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
 * be based on the prediction of memory status.
 *
 * Assume that based on the previous GC call the memory usage at time t_0 is
 * m_0, and based on the most recent GC call the memory usage at time t_1 is
 * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
 * Assume that the ideal memory pressure happens when the memory usage is
 * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
 * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the data
 * injection rate to memory so far was i, the new injection rate should be:
 * i_new = i - (alpha - beta)
 */
public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
  /**
   * The optimal memory pressure at which GC behavior is close to ideal. This
   * fraction may be dependent on the GC strategy used for running a job, but
   * generally should not be dependent on the graph processing application.
   */
  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
      new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
          "The memory pressure (fraction of used memory) at which the job " +
              "shows the optimal GC behavior. This fraction may be dependent " +
              "on the GC strategy used in running the job.");

  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(SimpleGCMonitoringOracle.class);
  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
  private final float optimalMemoryPressure;
  /** Out-of-core engine */
  private final OutOfCoreEngine oocEngine;
  /** Status of memory from the last GC call */
  private GCObservation lastGCObservation;
  /** Desired rate of data injection to memory */
  private final AtomicLong desiredDiskToMemoryDataRate =
      new AtomicLong(0);
  /** Number of on the fly (outstanding) IO commands for each command type */
  private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
      Maps.newConcurrentMap();

  /**
   * Constructor
   *
   * @param conf configuration
   * @param oocEngine out-of-core engine
   */
  public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
                                  OutOfCoreEngine oocEngine) {
    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
    this.oocEngine = oocEngine;
    // A negative time marks "no GC observed yet" (see GCObservation#isValid)
    this.lastGCObservation = new GCObservation(-1, 0, 0);
    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
      commandOccurrences.put(type, new AtomicInteger(0));
    }
  }

  /**
   * Record the memory status reported by a finished GC and, if a previous
   * observation exists, adjust the desired disk-to-memory data rate.
   *
   * @param gcInfo GC information
   */
  @Override
  public synchronized void gcCompleted(
      GarbageCollectionNotificationInfo gcInfo) {
    long time = System.currentTimeMillis();
    Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
        .getMemoryUsageAfterGc();
    long usedMemory = 0;
    long maxMemory = 0;
    for (MemoryUsage memDetail : memAfter.values()) {
      usedMemory += memDetail.getUsed();
      // MemoryUsage#getMax returns -1 when a pool has no defined maximum.
      // Such a pool must not be subtracted from the total capacity.
      long poolMax = memDetail.getMax();
      if (poolMax > 0) {
        maxMemory += poolMax;
      }
    }
    GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
    if (LOG.isInfoEnabled()) {
      LOG.info("gcCompleted: GC completed with: " + observation);
    }
    if (maxMemory <= 0) {
      // Memory pressure cannot be computed without a capacity, and the rate
      // computation divides by it. Keep the previous observation instead.
      LOG.warn("gcCompleted: no memory pool reported a defined maximum " +
          "size, ignoring this GC observation");
      return;
    }
    // Whether this is not the first GC call in the application
    if (lastGCObservation.isValid()) {
      long deltaDataRate =
          lastGCObservation.getDesiredDeltaDataRate(observation);
      long diskBandwidthEstimate =
          oocEngine.getIOStatistics().getDiskBandwidth();
      // Update the desired data injection rate to memory. The data injection
      // rate cannot be less than -disk_bandwidth (the extreme case happens if
      // we only do 'store'), and cannot be more than disk_bandwidth (the
      // extreme case happens if we only do 'load').
      long previousRate = desiredDiskToMemoryDataRate.get();
      long newRate = Math.max(
          Math.min(previousRate - deltaDataRate, diskBandwidthEstimate),
          -diskBandwidthEstimate);
      desiredDiskToMemoryDataRate.set(newRate);
      if (LOG.isInfoEnabled()) {
        LOG.info("gcCompleted: changing data injection rate from " +
            String.format("%.2f", previousRate / 1024.0 / 1024.0) +
            " to " + String.format("%.2f", newRate / 1024.0 / 1024.0));
      }
    }
    lastGCObservation = observation;
  }

  /**
   * Get the current data injection rate to memory based on the commands ran
   * in the history (retrieved from statistics collector), and outstanding
   * commands issued by the IO scheduler.
   *
   * @return the current data injection rate to memory
   */
  private long getCurrentDataInjectionRate() {
    long effectiveBytesTransferred = 0;
    long effectiveDuration = 0;
    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
      OutOfCoreIOStatistics.BytesDuration stats =
          oocEngine.getIOStatistics().getCommandTypeStats(type);
      int occurrence = commandOccurrences.get(type).get();
      long typeBytesTransferred = stats.getBytes();
      long typeDuration = stats.getDuration();
      // If there is an outstanding command, we still do not know how many bytes
      // it will transfer, and how long it will take. So, we guesstimate these
      // numbers based on other similar commands happened in the history. We
      // simply take the average number of bytes transferred for the particular
      // command, and we take average duration for the particular command. We
      // should multiply these numbers by the number of outstanding commands of
      // this particular command type.
      if (stats.getOccurrence() != 0) {
        typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
            occurrence;
        typeDuration += stats.getDuration() / stats.getOccurrence() *
            occurrence;
      }
      if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
        effectiveBytesTransferred += typeBytesTransferred;
      } else {
        // Store (data going out of memory), or wait (no data transferred)
        effectiveBytesTransferred -= typeBytesTransferred;
      }
      effectiveDuration += typeDuration;
    }
    if (effectiveDuration == 0) {
      return 0;
    } else {
      return effectiveBytesTransferred / effectiveDuration;
    }
  }

  /**
   * Choose the next IO actions by comparing the current data injection rate
   * with the desired one, using 5% of the estimated disk bandwidth as the
   * tolerance.
   *
   * @return an array of viable IO actions, sorted from highest priority to
   *         lowest priority
   */
  @Override
  public IOAction[] getNextIOActions() {
    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
    long desiredRate = desiredDiskToMemoryDataRate.get();
    long currentRate = getCurrentDataInjectionRate();
    if (desiredRate > error) {
      // 'l-s' is positive, we should do more load than store.
      if (currentRate > desiredRate + error) {
        // We should decrease 'l-s'. This can be done either by increasing 's'
        // or issuing wait command. We prioritize wait over hard store.
        return new IOAction[]{
          IOAction.STORE_MESSAGES_AND_BUFFERS,
          IOAction.STORE_PROCESSED_PARTITION};
      } else if (currentRate < desiredRate - error) {
        // We should increase 'l-s'. We can simply load partitions/data.
        return new IOAction[]{IOAction.LOAD_PARTITION};
      } else {
        // We are in a proper state and we should keep up with the rate. We can
        // either soft store data or load data (hard load, since the desired
        // rate is positive).
        return new IOAction[]{
          IOAction.STORE_MESSAGES_AND_BUFFERS,
          IOAction.STORE_PROCESSED_PARTITION,
          IOAction.LOAD_PARTITION};
      }
    } else if (desiredRate < -error) {
      // 'l-s' is negative, we should do more store than load.
      if (currentRate < desiredRate - error) {
        // We should increase 'l-s', but we should be cautious. We only do soft
        // load, or wait.
        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
      } else if (currentRate > desiredRate + error) {
        // We should reduce 'l-s', we do hard store.
        return new IOAction[]{
          IOAction.STORE_MESSAGES_AND_BUFFERS,
          IOAction.STORE_PARTITION};
      } else {
        // We should keep up with the rate. We can either soft store data, or
        // soft load data.
        return new IOAction[]{
          IOAction.STORE_MESSAGES_AND_BUFFERS,
          IOAction.STORE_PROCESSED_PARTITION,
          IOAction.LOAD_UNPROCESSED_PARTITION};
      }
    } else {
      // 'l-s' is almost zero. If current rate is over the desired rate, we do
      // soft store. If the current rate is below the desired rate, we do soft
      // load.
      if (currentRate > desiredRate + error) {
        return new IOAction[]{
          IOAction.STORE_MESSAGES_AND_BUFFERS,
          IOAction.STORE_PROCESSED_PARTITION};
      } else if (currentRate < desiredRate - error) {
        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
      } else {
        return new IOAction[]{
          IOAction.STORE_MESSAGES_AND_BUFFERS,
          IOAction.STORE_PROCESSED_PARTITION,
          IOAction.LOAD_UNPROCESSED_PARTITION};
      }
    }
  }

  /**
   * Approve or deny a command based on how the current data injection rate
   * compares with the desired one. An approved command is counted as
   * outstanding until {@link #commandCompleted(IOCommand)} is called for it.
   *
   * @param command the IO command that is about to execute
   * @return 'true' if the command is approved for execution, 'false' if the
   *         command should not be executed
   */
  @Override
  public synchronized boolean approve(IOCommand command) {
    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
    long desiredRate = desiredDiskToMemoryDataRate.get();
    long currentRate = getCurrentDataInjectionRate();
    // The command is denied iff the current rate is above the desired rate and
    // we are doing load (instead of store), or the current rate is below the
    // desired rate and we are doing store (instead of loading).
    if (currentRate > desiredRate + error &&
        command instanceof LoadPartitionIOCommand) {
      return false;
    }
    if (currentRate < desiredRate - error &&
        !(command instanceof LoadPartitionIOCommand) &&
        !(command instanceof WaitIOCommand)) {
      return false;
    }
    commandOccurrences.get(command.getType()).getAndIncrement();
    return true;
  }

  /**
   * Remove a completed command from the count of outstanding commands of its
   * type.
   *
   * @param command the IO command that is completed
   */
  @Override
  public void commandCompleted(IOCommand command) {
    commandOccurrences.get(command.getType()).getAndDecrement();
  }

  @Override
  public void shutdown() { }

  /** Helper class to record memory status after GC calls */
  private class GCObservation {
    /** The time at which the GC happened (in milliseconds) */
    private final long time;
    /** Amount of memory used after the GC call */
    private final long usedMemory;
    /** Maximum amounts of memory reported by GC listener */
    private final long maxMemory;

    /**
     * Constructor
     *
     * @param time time of GC
     * @param usedMemory amount of used memory after GC
     * @param maxMemory amount of all available memory based on GC observation
     */
    public GCObservation(long time, long usedMemory, long maxMemory) {
      this.time = time;
      this.usedMemory = usedMemory;
      this.maxMemory = maxMemory;
    }

    /**
     * Is this a valid observation?
     *
     * @return true iff it is a valid observation
     */
    public boolean isValid() {
      return time > 0;
    }

    /**
     * Considering a new observation of memory status after the most recent GC,
     * what is the desired rate for data injection to memory. Both this
     * observation and the new one must have a positive max memory, since the
     * usages are scaled by dividing by it.
     *
     * @param newObservation the most recent GC observation
     * @return the amount by which the data injection rate should be reduced
     *         (observed rate of memory growth minus the rate that would
     *         reach the optimal memory usage), in bytes per second
     */
    public long getDesiredDeltaDataRate(GCObservation newObservation) {
      long newUsedMemory = newObservation.usedMemory;
      long newMaxMemory = newObservation.maxMemory;
      long lastUsedMemory = usedMemory;
      long lastMaxMemory = maxMemory;
      // Scale the memory status of two GC observation to be the same
      long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
      newUsedMemory =
          (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
      lastUsedMemory =
          (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
      long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
      if (LOG.isInfoEnabled()) {
        LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
            "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
            "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
            String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
                1024.0));
      }
      long interval = newObservation.time - time;
      // The timestamps come from the wall clock, which may not advance (or
      // may even go backwards) between two GCs. A non-positive interval would
      // divide by zero or flip the sign of both rates.
      if (interval <= 0) {
        interval = 1;
        LOG.warn("getDesiredDeltaDataRate: two GC happened almost at the " +
            "same time!");
      }
      // Intervals are in milliseconds, rates are per second
      long currentDataRate = (long) ((double) (newUsedMemory -
          lastUsedMemory) / interval * 1000);
      long desiredDataRate = (long) ((double) (desiredUsedMemory -
          newUsedMemory) / interval * 1000);
      return currentDataRate - desiredDataRate;
    }

    @Override
    public String toString() {
      return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
          "time: %d ms)", usedMemory / 1024.0 / 1024.0,
          maxMemory / 1024.0 / 1024.0, time);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java new file mode 100644 index 0000000..3e05dce --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc; + +import com.sun.management.GarbageCollectionNotificationInfo; +import org.apache.giraph.comm.flow_control.CreditBasedFlowControl; +import org.apache.giraph.comm.flow_control.FlowControl; +import org.apache.giraph.comm.netty.NettyClient; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.LongConfOption; +import org.apache.giraph.ooc.io.IOCommand; +import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.MemoryUtils; +import org.apache.giraph.utils.ThreadUtils; +import org.apache.log4j.Logger; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Out-of-core oracle to adaptively control data kept in memory, with the goal + * of keeping the memory usage at a desired state. Out-of-core policy in this + * oracle is based on several user-defined thresholds. Also, this oracle spawns + * a thread to periodically check the memory usage. This thread would issue + * manual GC calls if JVM fails to call major/full GC for a while and the amount + * of used memory is about to cause high-memory pressure. This oracle, also, + * monitors GC activities. The monitoring mechanism looks for major/full GC + * calls, and updates out-of-core decisions based on the amount of available + * memory after such GCs. 
There are three out-of-core decisions: + * - Which IO operations should be done (load/offload of partitions and + * messages) + * - What the incoming messages rate should be (updating credits announced by + * this worker in credit-based flow-control mechanism) + * - How many processing threads should remain active (tethering rate of + * data generation) + * + * The following table shows the relationship of these decisions and + * used-defined thresholds. + * -------------------------------------------------------------- + * Memory Pressure | Manual | IO | Credit | Active | + * (memory usage) | GC? | Action | | Threads | + * -------------------------------------------------------------- + * | Yes | hard | 0 | 0 | + * | | store | | | + * failPressure ------------------------------------------------- + * | Yes | hard | 0 | fraction | + * | | store | | | + * emergencyPressure -------------------------------------------- + * | Yes | hard | fraction | max | + * | | store | | | + * highPressure ------------------------------------------------- + * | No | soft | fraction | max | + * | | store | | | + * optimalPressure ---------------------------------------------- + * | No | soft | max | max | + * | | load | | | + * lowPressure -------------------------------------------------- + * | No | hard | max | max | + * | | load | | | + * -------------------------------------------------------------- + * + */ +public class ThresholdBasedOracle implements OutOfCoreOracle { + /** The memory pressure at/above which the job would fail */ + public static final FloatConfOption FAIL_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.failPressure", 0.975f, + "The memory pressure (fraction of used memory) at/above which the " + + "job would fail."); + /** + * The memory pressure at which the job is cloe to fail, even though we were + * using maximal disk bandwidth and minimal network rate. We should reduce + * job processing rate. 
+ */ + public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.emergencyPressure", 0.925f, + "The memory pressure (fraction of used memory) at which the job " + + "is close to fail, hence we should reduce its processing rate " + + "as much as possible."); + /** The memory pressure at which the job is suffering from GC overhead. */ + public static final FloatConfOption HIGH_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.highPressure", 0.875f, + "The memory pressure (fraction of used memory) at which the job " + + "is suffering from GC overhead."); + /** + * The memory pressure at which we expect GC to perform optimally for a + * memory intensive job. + */ + public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.optimalPressure", 0.8f, + "The memory pressure (fraction of used memory) at which a " + + "memory-intensive job shows the optimal GC behavior."); + /** + * The memory pressure at/below which the job can use more memory without + * suffering from GC overhead. + */ + public static final FloatConfOption LOW_MEMORY_PRESSURE = + new FloatConfOption("giraph.memory.lowPressure", 0.7f, + "The memory pressure (fraction of used memory) at/below which the " + + "job can use more memory without suffering the performance."); + /** The interval at which memory observer thread wakes up. */ + public static final LongConfOption CHECK_MEMORY_INTERVAL = + new LongConfOption("giraph.checkMemoryInterval", 2500, + "The interval/period where memory observer thread wakes up and " + + "monitors memory footprint (in milliseconds)"); + /** + * Memory observer thread would manually call GC if major/full GC has not + * been called for a while. 
The period where we expect GC to be happened in + * past is specified in this parameter + */ + public static final LongConfOption LAST_GC_CALL_INTERVAL = + new LongConfOption("giraph.lastGcCallInterval", 10 * 1000, + "How long after last major/full GC should we call manual GC?"); + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(ThresholdBasedOracle.class); + /** Cached value for FAIL_MEMORY_PRESSURE */ + private final float failMemoryPressure; + /** Cached value for EMERGENCY_MEMORY_PRESSURE */ + private final float emergencyMemoryPressure; + /** Cached value for HIGH_MEMORY_PRESSURE */ + private final float highMemoryPressure; + /** Cached value for OPTIMAL_MEMORY_PRESSURE */ + private final float optimalMemoryPressure; + /** Cached value for LOW_MEMORY_PRESSURE */ + private final float lowMemoryPressure; + /** Cached value for CHECK_MEMORY_INTERVAL */ + private final long checkMemoryInterval; + /** Cached value for LAST_GC_CALL_INTERVAL */ + private final long lastGCCallInterval; + /** + * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max + * credit used for credit-based flow-control mechanism) + */ + private final short maxRequestsCredit; + /** + * Whether the job is shutting down. Used for terminating the memory + * observer thread. 
+ */ + private final CountDownLatch shouldTerminate; + /** Result of memory observer thread */ + private final Future<Void> checkMemoryThreadResult; + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; + /** Last time a major/full GC has been called (in milliseconds) */ + private volatile long lastMajorGCTime; + /** Last time a non major/full GC has been called (in milliseconds) */ + private volatile long lastMinorGCTime; + + /** + * Constructor + * + * @param conf configuration + * @param oocEngine out-of-core engine + */ + public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf, + OutOfCoreEngine oocEngine) { + this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf); + this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf); + this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf); + this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf); + this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf); + this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf); + this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf); + this.maxRequestsCredit = (short) + CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf); + NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true); + boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf); + checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " + + "must be enabled. 
Use giraph.waitForPerWorkerRequests=true"); + this.shouldTerminate = new CountDownLatch(1); + this.oocEngine = oocEngine; + this.lastMajorGCTime = 0; + + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + while (true) { + boolean done = shouldTerminate.await(checkMemoryInterval, + TimeUnit.MILLISECONDS); + if (done) { + break; + } + double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); + long time = System.currentTimeMillis(); + if ((usedMemoryFraction > highMemoryPressure && + time - lastMajorGCTime >= lastGCCallInterval) || + (usedMemoryFraction > optimalMemoryPressure && + time - lastMajorGCTime >= lastGCCallInterval && + time - lastMinorGCTime >= lastGCCallInterval)) { + if (LOG.isInfoEnabled()) { + LOG.info("call: last GC happened a while ago and the " + + "amount of used memory is high (used memory " + + "fraction is " + + String.format("%.2f", usedMemoryFraction) + "). " + + "Calling GC manually"); + } + System.gc(); + time = System.currentTimeMillis() - time; + usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); + if (LOG.isInfoEnabled()) { + LOG.info("call: manual GC is done. It took " + + String.format("%.2f", (double) time / 1000) + + " seconds. Used memory fraction is " + + String.format("%.2f", usedMemoryFraction)); + } + } + updateRates(usedMemoryFraction); + } + return null; + } + }; + } + }; + ExecutorService executor = Executors.newSingleThreadExecutor( + ThreadUtils.createThreadFactory("check-memory")); + this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>( + callableFactory.newCallable(0))); + executor.shutdown(); + } + + /** + * upon major/full GC calls. + */ + /** + * Update statistics and rate regarding communication credits and number of + * active threads. 
+ * + * @param usedMemoryFraction the fraction of used memory over max memory + */ + public void updateRates(double usedMemoryFraction) { + // Update the fraction of processing threads that should remain active + if (usedMemoryFraction >= failMemoryPressure) { + oocEngine.updateActiveThreadsFraction(0); + } else if (usedMemoryFraction < emergencyMemoryPressure) { + oocEngine.updateActiveThreadsFraction(1); + } else { + oocEngine.updateActiveThreadsFraction(1 - + (usedMemoryFraction - emergencyMemoryPressure) / + (failMemoryPressure - emergencyMemoryPressure)); + } + + // Update the fraction of credit that should be used in credit-based flow- + // control + if (usedMemoryFraction >= emergencyMemoryPressure) { + updateRequestsCredit((short) 0); + } else if (usedMemoryFraction < optimalMemoryPressure) { + updateRequestsCredit(maxRequestsCredit); + } else { + updateRequestsCredit((short) (maxRequestsCredit * + (1 - (usedMemoryFraction - optimalMemoryPressure) / + (emergencyMemoryPressure - optimalMemoryPressure)))); + } + } + + @Override + public IOAction[] getNextIOActions() { + double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction(); + if (LOG.isInfoEnabled()) { + LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f", + usedMemoryFraction)); + } + if (usedMemoryFraction > highMemoryPressure) { + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PARTITION}; + } else if (usedMemoryFraction > optimalMemoryPressure) { + return new IOAction[]{ + IOAction.LOAD_UNPROCESSED_PARTITION, + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PROCESSED_PARTITION}; + } else if (usedMemoryFraction > lowMemoryPressure) { + return new IOAction[]{ + IOAction.LOAD_UNPROCESSED_PARTITION, + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.LOAD_PARTITION}; + } else { + return new IOAction[]{IOAction.LOAD_PARTITION}; + } + } + + @Override + public boolean approve(IOCommand command) { + return true; + } + + @Override + public void 
commandCompleted(IOCommand command) { + // Do nothing + } + + @Override + public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { + String gcAction = gcInfo.getGcAction().toLowerCase(); + if (gcAction.contains("full") || gcAction.contains("major")) { + if (!gcInfo.getGcCause().contains("No GC")) { + lastMajorGCTime = System.currentTimeMillis(); + } + } else { + lastMinorGCTime = System.currentTimeMillis(); + } + } + + @Override + public void shutdown() { + shouldTerminate.countDown(); + try { + checkMemoryThreadResult.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("shutdown: caught exception while waiting on check-memory " + + "thread to terminate!"); + throw new IllegalStateException(e); + } + if (LOG.isInfoEnabled()) { + LOG.info("shutdown: ThresholdBasedOracle shutdown complete!"); + } + } + + /** + * Update the credit announced for this worker in Netty. The lower the credit + * is, the lower the rate at which incoming messages arrive at this worker. + * Thus, credit is an indirect way of controlling the amount of memory + * incoming messages would take. 
+ * + * @param newCredit the new credit to announce to other workers + */ + private void updateRequestsCredit(short newCredit) { + if (LOG.isInfoEnabled()) { + LOG.info("updateRequestsCredit: updating the credit to " + newCredit); + } + FlowControl flowControl = oocEngine.getFlowControl(); + if (flowControl != null) { + ((CreditBasedFlowControl) flowControl).updateCredit(newCredit); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java index 7909100..53de52f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java @@ -125,21 +125,21 @@ public class DiskBackedEdgeStore<I extends WritableComparable, } @Override - public void loadPartitionData(int partitionId, String basePath) + public long loadPartitionData(int partitionId, String basePath) throws IOException { - super.loadPartitionData(partitionId, getPath(basePath)); + return super.loadPartitionData(partitionId, getPath(basePath)); } @Override - public void offloadPartitionData(int partitionId, String basePath) + public long offloadPartitionData(int partitionId, String basePath) throws IOException { - super.offloadPartitionData(partitionId, getPath(basePath)); + return super.offloadPartitionData(partitionId, getPath(basePath)); } @Override - public void offloadBuffers(int partitionId, String basePath) + public long offloadBuffers(int partitionId, String basePath) throws IOException { - super.offloadBuffers(partitionId, getPath(basePath)); + return super.offloadBuffers(partitionId, getPath(basePath)); } @Override @@ -157,8 +157,9 @@ public class DiskBackedEdgeStore<I 
extends WritableComparable, } @Override - protected void loadInMemoryPartitionData(int partitionId, String path) + protected long loadInMemoryPartitionData(int partitionId, String path) throws IOException { + long numBytes = 0; File file = new File(path); if (file.exists()) { if (LOG.isDebugEnabled()) { @@ -170,14 +171,17 @@ public class DiskBackedEdgeStore<I extends WritableComparable, DataInputStream dis = new DataInputStream(bis); edgeStore.readPartitionEdgeStore(partitionId, dis); dis.close(); + numBytes = file.length(); checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " + "%s.", file.getAbsoluteFile()); } + return numBytes; } @Override - protected void offloadInMemoryPartitionData(int partitionId, String path) + protected long offloadInMemoryPartitionData(int partitionId, String path) throws IOException { + long numBytes = 0; if (edgeStore.hasEdgesForPartition(partitionId)) { File file = new File(path); checkState(!file.exists(), "offloadInMemoryPartitionData: edge store " + @@ -190,7 +194,9 @@ public class DiskBackedEdgeStore<I extends WritableComparable, DataOutputStream dos = new DataOutputStream(bos); edgeStore.writePartitionEdgeStore(partitionId, dos); dos.close(); + numBytes = dos.size(); } + return numBytes; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java index 2a40a58..9e56d99 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java @@ -20,6 +20,7 @@ package org.apache.giraph.ooc.data; import org.apache.giraph.comm.messages.MessageStore; import 
org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; import org.apache.giraph.utils.ByteArrayOneMessageToManyIds; import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.VertexIdMessages; @@ -60,6 +61,8 @@ public class DiskBackedMessageStore<I extends WritableComparable, private final boolean useMessageCombiner; /** Which superstep this message store is used for */ private final long superstep; + /** Message value factory */ + private final MessageValueFactory<M> messageValueFactory; /** * Type of VertexIdMessage class (container for serialized messages) received @@ -94,6 +97,7 @@ public class DiskBackedMessageStore<I extends WritableComparable, this.messageStore = messageStore; this.useMessageCombiner = useMessageCombiner; this.superstep = superstep; + this.messageValueFactory = config.createOutgoingMessageValueFactory(); } @Override @@ -148,26 +152,33 @@ public class DiskBackedMessageStore<I extends WritableComparable, } @Override - public void loadPartitionData(int partitionId, String basePath) + public long loadPartitionData(int partitionId, String basePath) throws IOException { if (!useMessageCombiner) { - super.loadPartitionData(partitionId, getPath(basePath, superstep)); + return super.loadPartitionData(partitionId, getPath(basePath, superstep)); + } else { + return 0; } } @Override - public void offloadPartitionData(int partitionId, String basePath) + public long offloadPartitionData(int partitionId, String basePath) throws IOException { if (!useMessageCombiner) { - super.offloadPartitionData(partitionId, getPath(basePath, superstep)); + return + super.offloadPartitionData(partitionId, getPath(basePath, superstep)); + } else { + return 0; } } @Override - public void offloadBuffers(int partitionId, String basePath) + public long offloadBuffers(int partitionId, String basePath) throws IOException { if (!useMessageCombiner) { - super.offloadBuffers(partitionId, 
getPath(basePath, superstep)); + return super.offloadBuffers(partitionId, getPath(basePath, superstep)); + } else { + return 0; } } @@ -210,26 +221,24 @@ public class DiskBackedMessageStore<I extends WritableComparable, throw new IllegalStateException("writeEntry: serialized message " + "type is not supported"); } - out.writeInt(messageClass.ordinal()); + out.writeByte(messageClass.ordinal()); messages.write(out); } @Override protected VertexIdMessages<I, M> readNextEntry(DataInput in) throws IOException { - int messageType = in.readInt(); + byte messageType = in.readByte(); SerializedMessageClass messageClass = SerializedMessageClass.values()[messageType]; VertexIdMessages<I, M> vim; switch (messageClass) { case BYTE_ARRAY_VERTEX_ID_MESSAGES: - vim = new ByteArrayVertexIdMessages<>( - config.<M>createOutgoingMessageValueFactory()); + vim = new ByteArrayVertexIdMessages<>(messageValueFactory); vim.setConf(config); break; case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS: - vim = new ByteArrayOneMessageToManyIds<>( - config.<M>createOutgoingMessageValueFactory()); + vim = new ByteArrayOneMessageToManyIds<>(messageValueFactory); vim.setConf(config); break; default: @@ -241,8 +250,9 @@ public class DiskBackedMessageStore<I extends WritableComparable, } @Override - protected void loadInMemoryPartitionData(int partitionId, String basePath) + protected long loadInMemoryPartitionData(int partitionId, String basePath) throws IOException { + long numBytes = 0; File file = new File(basePath); if (file.exists()) { if (LOG.isDebugEnabled()) { @@ -254,14 +264,17 @@ public class DiskBackedMessageStore<I extends WritableComparable, DataInputStream dis = new DataInputStream(bis); messageStore.readFieldsForPartition(dis, partitionId); dis.close(); + numBytes = file.length(); checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " + "%s.", file.getAbsoluteFile()); } + return numBytes; } @Override - protected void offloadInMemoryPartitionData(int partitionId, String basePath) 
+ protected long offloadInMemoryPartitionData(int partitionId, String basePath) throws IOException { + long numBytes = 0; if (messageStore.hasMessagesForPartition(partitionId)) { File file = new File(basePath); checkState(!file.exists(), "offloadInMemoryPartitionData: message store" + @@ -275,7 +288,9 @@ public class DiskBackedMessageStore<I extends WritableComparable, messageStore.writePartition(outputStream, partitionId); messageStore.clearPartition(partitionId); outputStream.close(); + numBytes += outputStream.size(); } + return numBytes; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java index 5854c8d..2a5e47a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java @@ -130,7 +130,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, public Partition<I, V, E> removePartition(Integer partitionId) { // Set the partition as 'in process' so its data and messages do not get // spilled to disk until the remove is complete. 
- oocEngine.getMetaPartitionManager().makePartitionInaccessible(partitionId); + oocEngine.getMetaPartitionManager().markPartitionAsInProcess(partitionId); oocEngine.retrievePartition(partitionId); Partition<I, V, E> partition = partitionStore.removePartition(partitionId); checkNotNull(partition, "removePartition: partition " + partitionId + @@ -262,8 +262,12 @@ public class DiskBackedPartitionStore<I extends WritableComparable, throws IOException { I id = conf.createVertexId(); id.readFields(in); - V value = conf.createVertexValue(); - value.readFields(in); + V value = null; + boolean hasNullValue = in.readBoolean(); + if (!hasNullValue) { + value = conf.createVertexValue(); + value.readFields(in); + } OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0); vertex.initialize(id, value, edges); if (in.readBoolean()) { @@ -291,8 +295,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - protected void loadInMemoryPartitionData(int partitionId, String path) + protected long loadInMemoryPartitionData(int partitionId, String path) throws IOException { + long numBytes = 0; // Load vertices File file = new File(getVerticesPath(path)); if (file.exists()) { @@ -312,6 +317,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, partition.putVertex(vertex); } inputStream.close(); + numBytes += file.length(); checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " + "%s", file.getAbsolutePath()); @@ -329,15 +335,17 @@ public class DiskBackedPartitionStore<I extends WritableComparable, readOutEdges(inputStream, partition); } inputStream.close(); + numBytes += file.length(); // If the graph is static and it is not INPUT_SUPERSTEP, keep the file // around. 
if (!conf.isStaticGraph() || - serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) { + oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) { checkState(file.delete(), "loadPartition: failed to delete %s", file.getAbsolutePath()); } partitionStore.addPartition(partition); } + return numBytes; } @Override @@ -355,15 +363,15 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - public void loadPartitionData(int partitionId, String basePath) + public long loadPartitionData(int partitionId, String basePath) throws IOException { - super.loadPartitionData(partitionId, getPath(basePath)); + return super.loadPartitionData(partitionId, getPath(basePath)); } @Override - public void offloadPartitionData(int partitionId, String basePath) + public long offloadPartitionData(int partitionId, String basePath) throws IOException { - super.offloadPartitionData(partitionId, getPath(basePath)); + return super.offloadPartitionData(partitionId, getPath(basePath)); } /** @@ -376,7 +384,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable, private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex) throws IOException { vertex.getId().write(output); - vertex.getValue().write(output); + V value = vertex.getValue(); + if (value != null) { + output.writeBoolean(false); + value.write(output); + } else { + output.writeBoolean(true); + } output.writeBoolean(vertex.isHalted()); } @@ -395,8 +409,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - protected void offloadInMemoryPartitionData(int partitionId, String path) + protected long offloadInMemoryPartitionData(int partitionId, String path) throws IOException { + long numBytes = 0; if (partitionStore.hasPartition(partitionId)) { partitionVertexCount.put(partitionId, partitionStore.getPartitionVertexCount(partitionId)); @@ -423,13 +438,14 @@ public class DiskBackedPartitionStore<I extends WritableComparable, writeVertexData(outputStream, 
vertex); } outputStream.close(); + numBytes += outputStream.size(); // Avoid writing back edges if we have already written them once and // the graph is not changing. // If we are in the input superstep, we need to write the files // at least the first time, even though the graph is static. file = new File(getEdgesPath(path)); - if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP || + if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP || partitionVertexCount.get(partitionId) == null || partitionVertexCount.get(partitionId) != partition.getVertexCount() || !conf.isStaticGraph() || !file.exists()) { @@ -446,8 +462,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable, writeOutEdges(outputStream, vertex); } outputStream.close(); + numBytes += outputStream.size(); } } + return numBytes; } @Override @@ -457,9 +475,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - public void offloadBuffers(int partitionId, String basePath) + public long offloadBuffers(int partitionId, String basePath) throws IOException { - super.offloadBuffers(partitionId, getPath(basePath)); + return super.offloadBuffers(partitionId, getPath(basePath)); } @Override
