Integrating out-of-core mechanism with credit-based flow-control and data generation tethering
Summary: This diff integrates out-of-core infrastructure with credit-based flow control and adds the ability to tether the rate of data generation/processing. Data generation/processing rate is controlled by changing the number of active processing (input/compute) threads. This diff also implements a new (and more performant) adaptive out-of-core policy. Test Plan: mvn clean verify all snapshot tests including ones with large data pass Running adaptive out-of-core on large graph with very limited memory does not fail. This diff should enable us to avoid *any* reasonable job to fail! Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis Reviewed By: dionysis.logothetis Subscribers: ramesh-muthusamy Differential Revision: https://reviews.facebook.net/D55479 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6256a761 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6256a761 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6256a761 Branch: refs/heads/trunk Commit: 6256a761d61a5b27a05878da2449ce8537d60c99 Parents: 4321e44 Author: Sergey Edunov <[email protected]> Authored: Fri May 20 15:14:08 2016 -0700 Committer: Sergey Edunov <[email protected]> Committed: Fri May 20 15:14:08 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/giraph/comm/ServerData.java | 14 +- .../flow_control/CreditBasedFlowControl.java | 400 ++++++- .../giraph/comm/flow_control/FlowControl.java | 19 +- .../comm/flow_control/NoOpFlowControl.java | 14 +- .../comm/flow_control/StaticFlowControl.java | 32 +- .../apache/giraph/comm/netty/NettyClient.java | 39 +- .../handler/AddressRequestIdGenerator.java | 53 - .../netty/handler/RequestServerHandler.java | 6 +- .../netty/handler/ResponseClientHandler.java | 4 +- .../netty/handler/TaskRequestIdGenerator.java | 51 + .../giraph/comm/requests/RequestType.java | 4 +- 
.../giraph/comm/requests/SendResumeRequest.java | 80 ++ .../apache/giraph/conf/GiraphConfiguration.java | 4 - .../org/apache/giraph/conf/GiraphConstants.java | 18 +- .../org/apache/giraph/counters/GiraphStats.java | 34 +- .../apache/giraph/edge/AbstractEdgeStore.java | 23 +- .../apache/giraph/graph/ComputeCallable.java | 62 +- .../org/apache/giraph/graph/GlobalStats.java | 30 + .../apache/giraph/graph/GraphTaskManager.java | 66 +- .../apache/giraph/master/BspServiceMaster.java | 8 + .../apache/giraph/metrics/AggregatedMetric.java | 46 +- .../giraph/metrics/AggregatedMetricDouble.java | 50 + .../giraph/metrics/AggregatedMetricLong.java | 50 + .../giraph/metrics/AggregatedMetrics.java | 62 +- .../giraph/metrics/ValueWithHostname.java | 18 +- .../giraph/metrics/WorkerSuperstepMetrics.java | 62 + .../apache/giraph/ooc/FixedOutOfCoreEngine.java | 147 --- .../giraph/ooc/FixedOutOfCoreIOScheduler.java | 211 ---- .../giraph/ooc/FixedPartitionsOracle.java | 139 +++ .../org/apache/giraph/ooc/OutOfCoreEngine.java | 382 +++++- .../apache/giraph/ooc/OutOfCoreIOCallable.java | 71 +- .../apache/giraph/ooc/OutOfCoreIOScheduler.java | 182 ++- .../giraph/ooc/OutOfCoreIOStatistics.java | 360 ++++++ .../org/apache/giraph/ooc/OutOfCoreOracle.java | 131 +++ .../giraph/ooc/SimpleGCMonitoringOracle.java | 355 ++++++ .../apache/giraph/ooc/ThresholdBasedOracle.java | 364 ++++++ .../giraph/ooc/data/DiskBackedEdgeStore.java | 22 +- .../giraph/ooc/data/DiskBackedMessageStore.java | 43 +- .../ooc/data/DiskBackedPartitionStore.java | 46 +- .../giraph/ooc/data/MetaPartitionManager.java | 677 +++++------ .../giraph/ooc/data/OutOfCoreDataManager.java | 42 +- .../org/apache/giraph/ooc/io/IOCommand.java | 47 +- .../giraph/ooc/io/LoadPartitionIOCommand.java | 20 +- .../giraph/ooc/io/StoreDataBufferIOCommand.java | 18 +- .../ooc/io/StoreIncomingMessageIOCommand.java | 13 +- .../giraph/ooc/io/StorePartitionIOCommand.java | 21 +- .../org/apache/giraph/ooc/io/WaitIOCommand.java | 8 +- 
.../giraph/utils/AdjustableSemaphore.java | 6 + .../apache/giraph/worker/BspServiceWorker.java | 5 + .../giraph/worker/EdgeInputSplitsCallable.java | 10 + .../giraph/worker/InputSplitsCallable.java | 20 +- .../worker/VertexInputSplitsCallable.java | 10 + .../giraph/partition/TestPartitionStores.java | 6 +- .../java/org/apache/giraph/TestOutOfCore.java | 4 +- src/site/xdoc/options.xml | 1095 ------------------ 55 files changed, 3593 insertions(+), 2111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index be34820..69fbfee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -44,7 +44,6 @@ import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.ooc.data.DiskBackedEdgeStore; import org.apache.giraph.ooc.data.DiskBackedMessageStore; import org.apache.giraph.ooc.data.DiskBackedPartitionStore; -import org.apache.giraph.ooc.FixedOutOfCoreEngine; import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStore; @@ -149,16 +148,7 @@ public class ServerData<I extends WritableComparable, PartitionStore<I, V, E> inMemoryPartitionStore = new SimplePartitionStore<I, V, E>(conf, context); if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) { - int maxPartitionsInMemory = - GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf); - if (maxPartitionsInMemory == 0) { - throw new IllegalStateException("ServerData: Adaptive " + - "out-of-core engine is not supported yet! 
Number of partitions in" + - " memory should be greater than 0."); - } else { - oocEngine = new FixedOutOfCoreEngine(conf, service, - maxPartitionsInMemory); - } + oocEngine = new OutOfCoreEngine(conf, service); partitionStore = new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore, conf, context, service, oocEngine); @@ -312,7 +302,7 @@ public class ServerData<I extends WritableComparable, currentMessageStore = nextCurrentMessageStore; incomingMessageStore = nextIncomingMessageStore; if (oocEngine != null) { - oocEngine.getMetaPartitionManager().resetMessages(); + oocEngine.reset(); oocEngine.getSuperstepLock().writeLock().unlock(); } currentMessageStore.finalizeStore(); http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java index ff82dd1..36d4f20 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java @@ -18,18 +18,36 @@ package org.apache.giraph.comm.flow_control; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.comm.netty.handler.AckSignalFlag; +import org.apache.giraph.comm.requests.SendResumeRequest; import org.apache.giraph.comm.requests.WritableRequest; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.IntConfOption; import org.apache.giraph.utils.AdjustableSemaphore; 
+import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.ThreadUtils; import org.apache.log4j.Logger; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Deque; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -38,9 +56,27 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; /** - * Representation of credit-based flow control policy where each worker has a - * constant user-defined credit. The number of open requests to a particular - * worker cannot be more than its specified credit. + * Representation of credit-based flow control policy. With this policy there + * can be limited number of open requests from any worker x to any other worker + * y. This number is called 'credit'. Let's denote this number by C{x->y}. This + * implementation assumes that for a particular worker W, all values of C{x->W} + * are the same. Let's denote this value by CR_W. CR_W may change due to other + * reasons (e.g. memory pressure observed in an out-of-core mechanism). However, + * CR_W is always in range [0, MAX_CR], where MAX_CR is a user-defined constant. + * Note that MAX_CR should be representable by at most 14 bits. + * + * In this implementation, the value of CR_W is announced to other workers along + * with the ACK response envelope for all ACK response envelope going out of W. 
+ * Therefore, for non-zero values of CR_W, other workers know the instant value + * of CR_W, hence they can control the number of open requests they have to W. + * However, it is possible that W announces 0 as CR_W. In this case, other + * workers stop opening more requests to W, hence they will not get any new + * response envelope from W. This means other workers should be notified with + * a dedicated request to resume sending more requests once CR_W becomes + * non-zero. In this implementation, once W_CR is announced as 0 to a particular + * worker U, we keep U in a set, so later on we can send 'resume signal' to U + * once CR_W becomes non-zero. Sending resume signals are done through a + * separate thread. */ public class CreditBasedFlowControl implements FlowControl { /** @@ -69,28 +105,87 @@ public class CreditBasedFlowControl implements FlowControl { private final int unsentWaitMsecs; /** Waiting interval for checking outstanding requests msecs */ private final int waitingRequestMsecs; - /** Maximum number of open requests we can have for each worker */ - private final int maxOpenRequestsPerWorker; + /** + * Maximum number of open requests each worker can have to this work at each + * moment (CR_W -define above- for this worker) + */ + private volatile short maxOpenRequestsPerWorker; /** Total number of unsent, cached requests */ private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0); /** - * Map of requests permits per worker. Key in the map is the worker id and the - * value is the semaphore to control the number of open requests for the - * particular worker. Basically, the number of available permits on this - * semaphore is the credit available for the worker. + * Map of requests permits per worker. Keys in the map are worker ids and + * values are pairs (X, Y) where: + * X: is the semaphore to control the number of open requests for a + * particular worker. 
Basically, the number of available permits on a + * semaphore is the credit available for the worker associated with that + * semaphore. + * Y: is the timestamp of the latest message (resume signal or ACK response) + * that changed the number of permits in the semaphore. + * The idea behind keeping a timestamp is to avoid any issue that may happen + * due to out-of-order message delivery. For example, consider this scenario: + * an ACK response is sent to a worker announcing the credit is 0. Later on, + * a resume signal announcing a non-zero credit is sent to the same worker. + * Now, if the resume signal receives before the ACK message, the worker + * would incorrectly assume credit value of 0, and would avoid sending any + * messages, which may lead to a live-lock. + * + * The timestamp value is simply the request id generated by NettyClient. + * These ids are generated in consecutive order, hence simulating the concept + * of timestamp. However, the timestamp value should be sent along with + * any ACK response envelope. The ACK response envelope is already very small + * (maybe 10-20 bytes). So, the timestamp value should not add much overhead + * to it. Instead of sending the whole long value request id (8 bytes) as the + * timestamp, we can simply send the least significant 2 bytes of it. This is + * a valid timestamp, as the credit value can be 0x3FFF (=16383) at most. This + * means there will be at most 0x3FFF messages on the fly at each moment, + * which means that the timestamp value sent by all messages in fly will fall + * into a range of size 0x3FFF. So, it is enough to only consider timestamp + * values twice as big as the mentioned range to be able to accurately + * determine ordering even when values wrap around. This means we only need to + * consider 15 least significant bits of request ids as timestamp values. 
+ * + * The ACK response value contains following information (from least + * significant to most significant): + * - 16 bits timestamp + * - 14 bits credit value + * - 1 bit specifying whether one end of communication is master and hence + * credit based flow control should be ignored + * - 1 bit response flag */ - private final ConcurrentMap<Integer, AdjustableSemaphore> + private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>> perWorkerOpenRequestMap = Maps.newConcurrentMap(); /** Map of unsent cached requests per worker */ private final ConcurrentMap<Integer, Deque<WritableRequest>> perWorkerUnsentRequestMap = Maps.newConcurrentMap(); /** + * Set of workers that should be notified to resume sending more requests if + * the credit becomes non-zero + */ + private final Set<Integer> workersToResume = Sets.newHashSet(); + /** + * Resume signals are not using any credit, so they should be treated + * differently than normal requests. Resume signals should be ignored in + * accounting for credits in credit-based flow control. The following map + * keeps sets of request ids, for resume signals sent to other workers that + * are still "open". The set of request ids used for resume signals for a + * worker is important so we can determine if a received response is for a + * resume signal or not. + */ + private final Map<Integer, Set<Long>> resumeRequestsId = + Maps.newConcurrentMap(); + /** * Semaphore to control number of cached unsent requests. Maximum number of * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS. 
*/ private final Semaphore unsentRequestPermit; /** Netty client used for sending requests */ private final NettyClient nettyClient; + /** + * Result of execution for the thread responsible for sending resume signals + */ + private final Future<Void> resumeThreadResult; + /** Whether we are shutting down the execution */ + private volatile boolean shouldTerminate; /** * Constructor @@ -100,31 +195,92 @@ public class CreditBasedFlowControl implements FlowControl { public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf, NettyClient nettyClient) { this.nettyClient = nettyClient; - maxOpenRequestsPerWorker = MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf); + maxOpenRequestsPerWorker = + (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf); checkState(maxOpenRequestsPerWorker < 0x4000 && maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " + "requests should be in range (0, " + 0x4FFF + ")"); unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf)); unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf); waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf); + shouldTerminate = false; + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + while (true) { + synchronized (workersToResume) { + if (shouldTerminate) { + break; + } + for (Integer workerId : workersToResume) { + if (maxOpenRequestsPerWorker != 0) { + sendResumeSignal(workerId); + } else { + break; + } + } + try { + workersToResume.wait(); + } catch (InterruptedException e) { + throw new IllegalStateException("call: caught exception " + + "while waiting for resume-sender thread to be notified!", + e); + } + } + } + return null; + } + }; + } + }; + + ExecutorService executor = Executors.newSingleThreadExecutor( + ThreadUtils.createThreadFactory("resume-sender")); + resumeThreadResult = 
executor.submit(new LogStacktraceCallable<>( + callableFactory.newCallable(0))); + executor.shutdown(); + } + + /** + * Send resume signal request to a given worker + * + * @param workerId id of the worker to send the resume signal to + */ + private void sendResumeSignal(int workerId) { + WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker); + Long resumeId = nettyClient.doSend(workerId, request); + checkState(resumeId != null); + if (LOG.isDebugEnabled()) { + LOG.debug("sendResumeSignal: sending signal to worker " + workerId + + " with credit=" + maxOpenRequestsPerWorker + ", ID=" + + (resumeId & 0xFFFF)); + } + resumeRequestsId.get(workerId).add(resumeId); } @Override public void sendRequest(int destTaskId, WritableRequest request) { - AdjustableSemaphore openRequestPermit = + Pair<AdjustableSemaphore, Integer> pair = perWorkerOpenRequestMap.get(destTaskId); // Check if this is the first time sending a request to a worker. If so, we // should the worker id to necessary bookkeeping data structure. - if (openRequestPermit == null) { - openRequestPermit = new AdjustableSemaphore(maxOpenRequestsPerWorker); - AdjustableSemaphore temp = perWorkerOpenRequestMap.putIfAbsent(destTaskId, - openRequestPermit); - perWorkerUnsentRequestMap - .putIfAbsent(destTaskId, new ArrayDeque<WritableRequest>()); + if (pair == null) { + pair = new MutablePair<>( + new AdjustableSemaphore(maxOpenRequestsPerWorker), -1); + Pair<AdjustableSemaphore, Integer> temp = + perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair); + perWorkerUnsentRequestMap.putIfAbsent( + destTaskId, new ArrayDeque<WritableRequest>()); + resumeRequestsId.putIfAbsent( + destTaskId, Sets.<Long>newConcurrentHashSet()); if (temp != null) { - openRequestPermit = temp; + pair = temp; } } + AdjustableSemaphore openRequestPermit = pair.getLeft(); // Try to reserve a spot for the request amongst the open requests of // the destination worker. 
boolean shouldSend = openRequestPermit.tryAcquire(); @@ -198,8 +354,8 @@ public class CreditBasedFlowControl implements FlowControl { * @param response response received * @return true iff credit should be ignored, false otherwise */ - private boolean shouldIgnoreCredit(short response) { - return ((short) ((response >> 14) & 1)) == 1; + private boolean shouldIgnoreCredit(int response) { + return ((short) ((response >> (14 + 16)) & 1)) == 1; } /** @@ -208,8 +364,18 @@ public class CreditBasedFlowControl implements FlowControl { * @param response response received * @return credit from the received response */ - private short getCredit(short response) { - return (short) (response & 0x3FFF); + private short getCredit(int response) { + return (short) ((response >> 16) & 0x3FFF); + } + + /** + * Get the timestamp from a response + * + * @param response response received + * @return timestamp from the received response + */ + private int getTimestamp(int response) { + return response & 0xFFFF; } /** @@ -219,15 +385,73 @@ public class CreditBasedFlowControl implements FlowControl { * @return AckSignalFlag coming with the response */ @Override - public AckSignalFlag getAckSignalFlag(short response) { - return AckSignalFlag.values()[(response >> 15) & 1]; + public AckSignalFlag getAckSignalFlag(int response) { + return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1]; } @Override - public short calculateResponse(AckSignalFlag flag, int taskId) { + public int calculateResponse(AckSignalFlag flag, int taskId) { boolean ignoreCredit = nettyClient.masterInvolved(taskId); - return (short) ((flag.ordinal() << 15) | - ((ignoreCredit ? 1 : 0) << 14) | (maxOpenRequestsPerWorker & 0x3FFF)); + if (!ignoreCredit && maxOpenRequestsPerWorker == 0) { + synchronized (workersToResume) { + workersToResume.add(taskId); + } + } + int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF); + return (flag.ordinal() << (16 + 14 + 1)) | + ((ignoreCredit ? 
1 : 0) << (16 + 14)) | + (maxOpenRequestsPerWorker << 16) | + timestamp; + } + + @Override + public void shutdown() { + synchronized (workersToResume) { + shouldTerminate = true; + workersToResume.notifyAll(); + } + try { + resumeThreadResult.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException("shutdown: caught exception while" + + "getting result of resume-sender thread"); + } + } + + @Override + public void logInfo() { + if (LOG.isInfoEnabled()) { + // Count how many unsent requests each task has + Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap(); + for (Map.Entry<Integer, Deque<WritableRequest>> entry : + perWorkerUnsentRequestMap.entrySet()) { + unsentRequestCounts.put(entry.getKey(), entry.getValue().size()); + } + ArrayList<Map.Entry<Integer, Integer>> sorted = + Lists.newArrayList(unsentRequestCounts.entrySet()); + Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() { + @Override + public int compare(Map.Entry<Integer, Integer> entry1, + Map.Entry<Integer, Integer> entry2) { + int value1 = entry1.getValue(); + int value2 = entry2.getValue(); + return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1); + } + }); + StringBuilder message = new StringBuilder(); + message.append("logInfo: ").append(aggregateUnsentRequests.get()) + .append(" unsent requests in total. 
"); + int itemsToPrint = Math.min(10, sorted.size()); + for (int i = 0; i < itemsToPrint; ++i) { + message.append(sorted.get(i).getValue()) + .append(" unsent requests for taskId=") + .append(sorted.get(i).getKey()).append(" (credit=") + .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey()) + .getKey().getMaxPermits()) + .append("), "); + } + LOG.info(message); + } } @Override @@ -240,7 +464,7 @@ public class CreditBasedFlowControl implements FlowControl { try { aggregateUnsentRequests.wait(waitingRequestMsecs); } catch (InterruptedException e) { - throw new IllegalStateException("waitSomeRequests: failed while " + + throw new IllegalStateException("waitAllRequests: failed while " + "waiting on open/cached requests"); } } @@ -257,21 +481,52 @@ public class CreditBasedFlowControl implements FlowControl { } @Override - public void messageAckReceived(int taskId, short response) { - boolean shouldIgnoreCredit = shouldIgnoreCredit(response); + public void messageAckReceived(int taskId, long requestId, int response) { + boolean ignoreCredit = shouldIgnoreCredit(response); short credit = getCredit(response); - AdjustableSemaphore openRequestPermit = - perWorkerOpenRequestMap.get(taskId); - openRequestPermit.release(); - if (!shouldIgnoreCredit) { - openRequestPermit.setMaxPermits(credit); + int timestamp = getTimestamp(response); + MutablePair<AdjustableSemaphore, Integer> pair = + (MutablePair<AdjustableSemaphore, Integer>) + perWorkerOpenRequestMap.get(taskId); + AdjustableSemaphore openRequestPermit = pair.getLeft(); + // Release a permit on open requests if we received ACK of a request other + // than a Resume request (resume requests are always sent regardless of + // number of open requests) + if (!resumeRequestsId.get(taskId).remove(requestId)) { + openRequestPermit.release(); + } else if (LOG.isDebugEnabled()) { + LOG.debug("messageAckReceived: ACK of resume received from " + taskId + + " timestamp=" + timestamp); + } + if (!ignoreCredit) { + synchronized 
(pair) { + if (compareTimestamps(timestamp, pair.getRight()) > 0) { + pair.setRight(timestamp); + openRequestPermit.setMaxPermits(credit); + } else if (LOG.isDebugEnabled()) { + LOG.debug("messageAckReceived: received out-of-order messages." + + "Received timestamp=" + timestamp + " and current timestamp=" + + pair.getRight()); + } + } } - Deque<WritableRequest> requestDeque = - perWorkerUnsentRequestMap.get(taskId); // Since we received a response and we changed the credit of the sender // client, we may be able to send some more requests to the sender // client. So, we try to send as much request as we can to the sender // client. + trySendCachedRequests(taskId); + } + + /** + * Try to send as much as cached requests to a given worker + * + * @param taskId id of the worker to send cached requests to + */ + private void trySendCachedRequests(int taskId) { + Deque<WritableRequest> requestDeque = + perWorkerUnsentRequestMap.get(taskId); + AdjustableSemaphore openRequestPermit = + perWorkerOpenRequestMap.get(taskId).getLeft(); while (true) { WritableRequest request; synchronized (requestDeque) { @@ -285,6 +540,7 @@ public class CreditBasedFlowControl implements FlowControl { break; } } + unsentRequestPermit.release(); // At this point, we have a request, and we reserved a credit for the // sender client. So, we send the request to the client and update // the state. 
@@ -294,7 +550,73 @@ public class CreditBasedFlowControl implements FlowControl { aggregateUnsentRequests.notifyAll(); } } - unsentRequestPermit.release(); } } + + /** + * Update the max credit that is announced to other workers + * + * @param newCredit new credit + */ + public void updateCredit(short newCredit) { + newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit)); + // Check whether we should send resume signals to some workers + if (maxOpenRequestsPerWorker == 0 && newCredit != 0) { + maxOpenRequestsPerWorker = newCredit; + synchronized (workersToResume) { + workersToResume.notifyAll(); + } + } else { + maxOpenRequestsPerWorker = newCredit; + } + } + + /** + * Compare two timestamps accounting for wrap around. Note that the timestamp + * values should be in a range that fits into 14 bits values. This means if + * the difference of the two given timestamp is large, we are dealing with one + * value being wrapped around. + * + * @param ts1 first timestamp + * @param ts2 second timestamp + * @return positive value if first timestamp is later than second timestamp, + * negative otherwise + */ + private int compareTimestamps(int ts1, int ts2) { + int diff = ts1 - ts2; + if (Math.abs(diff) < 0x7FFF) { + return diff; + } else { + return -diff; + } + } + + /** + * Process a resume signal came from a given worker + * + * @param clientId id of the worker that sent the signal + * @param credit the credit value sent along with the resume signal + * @param requestId timestamp (request id) of the resume signal + */ + public void processResumeSignal(int clientId, short credit, long requestId) { + int timestamp = (int) (requestId & 0xFFFF); + if (LOG.isDebugEnabled()) { + LOG.debug("processResumeSignal: resume signal from " + clientId + + " with timestamp=" + timestamp); + } + MutablePair<AdjustableSemaphore, Integer> pair = + (MutablePair<AdjustableSemaphore, Integer>) + perWorkerOpenRequestMap.get(clientId); + synchronized (pair) { + if 
(compareTimestamps(timestamp, pair.getRight()) > 0) { + pair.setRight(timestamp); + pair.getLeft().setMaxPermits(credit); + } else if (LOG.isDebugEnabled()) { + LOG.debug("processResumeSignal: received out-of-order messages. " + + "Received timestamp=" + timestamp + " and current timestamp=" + + pair.getRight()); + } + } + trySendCachedRequests(clientId); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java index 4eda193..4072af7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java @@ -45,9 +45,10 @@ public interface FlowControl { * Notify the flow control policy that an open request is completed. 
* * @param taskId id of the task to which the open request is completed - * @param response the response heard from the task + * @param requestId id of the open request which is completed + * @param response the response heard from the client */ - void messageAckReceived(int taskId, short response); + void messageAckReceived(int taskId, long requestId, int response); /** * Decode the acknowledgement signal from the response after an open request @@ -56,7 +57,7 @@ public interface FlowControl { * @param response the response heard after completion of a request * @return the Acknowledgement signal decoded from the response */ - AckSignalFlag getAckSignalFlag(short response); + AckSignalFlag getAckSignalFlag(int response); /** * There may be requests in possession of the flow control mechanism, as the @@ -79,5 +80,15 @@ public interface FlowControl { * @param taskId id of the task the acknowledgement is for * @return the response to piggyback along with the acknowledgement message */ - short calculateResponse(AckSignalFlag flag, int taskId); + int calculateResponse(AckSignalFlag flag, int taskId); + + /** + * Shutdown the flow control policy + */ + void shutdown(); + + /** + * Log the status of the flow control + */ + void logInfo(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java index d50fe92..c97c967 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java @@ -45,10 +45,10 @@ public class NoOpFlowControl implements FlowControl { } @Override - public void messageAckReceived(int taskId, short 
response) { } + public void messageAckReceived(int taskId, long requestId, int response) { } @Override - public AckSignalFlag getAckSignalFlag(short response) { + public AckSignalFlag getAckSignalFlag(int response) { return AckSignalFlag.values()[response]; } @@ -61,7 +61,13 @@ public class NoOpFlowControl implements FlowControl { } @Override - public short calculateResponse(AckSignalFlag alreadyDone, int taskId) { - return (short) alreadyDone.ordinal(); + public int calculateResponse(AckSignalFlag alreadyDone, int taskId) { + return alreadyDone.ordinal(); } + + @Override + public void shutdown() { } + + @Override + public void logInfo() { } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java index 1fc43a7..6d67afd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java @@ -31,6 +31,8 @@ import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.log4j.Logger; +import java.util.concurrent.atomic.AtomicInteger; + import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; /** @@ -69,6 +71,8 @@ public class StaticFlowControl implements private final Object requestSpotAvailable = new Object(); /** Counter for time spent waiting on too many open requests */ private Counter timeWaitingOnOpenRequests; + /** Number of threads waiting on too many open requests */ + private final AtomicInteger numWaitingThreads = new AtomicInteger(0); /** * Constructor @@ -108,11 +112,12 @@ public class StaticFlowControl 
implements } /** - * Ensure that at most maxOpenRequests are not complete. Periodically, - * check the state of every request. If we find the connection failed, - * re-establish it and re-send the request. + * Ensure that at most numberOfRequestsToProceed are not complete. + * Periodically, check the state of every request. If we find the connection + * failed, re-establish it and re-send the request. */ private void waitSomeRequests() { + numWaitingThreads.getAndIncrement(); while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) { // Wait for requests to complete for some time synchronized (requestSpotAvailable) { @@ -129,23 +134,36 @@ public class StaticFlowControl implements } nettyClient.logAndSanityCheck(); } + numWaitingThreads.getAndDecrement(); } @Override - public void messageAckReceived(int taskId, short response) { + public void messageAckReceived(int taskId, long requestId, int response) { synchronized (requestSpotAvailable) { requestSpotAvailable.notifyAll(); } } @Override - public AckSignalFlag getAckSignalFlag(short response) { + public AckSignalFlag getAckSignalFlag(int response) { return AckSignalFlag.values()[response]; } @Override - public short calculateResponse(AckSignalFlag alreadyDone, int taskId) { - return (short) alreadyDone.ordinal(); + public int calculateResponse(AckSignalFlag alreadyDone, int clientId) { + return alreadyDone.ordinal(); + } + + @Override + public void shutdown() { } + + @Override + public void logInfo() { + if (LOG.isInfoEnabled()) { + LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " + + "until number of open requests falls below " + + numberOfRequestsToProceed); + } } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index c185fdc..217dba6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -23,7 +23,7 @@ import org.apache.giraph.comm.flow_control.FlowControl; import org.apache.giraph.comm.flow_control.NoOpFlowControl; import org.apache.giraph.comm.flow_control.StaticFlowControl; import org.apache.giraph.comm.netty.handler.AckSignalFlag; -import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator; +import org.apache.giraph.comm.netty.handler.TaskRequestIdGenerator; import org.apache.giraph.comm.netty.handler.ClientRequestId; import org.apache.giraph.comm.netty.handler.RequestEncoder; import org.apache.giraph.comm.netty.handler.RequestInfo; @@ -174,12 +174,12 @@ public class NettyClient { /** Waiting interval for checking outstanding requests msecs */ private final int waitingRequestMsecs; /** Timed logger for printing request debugging */ - private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG); + private final TimedLogger requestLogger; /** Worker executor group */ private final EventLoopGroup workerGroup; - /** Address request id generator */ - private final AddressRequestIdGenerator addressRequestIdGenerator = - new AddressRequestIdGenerator(); + /** Task request id generator */ + private final TaskRequestIdGenerator taskRequestIdGenerator = + new TaskRequestIdGenerator(); /** Task info */ private final TaskInfo myTaskInfo; /** Maximum thread pool size */ @@ -245,6 +245,7 @@ public class NettyClient { waitTimeBetweenConnectionRetriesMs = WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf); waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf); + requestLogger = new TimedLogger(waitingRequestMsecs, LOG); maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf); maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf); @@ -621,6 +622,7 @@ public class 
NettyClient { if (LOG.isInfoEnabled()) { LOG.info("stop: Halting netty client"); } + flowControl.shutdown(); // Close connections asynchronously, in a Netty-approved // way, without cleaning up thread pools until all channels // in addressChannelMap are closed (success or failure) @@ -730,14 +732,16 @@ public class NettyClient { * * @param destTaskId destination to send the request to * @param request request itself + * @return request id generated for sending the request */ - public void doSend(int destTaskId, WritableRequest request) { + public Long doSend(int destTaskId, WritableRequest request) { InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId); if (clientRequestIdRequestInfoMap.isEmpty()) { inboundByteCounter.resetAll(); outboundByteCounter.resetAll(); } boolean registerRequest = true; + Long requestId = null; /*if_not[HADOOP_NON_SECURE]*/ if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) { registerRequest = false; @@ -748,8 +752,8 @@ public class NettyClient { RequestInfo newRequestInfo = new RequestInfo(remoteServer, request); if (registerRequest) { request.setClientId(myTaskInfo.getTaskId()); - request.setRequestId( - addressRequestIdGenerator.getNextRequestId(remoteServer)); + requestId = taskRequestIdGenerator.getNextRequestId(destTaskId); + request.setRequestId(requestId); ClientRequestId clientRequestId = new ClientRequestId(destTaskId, request.getRequestId()); RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent( @@ -769,6 +773,7 @@ public class NettyClient { ChannelFuture writeFuture = channel.write(request); newRequestInfo.setWriteFuture(writeFuture); writeFuture.addListener(logErrorListener); + return requestId; } /** @@ -779,7 +784,7 @@ public class NettyClient { * @param response Actual response * @param shouldDrop Drop the message? 
*/ - public void messageReceived(int senderId, long requestId, short response, + public void messageReceived(int senderId, long requestId, int response, boolean shouldDrop) { if (shouldDrop) { synchronized (clientRequestIdRequestInfoMap) { @@ -806,7 +811,7 @@ public class NettyClient { requestInfo + ". Waiting on " + clientRequestIdRequestInfoMap.size() + " requests"); } - flowControl.messageAckReceived(senderId, response); + flowControl.messageAckReceived(senderId, requestId, response); // Help #waitAllRequests() to finish faster synchronized (clientRequestIdRequestInfoMap) { clientRequestIdRequestInfoMap.notifyAll(); @@ -862,8 +867,7 @@ public class NettyClient { LOG.info("logInfoAboutOpenRequests: Waiting interval of " + waitingRequestMsecs + " msecs, " + clientRequestIdRequestInfoMap.size() + - " open requests, " + flowControl.getNumberOfUnsentRequests() + - " cached unsent requests, " + inboundByteCounter.getMetrics() + "\n" + + " open requests, " + inboundByteCounter.getMetrics() + "\n" + outboundByteCounter.getMetrics()); if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) { @@ -907,6 +911,7 @@ public class NettyClient { .append(", "); } LOG.info(message); + flowControl.logInfo(); } } @@ -1022,6 +1027,16 @@ public class NettyClient { } /** + * Generate and get the next request id to be used for a given worker + * + * @param taskId id of the worker to generate the next request id + * @return request id + */ + public Long getNextRequestId(int taskId) { + return taskRequestIdGenerator.getNextRequestId(taskId); + } + + /** * @return number of open requests */ public int getNumberOfOpenRequests() { http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java deleted file mode 100644 index 8ba5b96..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.comm.netty.handler; - -import com.google.common.collect.Maps; -import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Generate different request ids based on the address of the well known - * port on the workers. Thread-safe. - */ -public class AddressRequestIdGenerator { - /** Address request generator map */ - private final ConcurrentMap<InetSocketAddress, AtomicLong> - addressRequestGeneratorMap = Maps.newConcurrentMap(); - - /** - * Get the next request id for a given destination. Thread-safe. 
- * - * @param address Address of the worker (consistent during a superstep) - * @return Valid request id - */ - public Long getNextRequestId(InetSocketAddress address) { - AtomicLong requestGenerator = addressRequestGeneratorMap.get(address); - if (requestGenerator == null) { - requestGenerator = new AtomicLong(0); - AtomicLong oldRequestGenerator = - addressRequestGeneratorMap.putIfAbsent(address, requestGenerator); - if (oldRequestGenerator != null) { - requestGenerator = oldRequestGenerator; - } - } - return requestGenerator.getAndIncrement(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java index df50e2a..7bb4464 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java @@ -44,7 +44,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES public abstract class RequestServerHandler<R> extends ChannelInboundHandlerAdapter { /** Number of bytes in the encoded response */ - public static final int RESPONSE_BYTES = 14; + public static final int RESPONSE_BYTES = 16; /** Time class to use */ private static Time TIME = SystemTime.get(); /** Class logger */ @@ -137,9 +137,9 @@ public abstract class RequestServerHandler<R> extends ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES); buffer.writeInt(myTaskInfo.getTaskId()); buffer.writeLong(request.getRequestId()); - short signal = + int signal = flowControl.calculateResponse(alreadyDone, request.getClientId()); - buffer.writeShort(signal); + buffer.writeInt(signal); ctx.write(buffer); // 
NettyServer is bootstrapped with auto-read set to true by default. After // the first request is processed, we set auto-read to false. This prevents http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java index 54cb201..12dde3b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java @@ -64,11 +64,11 @@ public class ResponseClientHandler extends ChannelInboundHandlerAdapter { ByteBuf buf = (ByteBuf) msg; int senderId = -1; long requestId = -1; - short response = -1; + int response = -1; try { senderId = buf.readInt(); requestId = buf.readLong(); - response = buf.readShort(); + response = buf.readInt(); } catch (IndexOutOfBoundsException e) { throw new IllegalStateException( "channelRead: Got IndexOutOfBoundsException ", e); http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java new file mode 100644 index 0000000..b172e9a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.netty.handler; + +import com.google.common.collect.Maps; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Generate different request ids based on the task id. Thread-safe. + */ +public class TaskRequestIdGenerator { + /** Task request generator map */ + private final ConcurrentMap<Integer, AtomicLong> + taskRequestGeneratorMap = Maps.newConcurrentMap(); + + /** + * Get the next request id for a given destination. Thread-safe. 
+ * + * @param taskId id of the task (consistent during a superstep) + * @return Valid request id + */ + public Long getNextRequestId(Integer taskId) { + AtomicLong requestGenerator = taskRequestGeneratorMap.get(taskId); + if (requestGenerator == null) { + requestGenerator = new AtomicLong(0); + AtomicLong oldRequestGenerator = + taskRequestGeneratorMap.putIfAbsent(taskId, requestGenerator); + if (oldRequestGenerator != null) { + requestGenerator = oldRequestGenerator; + } + } + return requestGenerator.getAndIncrement(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java index bebac28..627c2af 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java @@ -64,7 +64,9 @@ else[HADOOP_NON_SECURE]*/ /** Send request for input split from worker to master */ ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class), /** Send request with granted input split from master to workers */ - REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class); + REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class), + /** Send request to resume sending messages (used in flow-control) */ + SEND_RESUME_REQUEST(SendResumeRequest.class); /** Class of request which this type corresponds to */ private final Class<?
extends WritableRequest> requestClass; http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java new file mode 100644 index 0000000..0e5e8bb --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.requests; + +import org.apache.giraph.comm.ServerData; +import org.apache.giraph.comm.flow_control.CreditBasedFlowControl; +import org.apache.giraph.comm.flow_control.FlowControl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Send to a worker a signal to resume sending messages to sender worker. 
This + * type of request is used in adaptive credit-based flow control, where a + * worker (W) may assign credit value of 0 to some worker (U), so that U would + * stop sending messages to W. Later on, W may want to notify U to continue + * sending messages to W. Along with the resume signal, W also announces a new + * credit value to U. + */ +public class SendResumeRequest extends WritableRequest + implements WorkerRequest { + /** credit value */ + private short credit; + + /** Constructor used for reflection only */ + public SendResumeRequest() { } + + /** + * Constructor + * + * @param credit credit value + */ + public SendResumeRequest(short credit) { + checkState(credit > 0); + this.credit = credit; + } + + @Override + public void doRequest(ServerData serverData) { + FlowControl flowControl = + serverData.getServiceWorker().getWorkerClient().getFlowControl(); + checkState(flowControl != null); + ((CreditBasedFlowControl) flowControl).processResumeSignal(getClientId(), + credit, getRequestId()); + } + + @Override + public RequestType getType() { + return RequestType.SEND_RESUME_REQUEST; + } + + @Override + void readFieldsRequest(DataInput input) throws IOException { + credit = input.readShort(); + } + + @Override + void writeRequest(DataOutput output) throws IOException { + output.writeShort(credit); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 78bd5ef..df79b7f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -957,10 +957,6 @@ public class GiraphConfiguration extends Configuration return 
NUM_COMPUTE_THREADS.get(this); } - public int getNumOocThreads() { - return NUM_OOC_THREADS.get(this); - } - /** * Set the number of input split threads * http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index b5bb9ed..17e030f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -71,6 +71,8 @@ import org.apache.giraph.mapping.translate.TranslateEdge; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; +import org.apache.giraph.ooc.OutOfCoreOracle; +import org.apache.giraph.ooc.ThresholdBasedOracle; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.HashPartitionerFactory; import org.apache.giraph.partition.Partition; @@ -995,10 +997,16 @@ public interface GiraphConstants { new BooleanConfOption("giraph.useOutOfCoreGraph", false, "Enable out-of-core graph."); - /** Number of threads participating in swapping graph/messages to disk. */ - IntConfOption NUM_OOC_THREADS = - new IntConfOption("giraph.numOutOfCoreThreads", 1, - "Number of threads participating in swapping data to disk."); + /** + * Out-of-core oracle that is to be used for adaptive out-of-core engine. If + * the `MAX_PARTITIONS_IN_MEMORY` is already set, this will be over-written + * to be `FixedPartitionsOracle`. 
+ */ + ClassConfOption<OutOfCoreOracle> OUT_OF_CORE_ORACLE = + ClassConfOption.create("giraph.outOfCoreOracle", + ThresholdBasedOracle.class, OutOfCoreOracle.class, + "Out-of-core oracle that is to be used for adaptive out-of-core " + + "engine"); /** Maximum number of partitions to hold in memory for each worker. */ IntConfOption MAX_PARTITIONS_IN_MEMORY = @@ -1006,8 +1014,6 @@ public interface GiraphConstants { "Maximum number of partitions to hold in memory for each worker. By" + " default it is set to 0 (for adaptive out-of-core mechanism"); - - /** Directory to write YourKit snapshots to */ String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir"; /** Default directory to write YourKit snapshots to */ http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java index d96b474..0cb8486 100644 --- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java @@ -47,7 +47,7 @@ public class GiraphStats extends HadoopCountersBase { = "Aggregate sent messages"; /** aggregate sent messages bytes counter name */ public static final String AGGREGATE_SENT_MESSAGE_BYTES_NAME - = "Aggregate sent message message bytes"; + = "Aggregate sent message bytes"; /** workers counter name */ public static final String CURRENT_WORKERS_NAME = "Current workers"; /** current master partition task counter name */ @@ -56,6 +56,12 @@ public class GiraphStats extends HadoopCountersBase { /** last checkpointed superstep counter name */ public static final String LAST_CHECKPOINTED_SUPERSTEP_NAME = "Last checkpointed superstep"; + /** aggregate bytes loaded from local disks in out-of-core */ + public static final String 
OOC_BYTES_LOADED_NAME = + "Aggregate bytes loaded from local disks (out-of-core)"; + /** aggregate bytes stored to local disks in out-of-core */ + public static final String OOC_BYTES_STORED_NAME = + "Aggregate bytes stored to local disks (out-of-core)"; /** Singleton instance for everyone to use */ private static GiraphStats INSTANCE; @@ -82,8 +88,12 @@ public class GiraphStats extends HadoopCountersBase { private static final int AGG_SENT_MESSAGES = 9; /** Aggregate sent message bytes counter */ private static final int AGG_SENT_MESSAGE_BYTES = 10; + /** Aggregate OOC loaded bytes counter */ + private static final int OOC_BYTES_LOADED = 11; + /** Aggregate OOC stored bytes counter */ + private static final int OOC_BYTES_STORED = 12; /** Number of counters in this class */ - private static final int NUM_COUNTERS = 11; + private static final int NUM_COUNTERS = 13; /** All the counters stored */ private final GiraphHadoopCounter[] counters; @@ -111,6 +121,8 @@ public class GiraphStats extends HadoopCountersBase { getCounter(AGGREGATE_SENT_MESSAGES_NAME); counters[AGG_SENT_MESSAGE_BYTES] = getCounter(AGGREGATE_SENT_MESSAGE_BYTES_NAME); + counters[OOC_BYTES_LOADED] = getCounter(OOC_BYTES_LOADED_NAME); + counters[OOC_BYTES_STORED] = getCounter(OOC_BYTES_STORED_NAME); } /** @@ -230,6 +242,24 @@ public class GiraphStats extends HadoopCountersBase { return counters[LAST_CHECKPOINTED_SUPERSTEP]; } + /** + * Get OOCBytesLoaded counter + * + * @return OOCBytesLoaded counter + */ + public GiraphHadoopCounter getAggregateOOCBytesLoaded() { + return counters[OOC_BYTES_LOADED]; + } + + /** + * Get OOCBytesStored counter + * + * @return OOCBytesStored counter + */ + public GiraphHadoopCounter getAggregateOOCBytesStored() { + return counters[OOC_BYTES_STORED]; + } + @Override public Iterator<GiraphHadoopCounter> iterator() { return Arrays.asList(counters).iterator(); 
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java index 0f3d668..104cae2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java @@ -23,6 +23,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.partition.Partition; import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.utils.ProgressableUtils; @@ -78,6 +79,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, * from the one used during computation. */ protected boolean useInputOutEdges; + /** Whether we spilled edges on disk */ + private boolean hasEdgesOnDisk = false; /** * Constructor. 
@@ -169,6 +172,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId); if (edges != null) { output.writeInt(edges.size()); + if (edges.size() > 0) { + hasEdgesOnDisk = true; + } for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) { writeVertexKey(edge.getKey(), output); edge.getValue().write(output); @@ -242,7 +248,7 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, @Override public void moveEdgesToVertices() { final boolean createSourceVertex = configuration.getCreateSourceVertex(); - if (transientEdges.isEmpty()) { + if (transientEdges.isEmpty() && !hasEdgesOnDisk) { if (LOG.isInfoEnabled()) { LOG.info("moveEdgesToVertices: No edges to move"); } @@ -264,6 +270,10 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, public Void call() throws Exception { Integer partitionId; I representativeVertexId = configuration.createVertexId(); + OutOfCoreEngine oocEngine = service.getServerData().getOocEngine(); + if (oocEngine != null) { + oocEngine.processingThreadStart(); + } while (true) { Partition<I, V, E> partition = service.getPartitionStore().getNextPartition(); @@ -280,7 +290,15 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, Iterator<Et> iterator = getPartitionEdgesIterator(partitionEdges); // process all vertices in given partition + int count = 0; while (iterator.hasNext()) { + // If out-of-core mechanism is used, check whether this thread + // can stay active or it should temporarily suspend and stop + // processing and generating more data for the moment. 
+ if (oocEngine != null && + (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) { + oocEngine.activeThreadCheckIn(); + } Et entry = iterator.next(); I vertexId = getVertexId(entry, representativeVertexId); OutEdges<I, E> outEdges = convertInputToComputeEdges( @@ -320,6 +338,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, // partition after modifying it. service.getPartitionStore().putPartition(partition); } + if (oocEngine != null) { + oocEngine.processingThreadFinish(); + } return null; } }; http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index d59d044..78c1ec3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -31,6 +31,7 @@ import org.apache.giraph.io.SimpleVertexWriter; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.MetricNames; import org.apache.giraph.metrics.SuperstepMetricsRegistry; +import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStore; @@ -99,6 +100,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, private final Counter messageBytesSentCounter; /** Compute time per partition */ private final Histogram histogramComputePerPartition; + /** GC time per compute thread */ + private final Histogram histogramGCTimePerThread; + /** Wait time per compute thread */ + private final Histogram histogramWaitTimePerThread; + /** Processing time per compute thread */ + private final Histogram 
histogramProcessingTimePerThread; /** * Constructor @@ -125,6 +132,11 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT); histogramComputePerPartition = metrics.getUniformHistogram( MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION); + histogramGCTimePerThread = metrics.getUniformHistogram("gc-per-thread-ms"); + histogramWaitTimePerThread = + metrics.getUniformHistogram("wait-per-thread-ms"); + histogramProcessingTimePerThread = + metrics.getUniformHistogram("processing-per-thread-ms"); } @Override @@ -148,17 +160,32 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, List<PartitionStats> partitionStatsList = Lists.newArrayList(); PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore(); + OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine(); + GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager(); + if (oocEngine != null) { + oocEngine.processingThreadStart(); + } + long timeWaiting = 0; + long timeProcessing = 0; + long timeDoingGC = 0; while (true) { long startTime = System.currentTimeMillis(); + long startGCTime = taskManager.getSuperstepGCTime(); Partition<I, V, E> partition = partitionStore.getNextPartition(); + long timeDoingGCWhileWaiting = + taskManager.getSuperstepGCTime() - startGCTime; + timeDoingGC += timeDoingGCWhileWaiting; + timeWaiting += System.currentTimeMillis() - startTime - + timeDoingGCWhileWaiting; if (partition == null) { break; } - + long startProcessingTime = System.currentTimeMillis(); + startGCTime = taskManager.getSuperstepGCTime(); try { serviceWorker.getServerData().resolvePartitionMutation(partition); PartitionStats partitionStats = - computePartition(computation, partition); + computePartition(computation, partition, oocEngine); partitionStatsList.add(partitionStats); long partitionMsgs = workerClientRequestProcessor.resetMessageCount(); 
partitionStats.addMessagesSentCount(partitionMsgs); @@ -180,10 +207,17 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, } finally { partitionStore.putPartition(partition); } + long timeDoingGCWhileProcessing = + taskManager.getSuperstepGCTime() - startGCTime; + timeDoingGC += timeDoingGCWhileProcessing; + timeProcessing += System.currentTimeMillis() - startProcessingTime - + timeDoingGCWhileProcessing; histogramComputePerPartition.update( System.currentTimeMillis() - startTime); } - + histogramGCTimePerThread.update(timeDoingGC); + histogramWaitTimePerThread.update(timeWaiting); + histogramProcessingTimePerThread.update(timeProcessing); computation.postSuperstep(); // Return VertexWriter after the usage @@ -194,7 +228,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, Time.NS_PER_SECOND_AS_FLOAT; LOG.info("call: Computation took " + seconds + " secs for " + partitionStatsList.size() + " partitions on superstep " + - graphState.getSuperstep() + ". Flushing started"); + graphState.getSuperstep() + ". 
Flushing started (time waiting on " + + "partitions was " + + String.format("%.2f s", timeWaiting / 1000.0) + ", time processing " + + "partitions was " + String.format("%.2f s", timeProcessing / 1000.0) + + ", time spent on gc was " + + String.format("%.2f s", timeDoingGC / 1000.0) + ")"); } try { workerClientRequestProcessor.flush(); @@ -211,6 +250,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, } catch (IOException e) { throw new IllegalStateException("call: Flushing failed.", e); } + if (oocEngine != null) { + oocEngine.processingThreadFinish(); + } return partitionStatsList; } @@ -219,17 +261,27 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, * * @param computation Computation to use * @param partition Partition to compute + * @param oocEngine out-of-core engine * @return Partition stats for this computed partition */ private PartitionStats computePartition( Computation<I, V, E, M1, M2> computation, - Partition<I, V, E> partition) throws IOException, InterruptedException { + Partition<I, V, E> partition, OutOfCoreEngine oocEngine) + throws IOException, InterruptedException { PartitionStats partitionStats = new PartitionStats(partition.getId(), 0, 0, 0, 0, 0); long verticesComputedProgress = 0; // Make sure this is thread-safe across runs synchronized (partition) { + int count = 0; for (Vertex<I, V, E> vertex : partition) { + // If out-of-core mechanism is used, check whether this thread + // can stay active or it should temporarily suspend and stop + // processing and generating more data for the moment. 
+ if (oocEngine != null && + (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) { + oocEngine.activeThreadCheckIn(); + } Iterable<M1> messages = messageStore.getVertexMessages(vertex.getId()); if (vertex.isHalted() && !Iterables.isEmpty(messages)) { vertex.wakeUp(); http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java index 499c862..dab3c2f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java @@ -42,6 +42,10 @@ public class GlobalStats implements Writable { private long messageBytesCount = 0; /** Whether the computation should be halted */ private boolean haltComputation = false; + /** Bytes of data stored to disk in the last superstep */ + private long oocStoreBytesCount = 0; + /** Bytes of data loaded from disk in the last superstep */ + private long oocLoadBytesCount = 0; /** * Master's decision on whether we should checkpoint and * what to do next. @@ -88,6 +92,14 @@ public class GlobalStats implements Writable { haltComputation = value; } + public long getOocStoreBytesCount() { + return oocStoreBytesCount; + } + + public long getOocLoadBytesCount() { + return oocLoadBytesCount; + } + public CheckpointStatus getCheckpointStatus() { return checkpointStatus; } @@ -97,6 +109,24 @@ public class GlobalStats implements Writable { } /** + * Add bytes loaded to the global stats. + * + * @param oocLoadBytesCount number of bytes to be added + */ + public void addOocLoadBytesCount(long oocLoadBytesCount) { + this.oocLoadBytesCount += oocLoadBytesCount; + } + + /** + * Add bytes stored to the global stats.
+ * + * @param oocStoreBytesCount number of bytes to be added + */ + public void addOocStoreBytesCount(long oocStoreBytesCount) { + this.oocStoreBytesCount += oocStoreBytesCount; + } + + /** * Add messages to the global stats. * * @param messageCount Number of messages to be added. http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 19ac615..87d5248 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -19,6 +19,8 @@ package org.apache.giraph.graph; import java.io.IOException; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; import java.net.URL; import java.net.URLDecoder; import java.util.ArrayList; @@ -29,6 +31,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.sun.management.GarbageCollectionNotificationInfo; +import com.yammer.metrics.core.Counter; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.CentralizedServiceWorker; @@ -46,6 +50,7 @@ import org.apache.giraph.metrics.GiraphTimer; import org.apache.giraph.metrics.GiraphTimerContext; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; +import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStore; @@ -70,6 +75,11 @@ import org.apache.log4j.LogManager; import 
org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; +import javax.management.Notification; +import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.openmbean.CompositeData; + /** * The Giraph-specific business logic for a single BSP * compute node in whatever underlying type of cluster @@ -110,6 +120,8 @@ end[PURE_YARN]*/ "time-to-first-message-ms"; /** Name of metric for time from first message till last message flushed */ public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms"; + /** Name of metric for time spent doing GC per superstep in msec */ + public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms"; /** Class logger */ private static final Logger LOG = Logger.getLogger(GraphTaskManager.class); @@ -156,6 +168,8 @@ end[PURE_YARN]*/ private GiraphTimerContext communicationTimerContext; /** Timer for WorkerContext#preSuperstep() */ private GiraphTimer wcPreSuperstepTimer; + /** Timer to keep aggregated time spent in GC in a superstep */ + private Counter gcTimeMetric; /** The Hadoop Mapper#Context for this job */ private final Mapper<?, ?, ?, ?>.Context context; /** is this GraphTaskManager the master? 
*/ @@ -293,7 +307,9 @@ end[PURE_YARN]*/ return; } preLoadOnWorkerObservers(); + GiraphTimerContext superstepTimerContext = superstepTimer.time(); finishedSuperstepStats = serviceWorker.setup(); + superstepTimerContext.stop(); if (collectInputSuperstepStats(finishedSuperstepStats)) { return; } @@ -304,8 +320,7 @@ end[PURE_YARN]*/ // main superstep processing loop while (!finishedSuperstepStats.allVerticesHalted()) { final long superstep = serviceWorker.getSuperstep(); - GiraphTimerContext superstepTimerContext = - getTimerForThisSuperstep(superstep); + superstepTimerContext = getTimerForThisSuperstep(superstep); GraphState graphState = new GraphState(superstep, finishedSuperstepStats.getVertexCount(), finishedSuperstepStats.getEdgeCount(), @@ -619,6 +634,7 @@ end[PURE_YARN]*/ LOG.info("setup: Starting up BspServiceWorker..."); } serviceWorker = new BspServiceWorker<I, V, E>(context, this); + installGCMonitoring(); if (LOG.isInfoEnabled()) { LOG.info("setup: Registering health of this worker..."); } @@ -626,6 +642,45 @@ end[PURE_YARN]*/ } /** + * Install GC monitoring. This method intercepts all GC, log the gc, and + * notifies an out-of-core engine (if any is used) about the GC. 
+ */ + private void installGCMonitoring() { + List<GarbageCollectorMXBean> mxBeans = ManagementFactory + .getGarbageCollectorMXBeans(); + final OutOfCoreEngine oocEngine = + serviceWorker.getServerData().getOocEngine(); + for (GarbageCollectorMXBean gcBean : mxBeans) { + NotificationEmitter emitter = (NotificationEmitter) gcBean; + NotificationListener listener = new NotificationListener() { + @Override + public void handleNotification(Notification notification, + Object handle) { + if (notification.getType().equals(GarbageCollectionNotificationInfo + .GARBAGE_COLLECTION_NOTIFICATION)) { + GarbageCollectionNotificationInfo info = + GarbageCollectionNotificationInfo.from( + (CompositeData) notification.getUserData()); + + if (LOG.isInfoEnabled()) { + LOG.info("installGCMonitoring: name = " + info.getGcName() + + ", action = " + info.getGcAction() + ", cause = " + + info.getGcCause() + ", duration = " + + info.getGcInfo().getDuration() + "ms"); + } + gcTimeMetric.inc(info.getGcInfo().getDuration()); + if (oocEngine != null) { + oocEngine.gcCompleted(info); + } + } + } + }; + //Add the listener + emitter.addNotificationListener(listener, null, null); + } + } + + /** + * Initialize the root logger and appender to the settings in conf. */ private void initializeAndConfigureLogging() { @@ -680,6 +735,7 @@ end[PURE_YARN]*/ TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS); communicationTimer = new GiraphTimer(superstepMetrics, TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS); + gcTimeMetric = superstepMetrics.getCounter(TIMER_SUPERSTEP_GC_TIME); wcPreSuperstepTimer = new GiraphTimer(superstepMetrics, "worker-context-pre-superstep", TimeUnit.MILLISECONDS); } @@ -1000,6 +1056,12 @@ end[PURE_YARN]*/ return conf; } + /** + * @return Time spent in GC recorded by the GC listener + */ + public long getSuperstepGCTime() { + return gcTimeMetric.count(); + } /** * Default handler for uncaught exceptions.
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index cf8d1bd..8372bd3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -942,6 +942,10 @@ public class BspServiceMaster<I extends WritableComparable, workerFinishedInfoObj.getString( JSONOBJ_METRICS_KEY)), workerMetrics); + globalStats.addOocLoadBytesCount( + workerMetrics.getBytesLoadedFromDisk()); + globalStats.addOocStoreBytesCount( + workerMetrics.getBytesStoredOnDisk()); aggregatedMetrics.add(workerMetrics, hostnamePartitionId); } } catch (JSONException e) { @@ -2050,5 +2054,9 @@ public class BspServiceMaster<I extends WritableComparable, gs.getAggregateSentMessages().increment(globalStats.getMessageCount()); gs.getAggregateSentMessageBytes() .increment(globalStats.getMessageBytesCount()); + gs.getAggregateOOCBytesLoaded() + .increment(globalStats.getOocLoadBytesCount()); + gs.getAggregateOOCBytesStored() + .increment(globalStats.getOocStoreBytesCount()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java index 9d782c4..f99043f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java @@ -20,41 +20,26 @@ package org.apache.giraph.metrics; /** * An aggregator over 
metrics from multiple hosts. Computes min, max, and mean. + * + * @param <T> value type */ -public class AggregatedMetric { +public abstract class AggregatedMetric<T extends Number> { /** Minimum value seen with the host that it came from */ - private ValueWithHostname min; + protected ValueWithHostname<T> min; /** Maximum value seen with the host that it came from */ - private ValueWithHostname max; + protected ValueWithHostname<T> max; /** Total of all the values seen */ - private long sum; + protected T sum; /** Number of values seen */ - private long count; - - /** - * Create new aggregated metric. - */ - public AggregatedMetric() { - min = new ValueWithHostname(Long.MAX_VALUE); - max = new ValueWithHostname(Long.MIN_VALUE); - } + protected long count; /** * Add another item to the aggregation. * - * @param value long value to add + * @param value value to add * @param hostnamePartitionId String hostname it came from */ - public void addItem(long value, String hostnamePartitionId) { - if (value < min.getValue()) { - min.set(value, hostnamePartitionId); - } - if (value > max.getValue()) { - max.set(value, hostnamePartitionId); - } - sum += value; - count++; - } + public abstract void addItem(T value, String hostnamePartitionId); /** * Whether this AggregatedMetric has any data. 
@@ -70,7 +55,7 @@ public class AggregatedMetric { * * @return ValueWithHostname for minimum */ - public ValueWithHostname min() { + public ValueWithHostname<T> min() { return min; } @@ -79,16 +64,16 @@ public class AggregatedMetric { * * @return ValueWithHostname for maximum */ - public ValueWithHostname max() { + public ValueWithHostname<T> max() { return max; } /** * Get total of all the values seen * - * @return long total of values seen + * @return total of values seen */ - public long sum() { + public T sum() { return sum; } @@ -97,9 +82,7 @@ public class AggregatedMetric { * * @return computed average of all the values */ - public double mean() { - return sum / (double) count; - } + public abstract double mean(); /** * Get number of values seen @@ -110,3 +93,4 @@ public class AggregatedMetric { return count; } } + http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java new file mode 100644 index 0000000..abd0ff6 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+/**
+ * Aggregator over metrics with double values
+ */
+public final class AggregatedMetricDouble extends AggregatedMetric<Double> {
+  /**
+   * Constructor. Seeds min with the largest finite double and max with the
+   * lowest finite double, and starts the running sum at zero.
+   */
+  public AggregatedMetricDouble() {
+    min = new ValueWithHostname<>(Double.MAX_VALUE);
+    // Double.MIN_VALUE is the smallest POSITIVE double, so seeding max with
+    // it would hide every zero or negative sample; use the lowest finite
+    // value instead.
+    max = new ValueWithHostname<>(-Double.MAX_VALUE);
+    sum = 0.0;
+  }
+
+  /**
+   * Fold one sample into the aggregate: replace min/max (and the host they
+   * came from) when the sample is strictly smaller/larger, add it to the
+   * sum and bump the sample count.
+   *
+   * @param value value to add
+   * @param hostnamePartitionId String hostname it came from
+   */
+  @Override
+  public void addItem(Double value, String hostnamePartitionId) {
+    if (value < min.getValue()) {
+      min.set(value, hostnamePartitionId);
+    }
+    if (value > max.getValue()) {
+      max.set(value, hostnamePartitionId);
+    }
+    sum += value;
+    count++;
+  }
+
+  /**
+   * @return sum divided by the number of samples (the floating-point
+   *         division yields NaN when nothing has been added yet)
+   */
+  @Override
+  public double mean() {
+    return sum / count;
+  }
+}
