Updated Branches: refs/heads/trunk f31c62575 -> 40bc599b8
GIRAPH-537: Fix log messages produced by aggregators (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/40bc599b Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/40bc599b Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/40bc599b Branch: refs/heads/trunk Commit: 40bc599b8a7e7caaeb0a284fd2d2103926b79288 Parents: f31c625 Author: Maja Kabiljo <[email protected]> Authored: Wed Mar 20 11:38:18 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Wed Mar 20 11:38:18 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../comm/aggregators/AllAggregatorServerData.java | 34 ++-- .../aggregators/OwnerAggregatorServerData.java | 18 +- .../giraph/comm/netty/NettyMasterClient.java | 3 +- .../NettyWorkerAggregatorRequestProcessor.java | 6 +- .../requests/ByteArrayWithSenderTaskIdRequest.java | 71 +++++++ .../requests/SendAggregatorsToOwnerRequest.java | 12 +- .../requests/SendAggregatorsToWorkerRequest.java | 10 +- .../requests/SendWorkerAggregatorsRequest.java | 10 +- .../org/apache/giraph/utils/ExpectedBarrier.java | 125 ------------ .../apache/giraph/utils/TaskIdsPermitsBarrier.java | 155 +++++++++++++++ .../giraph/worker/WorkerAggregatorHandler.java | 31 +++- 12 files changed, 309 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index e4430f2..462f104 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-537: Fix log messages produced by aggregators (majakabiljo) + GIRAPH-480: Add convergence detection to org.apache.giraph.examples.RandomWalkVertex (ssc) GIRAPH-565: Make an easy way to gather some logs from workers on master (majakabiljo) 
http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java index dddd1cb..f38c6cd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java @@ -19,7 +19,8 @@ package org.apache.giraph.comm.aggregators; import org.apache.giraph.aggregators.Aggregator; -import org.apache.giraph.utils.ExpectedBarrier; +import org.apache.giraph.master.MasterInfo; +import org.apache.giraph.utils.TaskIdsPermitsBarrier; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -30,6 +31,7 @@ import com.google.common.collect.Maps; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; /** @@ -65,7 +67,7 @@ public class AllAggregatorServerData { * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR) * to know how many requests it has to receive. */ - private final ExpectedBarrier masterBarrier; + private final TaskIdsPermitsBarrier masterBarrier; /** * Aggregator data which this worker received from master and which it is * going to distribute before starting next superstep. Thread-safe. @@ -78,7 +80,7 @@ public class AllAggregatorServerData { * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR) * to know how many requests it has to receive. 
*/ - private final ExpectedBarrier workersBarrier; + private final TaskIdsPermitsBarrier workersBarrier; /** Progressable used to report progress */ private final Progressable progressable; @@ -89,8 +91,8 @@ public class AllAggregatorServerData { */ public AllAggregatorServerData(Progressable progressable) { this.progressable = progressable; - workersBarrier = new ExpectedBarrier(progressable); - masterBarrier = new ExpectedBarrier(progressable); + workersBarrier = new TaskIdsPermitsBarrier(progressable); + masterBarrier = new TaskIdsPermitsBarrier(progressable); } /** @@ -154,9 +156,10 @@ public class AllAggregatorServerData { * arrive from master. * * @param requestCount Number of requests which should arrive + * @param taskId Task id of master */ - public void receivedRequestCountFromMaster(long requestCount) { - masterBarrier.requirePermits(requestCount); + public void receivedRequestCountFromMaster(long requestCount, int taskId) { + masterBarrier.requirePermits(requestCount, taskId); } /** @@ -172,19 +175,22 @@ public class AllAggregatorServerData { * arrive from one of the workers. * * @param requestCount Number of requests which should arrive + * @param taskId Task id of that worker */ - public void receivedRequestCountFromWorker(long requestCount) { - workersBarrier.requirePermits(requestCount); + public void receivedRequestCountFromWorker(long requestCount, int taskId) { + workersBarrier.requirePermits(requestCount, taskId); } /** * This function will wait until all aggregator requests from master have * arrived, and return that data afterwards. 
* + * @param masterInfo Master info * @return Iterable through data received from master */ - public Iterable<byte[]> getDataFromMasterWhenReady() { - masterBarrier.waitForRequiredPermits(1); + public Iterable<byte[]> getDataFromMasterWhenReady(MasterInfo masterInfo) { + masterBarrier.waitForRequiredPermits( + Collections.singleton(masterInfo.getTaskId())); if (LOG.isDebugEnabled()) { LOG.debug("getDataFromMasterWhenReady: " + "Aggregator data for distribution ready"); @@ -196,7 +202,7 @@ public class AllAggregatorServerData { * This function will wait until all aggregator requests from workers have * arrived, and fill the maps for next superstep when ready. * - * @param numberOfWorkers Total number of workers in the job + * @param workerIds All workers in the job apart from the current one * @param previousAggregatedValuesMap Map of values from previous * superstep to fill out * @param currentAggregatorMap Map of aggregators for current superstep to @@ -204,10 +210,10 @@ public class AllAggregatorServerData { * be set to initial value. 
*/ public void fillNextSuperstepMapsWhenReady( - int numberOfWorkers, + Set<Integer> workerIds, Map<String, Writable> previousAggregatedValuesMap, Map<String, Aggregator<Writable>> currentAggregatorMap) { - workersBarrier.waitForRequiredPermits(numberOfWorkers - 1); + workersBarrier.waitForRequiredPermits(workerIds); if (LOG.isDebugEnabled()) { LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready"); } http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java index 70ff7fe..bd6068a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java @@ -19,7 +19,7 @@ package org.apache.giraph.comm.aggregators; import org.apache.giraph.aggregators.Aggregator; -import org.apache.giraph.utils.ExpectedBarrier; +import org.apache.giraph.utils.TaskIdsPermitsBarrier; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -30,6 +30,7 @@ import com.google.common.collect.Maps; import java.util.AbstractMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; /** @@ -68,7 +69,7 @@ public class OwnerAggregatorServerData { * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR) * to know how many requests it has to receive. 
*/ - private final ExpectedBarrier workersBarrier; + private final TaskIdsPermitsBarrier workersBarrier; /** Progressable used to report progress */ private final Progressable progressable; @@ -79,7 +80,7 @@ public class OwnerAggregatorServerData { */ public OwnerAggregatorServerData(Progressable progressable) { this.progressable = progressable; - workersBarrier = new ExpectedBarrier(progressable); + workersBarrier = new TaskIdsPermitsBarrier(progressable); } /** @@ -143,9 +144,10 @@ public class OwnerAggregatorServerData { * arrive from one of the workers. Thread-safe. * * @param requestCount Number of requests which should arrive + * @param taskId Task id of that worker */ - public void receivedRequestCountFromWorker(long requestCount) { - workersBarrier.requirePermits(requestCount); + public void receivedRequestCountFromWorker(long requestCount, int taskId) { + workersBarrier.requirePermits(requestCount, taskId); } /** @@ -153,12 +155,12 @@ public class OwnerAggregatorServerData { * workers are ready and aggregated, and return final aggregated values * afterwards. 
* - * @param numberOfWorkers Total number of workers in the job + * @param workerIds All workers in the job apart from the current one * @return Iterable through final aggregated values which this worker owns */ public Iterable<Map.Entry<String, Writable>> - getMyAggregatorValuesWhenReady(int numberOfWorkers) { - workersBarrier.waitForRequiredPermits(numberOfWorkers - 1); + getMyAggregatorValuesWhenReady(Set<Integer> workerIds) { + workersBarrier.waitForRequiredPermits(workerIds); if (LOG.isDebugEnabled()) { LOG.debug("getMyAggregatorValuesWhenReady: Values ready"); } http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java index 86ea8dc..319f41a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java @@ -104,7 +104,8 @@ public class NettyMasterClient implements MasterClient { byte[] aggregatorData = sendAggregatorCache.removeAggregators(worker.getTaskId()); nettyClient.sendWritableRequest( - worker.getTaskId(), new SendAggregatorsToOwnerRequest(aggregatorData)); + worker.getTaskId(), new SendAggregatorsToOwnerRequest(aggregatorData, + service.getMasterInfo().getTaskId())); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java index 
cd24219..d1cce64 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java @@ -109,7 +109,8 @@ public class NettyWorkerAggregatorRequestProcessor byte[] aggregatorData = sendAggregatedValueCache.removeAggregators(worker.getTaskId()); workerClient.sendWritableRequest(worker.getTaskId(), - new SendWorkerAggregatorsRequest(aggregatorData)); + new SendWorkerAggregatorsRequest(aggregatorData, + serviceWorker.getWorkerInfo().getTaskId())); } @Override @@ -124,7 +125,8 @@ public class NettyWorkerAggregatorRequestProcessor Iterable<byte[]> aggregatorDataList) throws IOException { for (byte[] aggregatorData : aggregatorDataList) { SendAggregatorsToWorkerRequest request = - new SendAggregatorsToWorkerRequest(aggregatorData); + new SendAggregatorsToWorkerRequest(aggregatorData, + serviceWorker.getWorkerInfo().getTaskId()); for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) { if (!isThisWorker(worker)) { workerClient.sendWritableRequest(worker.getTaskId(), request); http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java new file mode 100644 index 0000000..d66339e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayWithSenderTaskIdRequest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.requests; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Abstract request which has a byte array and task id of the sender as its + * data + */ +public abstract class ByteArrayWithSenderTaskIdRequest + extends ByteArrayRequest { + /** Task id of the sender of request */ + private int senderTaskId; + + /** + * Constructor + * + * @param data Request data + * @param senderTaskId Sender task id + */ + public ByteArrayWithSenderTaskIdRequest(byte[] data, int senderTaskId) { + super(data); + this.senderTaskId = senderTaskId; + } + + /** + * Default constructor + */ + public ByteArrayWithSenderTaskIdRequest() { + } + + public int getSenderTaskId() { + return senderTaskId; + } + + @Override + void writeRequest(DataOutput output) throws IOException { + super.writeRequest(output); + output.writeInt(senderTaskId); + } + + @Override + void readFieldsRequest(DataInput input) throws IOException { + super.readFieldsRequest(input); + senderTaskId = input.readInt(); + } + + @Override + public int getSerializedSize() { + return super.getSerializedSize() + 4; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java index 21b1b2d..e2681ee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java @@ -32,16 +32,17 @@ import java.io.IOException; * Request to send final aggregatd values from master to worker which owns * the aggregators */ -public class SendAggregatorsToOwnerRequest extends ByteArrayRequest - implements WorkerRequest { +public class SendAggregatorsToOwnerRequest + extends ByteArrayWithSenderTaskIdRequest implements WorkerRequest { /** * Constructor * * @param data Serialized aggregator data + * @param senderTaskId Sender task id */ - public SendAggregatorsToOwnerRequest(byte[] data) { - super(data); + public SendAggregatorsToOwnerRequest(byte[] data, int senderTaskId) { + super(data, senderTaskId); } /** @@ -62,7 +63,8 @@ public class SendAggregatorsToOwnerRequest extends ByteArrayRequest if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); count.readFields(input); - aggregatorData.receivedRequestCountFromMaster(count.get()); + aggregatorData.receivedRequestCountFromMaster(count.get(), + getSenderTaskId()); } else { Class<Aggregator<Writable>> aggregatorClass = AggregatorUtils.getAggregatorClass(aggregatorClassName); http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java index 7e84e17..52e4cba 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java @@ -33,15 +33,16 @@ import java.io.IOException; * other workers */ public class SendAggregatorsToWorkerRequest extends - ByteArrayRequest implements WorkerRequest { + ByteArrayWithSenderTaskIdRequest implements WorkerRequest { /** * Constructor * * @param data Serialized aggregator data + * @param senderTaskId Sender task id */ - public SendAggregatorsToWorkerRequest(byte[] data) { - super(data); + public SendAggregatorsToWorkerRequest(byte[] data, int senderTaskId) { + super(data, senderTaskId); } /** @@ -62,7 +63,8 @@ public class SendAggregatorsToWorkerRequest extends if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); count.readFields(input); - aggregatorData.receivedRequestCountFromWorker(count.get()); + aggregatorData.receivedRequestCountFromWorker(count.get(), + getSenderTaskId()); } else { Class<Aggregator<Writable>> aggregatorClass = AggregatorUtils.getAggregatorClass(aggregatorClassName); http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java index 264f03a..00a0c26 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java @@ -32,15 +32,16 @@ import java.io.IOException; * which were computed by one worker's vertices) */ public class SendWorkerAggregatorsRequest extends - ByteArrayRequest implements 
WorkerRequest { + ByteArrayWithSenderTaskIdRequest implements WorkerRequest { /** * Constructor * * @param data Serialized aggregator data + * @param senderTaskId Sender task id */ - public SendWorkerAggregatorsRequest(byte[] data) { - super(data); + public SendWorkerAggregatorsRequest(byte[] data, int senderTaskId) { + super(data, senderTaskId); } /** @@ -62,7 +63,8 @@ public class SendWorkerAggregatorsRequest extends AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); count.readFields(input); - aggregatorData.receivedRequestCountFromWorker(count.get()); + aggregatorData.receivedRequestCountFromWorker(count.get(), + getSenderTaskId()); } else { Writable aggregatedValue = aggregatorData.createAggregatorInitialValue(aggregatorName); http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java deleted file mode 100644 index ccd137c..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.utils; - -import org.apache.hadoop.util.Progressable; -import org.apache.log4j.Logger; - -/** - * User must follow this protocol for concurrent access: - * - * (1) an object instance is constructed - * (2) arbitrarily many times - * (2a) concurrent calls to requirePermits(), releasePermits() and - * waitForRequiredPermits() are issued - * (2b) waitForRequiredPermits() returns - * - * Note that the next cycle of calls to requirePermits() or releasePermits() - * cannot start until the previous call to waitForRequiredPermits() - * has returned. - * - * Methods of this class are thread-safe. - */ -public class ExpectedBarrier { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(ExpectedBarrier.class); - /** Msecs to refresh the progress meter */ - private static final int MSEC_PERIOD = 10000; - /** Progressable for reporting progress */ - private final Progressable progressable; - /** Number of times permits were added */ - private long timesRequired = 0; - /** Number of permits we are currently waiting for */ - private long waitingOnPermits = 0; - /** Logger */ - private final TimedLogger logger; - - /** - * Constructor - * - * @param progressable Progressable for reporting progress - */ - public ExpectedBarrier(Progressable progressable) { - this.progressable = progressable; - logger = new TimedLogger(MSEC_PERIOD, LOG); - } - - /** - * Wait until permits have been required desired number of times, - * and all required permits are available - * - * @param desiredTimesRequired How many times should permits have been - * required - */ - public synchronized void waitForRequiredPermits( - long desiredTimesRequired) { - while (timesRequired < desiredTimesRequired || waitingOnPermits > 0) { - try { - wait(MSEC_PERIOD); - } catch (InterruptedException e) { - throw new 
IllegalStateException("waitForRequiredPermits: " + - "InterruptedException occurred"); - } - progressable.progress(); - if (LOG.isInfoEnabled()) { - if (timesRequired < desiredTimesRequired) { - logger.info("waitForRequiredPermits: " + - "Waiting for times required to be " + desiredTimesRequired + - " (currently " + timesRequired + ") "); - } else { - logger.info("waitForRequiredPermits: " + - "Waiting for " + waitingOnPermits + " more permits."); - } - } - } - - // Reset for the next time to use - timesRequired = 0; - waitingOnPermits = 0; - } - - /** - * Require more permits. This will increase the number of times permits - * were required. Doesn't wait for permits to become available. - * - * @param permits Number of permits to require - */ - public synchronized void requirePermits(long permits) { - timesRequired++; - waitingOnPermits += permits; - notifyAll(); - } - - /** - * Release one permit. - */ - public synchronized void releaseOnePermit() { - releasePermits(1); - } - - /** - * Release some permits. - * - * @param permits Number of permits to release - */ - public synchronized void releasePermits(long permits) { - waitingOnPermits -= permits; - notifyAll(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java new file mode 100644 index 0000000..e2c22cc --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/TaskIdsPermitsBarrier.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Logger; + +import com.google.common.collect.Sets; + +import java.util.HashSet; +import java.util.Set; + +/** + * This barrier is used when we don't know how many events we are waiting on + * from the start. Instead we have a set of task ids, and each of those will, + * at some point of time, give the information about how many events we + * should expect from it. Barrier will be waiting for all the tasks to notify it + * about that number of events, and then it will also wait for all the events + * to happen. + * + * requirePermits() corresponds to task notifying us how many events from it + * to expect, and releasePermits() notifies us about events happening. + * + * This class is currently used during preparation of aggregators. + * + * User must follow this protocol for concurrent access: + * + * (1) an object instance is constructed + * (2) arbitrarily many times + * (2a) concurrent calls to requirePermits(), releasePermits() and + * waitForRequiredPermits() are issued + * (2b) waitForRequiredPermits() returns + * + * Note that the next cycle of calls to requirePermits() or releasePermits() + * cannot start until the previous call to waitForRequiredPermits() + * has returned. + * + * Methods of this class are thread-safe.
+ */ +public class TaskIdsPermitsBarrier { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(TaskIdsPermitsBarrier.class); + /** Msecs to refresh the progress meter */ + private static final int MSEC_PERIOD = 10000; + /** Maximum number of task ids to list in the log */ + private static final int MAX_TASK_IDS_TO_LOG = 10; + /** Progressable for reporting progress */ + private final Progressable progressable; + /** Number of permits we are currently waiting for */ + private long waitingOnPermits = 0; + /** Set of task ids which required permits already */ + private final Set<Integer> arrivedTaskIds = new HashSet<Integer>(); + /** Logger */ + private final TimedLogger logger; + + /** + * Constructor + * + * @param progressable Progressable for reporting progress + */ + public TaskIdsPermitsBarrier(Progressable progressable) { + this.progressable = progressable; + logger = new TimedLogger(MSEC_PERIOD, LOG); + } + + /** + * Wait until as many tasks as expected have required permits, + * and all required permits have been released + * + * @param expectedTaskIds Set of task ids we are waiting for permits from + */ + public synchronized void waitForRequiredPermits( + Set<Integer> expectedTaskIds) { + while (arrivedTaskIds.size() < expectedTaskIds.size() || + waitingOnPermits > 0) { + try { + wait(MSEC_PERIOD); + } catch (InterruptedException e) { + throw new IllegalStateException("waitForRequiredPermits: " + + "InterruptedException occurred"); + } + progressable.progress(); + if (LOG.isInfoEnabled()) { + if (arrivedTaskIds.size() < expectedTaskIds.size()) { + String logSuffix = ""; + if (expectedTaskIds.size() - arrivedTaskIds.size() <= + MAX_TASK_IDS_TO_LOG) { + Sets.SetView<Integer> difference = + Sets.difference(expectedTaskIds, arrivedTaskIds); + logSuffix = ", task ids: " + difference; + } + logger.info("waitForRequiredPermits: " + + "Waiting for " + + (expectedTaskIds.size() - arrivedTaskIds.size()) + + " more tasks to send their
aggregator data" + + logSuffix); + } else { + logger.info("waitForRequiredPermits: " + + "Waiting for " + waitingOnPermits + " more aggregator requests"); + } + } + } + + // Reset for the next time to use + arrivedTaskIds.clear(); + waitingOnPermits = 0; + } + + /** + * Require more permits on behalf of a task and record that it arrived. + * Doesn't wait for permits to become available. + * + * @param permits Number of permits to require + * @param taskId Task id which required permits + */ + public synchronized void requirePermits(long permits, int taskId) { + arrivedTaskIds.add(taskId); + waitingOnPermits += permits; + notifyAll(); + } + + /** + * Release one permit. + */ + public synchronized void releaseOnePermit() { + releasePermits(1); + } + + /** + * Release some permits. + * + * @param permits Number of permits to release + */ + public synchronized void releasePermits(long permits) { + waitingOnPermits -= permits; + notifyAll(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/40bc599b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java index 001cf59..3c18449 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java @@ -31,9 +31,11 @@ import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.io.IOException; import java.util.Map; +import java.util.Set; /** * Handler for aggregators on worker.
Provides the aggregated values and @@ -129,7 +131,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { serviceWorker.getServerData().getAllAggregatorData(); // Wait for my aggregators Iterable<byte[]> dataToDistribute = - allAggregatorData.getDataFromMasterWhenReady(); + allAggregatorData.getDataFromMasterWhenReady( + serviceWorker.getMasterInfo()); try { // Distribute my aggregators requestProcessor.distributeAggregators(dataToDistribute); @@ -139,7 +142,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { } // Wait for all other aggregators and store them allAggregatorData.fillNextSuperstepMapsWhenReady( - serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap, + getOtherWorkerIdsSet(), previousAggregatedValueMap, currentAggregatorMap); allAggregatorData.reset(); if (LOG.isDebugEnabled()) { @@ -154,8 +157,10 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { */ public void finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor) { - if (LOG.isDebugEnabled()) { - LOG.debug("finishSuperstep: Start finishing aggregators"); + if (LOG.isInfoEnabled()) { + LOG.info("finishSuperstep: Start gathering aggregators, " + + "workers will send their aggregated values " + + "once they are done with superstep computation"); } OwnerAggregatorServerData ownerAggregatorData = serviceWorker.getServerData().getOwnerAggregatorData(); @@ -189,7 +194,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { // Wait to receive partial aggregated values from all other workers Iterable<Map.Entry<String, Writable>> myAggregators = ownerAggregatorData.getMyAggregatorValuesWhenReady( - serviceWorker.getWorkerInfoList().size()); + getOtherWorkerIdsSet()); // Send final aggregated values to master AggregatedValueOutputStream aggregatorOutput = @@ -245,6 +250,22 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { } /** + * Get 
set of all worker task ids except the current one + * + * @return Set of all other worker task ids + */ + public Set<Integer> getOtherWorkerIdsSet() { + Set<Integer> otherWorkers = Sets.newHashSetWithExpectedSize( + serviceWorker.getWorkerInfoList().size()); + for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) { + if (workerInfo.getTaskId() != serviceWorker.getWorkerInfo().getTaskId()) { + otherWorkers.add(workerInfo.getTaskId()); + } + } + return otherWorkers; + } + + /** * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}. * We can use one instance of this object per thread to prevent * synchronizing on each aggregate() call. In the end of superstep,
