Updated Branches: refs/heads/trunk c6c86aa65 -> a95066cd0
Refactored out SendCache to SendVertexIdDataCache and SendCache. Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e26b51e0 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e26b51e0 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e26b51e0 Branch: refs/heads/trunk Commit: e26b51e0dc416fe026734b7079441baeefc33247 Parents: c6c86aa Author: Avery Ching <[email protected]> Authored: Mon Jul 15 22:57:16 2013 -0700 Committer: Avery Ching <[email protected]> Committed: Mon Oct 7 17:05:04 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/giraph/comm/SendCache.java | 252 ------------------- .../org/apache/giraph/comm/SendDataCache.java | 227 +++++++++++++++++ .../org/apache/giraph/comm/SendEdgeCache.java | 2 +- .../apache/giraph/comm/SendMessageCache.java | 8 +- .../giraph/comm/SendVertexIdDataCache.java | 92 +++++++ 5 files changed, 324 insertions(+), 257 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java deleted file mode 100644 index 30c07ee..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.comm; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.utils.ByteArrayVertexIdData; -import org.apache.giraph.utils.PairList; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.WritableComparable; - -import java.util.List; -import java.util.Map; - -/** - * An abstract structure for caching data indexed by vertex id, - * to be sent to workers in bulk. Not thread-safe. - * - * @param <I> Vertex id - * @param <T> Data - * @param <B> Specialization of {@link ByteArrayVertexIdData} for T - */ -@SuppressWarnings("unchecked") -public abstract class SendCache<I extends WritableComparable, T, - B extends ByteArrayVertexIdData<I, T>> { - /** How big to initially make output streams for each worker's partitions */ - private final int[] initialBufferSizes; - /** Giraph configuration */ - private final ImmutableClassesGiraphConfiguration conf; - /** Service worker */ - private final CentralizedServiceWorker serviceWorker; - /** Internal cache */ - private final ByteArrayVertexIdData<I, T>[] dataCache; - /** Size of data (in bytes) for each worker */ - private final int[] dataSizes; - /** Total number of workers */ - private final int numWorkers; - /** List of partition ids belonging to a worker */ - private final Map<WorkerInfo, List<Integer>> workerPartitions = - Maps.newHashMap(); - - /** - * Constructor. - * - * @param conf Giraph configuration - * @param serviceWorker Service worker - * @param maxRequestSize Maximum request size (in bytes) - * @param additionalRequestSize Additional request size (expressed as a - * ratio of the average request size) - */ - public SendCache(ImmutableClassesGiraphConfiguration conf, - CentralizedServiceWorker<?, ?, ?> serviceWorker, - int maxRequestSize, - float additionalRequestSize) { - this.conf = conf; - this.serviceWorker = serviceWorker; - int maxPartition = 0; - for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) { - List<Integer> workerPartitionIds = - workerPartitions.get(partitionOwner.getWorkerInfo()); - if (workerPartitionIds == null) { - workerPartitionIds = Lists.newArrayList(); - workerPartitions.put(partitionOwner.getWorkerInfo(), - workerPartitionIds); - } - workerPartitionIds.add(partitionOwner.getPartitionId()); - maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition); - } - dataCache = new ByteArrayVertexIdData[maxPartition + 1]; - - int maxWorker = 0; - for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) { - maxWorker = Math.max(maxWorker, workerInfo.getTaskId()); - } - dataSizes = new int[maxWorker + 1]; - - int initialRequestSize = - (int) (maxRequestSize * (1 + additionalRequestSize)); - initialBufferSizes = new int[maxWorker + 1]; - for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) { - initialBufferSizes[workerInfo.getTaskId()] = - initialRequestSize / workerPartitions.get(workerInfo).size(); - } - numWorkers = maxWorker + 1; - } - - /** - * Create a new {@link ByteArrayVertexIdData} specialized for the use case. - * - * @return A new instance of {@link ByteArrayVertexIdData} - */ - public abstract B createByteArrayVertexIdData(); - - /** - * Add data to the cache. - * - * @param workerInfo the remote worker destination - * @param partitionId the remote Partition this message belongs to - * @param destVertexId vertex id that is ultimate destination - * @param data Data to send to remote worker - * @return Size of messages for the worker. - */ - public int addData(WorkerInfo workerInfo, - int partitionId, I destVertexId, T data) { - // Get the data collection - ByteArrayVertexIdData<I, T> partitionData = - getPartitionData(workerInfo, partitionId); - int originalSize = partitionData.getSize(); - partitionData.add(destVertexId, data); - // Update the size of cached, outgoing data per worker - dataSizes[workerInfo.getTaskId()] += - partitionData.getSize() - originalSize; - return dataSizes[workerInfo.getTaskId()]; - } - - /** - * This method is similar to the method above, - * but use a serialized id to replace original I type - * destVertexId. - * - * @param workerInfo The remote worker destination - * @param partitionId The remote Partition this message belongs to - * @param serializedId The byte array to store the serialized target vertex id - * @param idPos The length of bytes of serialized id in the byte array above - * @param data Data to send to remote worker - * @return The number of bytes added to the target worker - */ - public int addData(WorkerInfo workerInfo, int partitionId, - byte[] serializedId, int idPos, T data) { - // Get the data collection - ByteArrayVertexIdData<I, T> partitionData = - getPartitionData(workerInfo, partitionId); - int originalSize = partitionData.getSize(); - partitionData.add(serializedId, idPos, data); - // Update the size of cached, outgoing data per worker - dataSizes[workerInfo.getTaskId()] += - partitionData.getSize() - originalSize; - return dataSizes[workerInfo.getTaskId()]; - } - - /** - * This method tries to get a partition data from the data cache. - * If null, it will create one. - * - * @param workerInfo The remote worker destination - * @param partitionId The remote Partition this message belongs to - * @return The partition data in data cache - */ - private ByteArrayVertexIdData<I, T> getPartitionData(WorkerInfo workerInfo, - int partitionId) { - ByteArrayVertexIdData<I, T> partitionData = dataCache[partitionId]; - if (partitionData == null) { - partitionData = createByteArrayVertexIdData(); - partitionData.setConf(conf); - partitionData.initialize(initialBufferSizes[workerInfo.getTaskId()]); - dataCache[partitionId] = partitionData; - } - return partitionData; - } - - /** - * Gets the data for a worker and removes it from the cache. - * - * @param workerInfo The address of the worker who owns the data - * partitions that are receiving the data - * @return List of pairs (partitionId, ByteArrayVertexIdData), - * where all partition ids belong to workerInfo - */ - public PairList<Integer, B> - removeWorkerData(WorkerInfo workerInfo) { - PairList<Integer, B> workerData = new PairList<Integer, B>(); - List<Integer> partitions = workerPartitions.get(workerInfo); - workerData.initialize(partitions.size()); - for (Integer partitionId : partitions) { - if (dataCache[partitionId] != null) { - workerData.add(partitionId, (B) dataCache[partitionId]); - dataCache[partitionId] = null; - } - } - dataSizes[workerInfo.getTaskId()] = 0; - return workerData; - } - - /** - * Gets all the data and removes it from the cache. - * - * @return All data for all vertices for all partitions - */ - public PairList<WorkerInfo, PairList<Integer, B>> removeAllData() { - PairList<WorkerInfo, PairList<Integer, B>> allData = - new PairList<WorkerInfo, PairList<Integer, B>>(); - allData.initialize(dataSizes.length); - for (WorkerInfo workerInfo : workerPartitions.keySet()) { - PairList<Integer, B> workerData = removeWorkerData(workerInfo); - if (!workerData.isEmpty()) { - allData.add(workerInfo, workerData); - } - dataSizes[workerInfo.getTaskId()] = 0; - } - return allData; - } - - protected ImmutableClassesGiraphConfiguration getConf() { - return conf; - } - - /** - * Get the service worker. - * - * @return CentralizedServiceWorker - */ - protected CentralizedServiceWorker getServiceWorker() { - return serviceWorker; - } - - /** - * Get the initial buffer size for the messages sent to a worker. - * - * @param taskId The task ID of a worker. - * @return The initial buffer size for a worker. - */ - protected int getSendWorkerInitialBufferSize(int taskId) { - return initialBufferSizes[taskId]; - } - - protected int getNumWorkers() { - return this.numWorkers; - } - - protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() { - return workerPartitions; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java new file mode 100644 index 0000000..6973785 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendDataCache.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.utils.ByteArrayVertexIdData; +import org.apache.giraph.utils.PairList; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import javax.annotation.concurrent.NotThreadSafe; +import java.util.List; +import java.util.Map; + +/** + * An abstract structure for caching data by partitions + * to be sent to workers in bulk. Not thread-safe. + * + * @param <I> Vertex id + * @param <D> Data type + */ +@NotThreadSafe +@SuppressWarnings("unchecked") +public abstract class SendDataCache<I extends WritableComparable, + D extends Writable> { + /** + * Internal cache of partitions (index) to their partition caches of + * type D. + */ + private final D[] dataCache; + /** How big to initially make output streams for each worker's partitions */ + private final int[] initialBufferSizes; + /** Giraph configuration */ + private final ImmutableClassesGiraphConfiguration conf; + /** Service worker */ + private final CentralizedServiceWorker serviceWorker; + /** Size of data (in bytes) for each worker */ + private final int[] dataSizes; + /** Total number of workers */ + private final int numWorkers; + /** List of partition ids belonging to a worker */ + private final Map<WorkerInfo, List<Integer>> workerPartitions = + Maps.newHashMap(); + /** Giraph configuration */ + private final ImmutableClassesGiraphConfiguration conf; + + /** + * Constructor. + * + * @param conf Giraph configuration + * @param serviceWorker Service worker + * @param maxRequestSize Maximum request size (in bytes) + * @param additionalRequestSize Additional request size (expressed as a + * ratio of the average request size) + */ + public SendDataCache(ImmutableClassesGiraphConfiguration conf, + CentralizedServiceWorker<?, ?, ?> serviceWorker, + int maxRequestSize, + float additionalRequestSize) { + this.conf = conf; + this.serviceWorker = serviceWorker; + int maxPartition = 0; + for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) { + List<Integer> workerPartitionIds = + workerPartitions.get(partitionOwner.getWorkerInfo()); + if (workerPartitionIds == null) { + workerPartitionIds = Lists.newArrayList(); + workerPartitions.put(partitionOwner.getWorkerInfo(), + workerPartitionIds); + } + workerPartitionIds.add(partitionOwner.getPartitionId()); + maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition); + } + dataCache = (D[]) new Writable[maxPartition + 1]; + + int maxWorker = 0; + for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) { + maxWorker = Math.max(maxWorker, workerInfo.getTaskId()); + } + dataSizes = new int[maxWorker + 1]; + + int initialRequestSize = + (int) (maxRequestSize * (1 + additionalRequestSize)); + initialBufferSizes = new int[maxWorker + 1]; + for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) { + initialBufferSizes[workerInfo.getTaskId()] = + initialRequestSize / workerPartitions.get(workerInfo).size(); + } + numWorkers = maxWorker + 1; + } + + /** + * Gets the data for a worker and removes it from the cache. + * + * @param workerInfo the address of the worker who owns the data + * partitions that are receiving the data + * @return List of pairs (partitionId, ByteArrayVertexIdData), + * where all partition ids belong to workerInfo + */ + public PairList<Integer, D> + removeWorkerData(WorkerInfo workerInfo) { + PairList<Integer, D> workerData = new PairList<Integer, D>(); + List<Integer> partitions = workerPartitions.get(workerInfo); + workerData.initialize(partitions.size()); + for (Integer partitionId : partitions) { + if (dataCache[partitionId] != null) { + workerData.add(partitionId, (D) dataCache[partitionId]); + dataCache[partitionId] = null; + } + } + dataSizes[workerInfo.getTaskId()] = 0; + return workerData; + } + + /** + * Gets all the data and removes it from the cache. + * + * @return All data for all vertices for all partitions + */ + public PairList<WorkerInfo, PairList<Integer, D>> removeAllData() { + PairList<WorkerInfo, PairList<Integer, D>> allData = + new PairList<WorkerInfo, PairList<Integer, D>>(); + allData.initialize(dataSizes.length); + for (WorkerInfo workerInfo : workerPartitions.keySet()) { + PairList<Integer, D> workerData = removeWorkerData(workerInfo); + if (!workerData.isEmpty()) { + allData.add(workerInfo, workerData); + } + dataSizes[workerInfo.getTaskId()] = 0; + } + return allData; + } + + /** + * Get the data cache for a partition id + * + * @param partitionId Partition id + * @return Data cache for a partition + */ + public D getData(int partitionId) { + return dataCache[partitionId]; + } + + /** + * Set the data cache for a partition id + * + * @param partitionId Partition id + * @param data Data to be set for a partition id + */ + public void setData(int partitionId, D data) { + dataCache[partitionId] = data; + } + + /** + * Get initial buffer size of a partition. + * + * @param partitionId Partition id + * @return Initial buffer size of a partition + */ + public int getInitialBufferSize(int partitionId) { + return initialBufferSizes[partitionId]; + } + + /** + * Increment the data size + * + * @param partitionId Partition id + * @param size Size to increment by + * @return new data size + */ + public int incrDataSize(int partitionId, int size) { + dataSizes[partitionId] += size; + return dataSizes[partitionId]; + } + + public ImmutableClassesGiraphConfiguration getConf() { + return conf; + } + + /** + * Get the service worker. + * + * @return CentralizedServiceWorker + */ + protected CentralizedServiceWorker getServiceWorker() { + return serviceWorker; + } + + /** + * Get the initial buffer size for the messages sent to a worker. + * + * @param taskId The task ID of a worker. + * @return The initial buffer size for a worker. + */ + protected int getSendWorkerInitialBufferSize(int taskId) { + return initialBufferSizes[taskId]; + } + + protected int getNumWorkers() { + return this.numWorkers; + } + + protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() { + return workerPartitions; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java index 5513da2..8350a55 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java @@ -38,7 +38,7 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE; * @param <E> Edge value */ public class SendEdgeCache<I extends WritableComparable, E extends Writable> - extends SendCache<I, Edge<I, E>, ByteArrayVertexIdEdges<I, E>> { + extends SendVertexIdDataCache<I, Edge<I, E>, ByteArrayVertexIdEdges<I, E>> { /** * Constructor * http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java index 8df0dda..24848db 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java @@ -46,7 +46,7 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE; * @param <M> Message data */ public class SendMessageCache<I extends WritableComparable, M extends Writable> - extends SendCache<I, M, ByteArrayVertexIdMessages<I, M>> { + extends SendVertexIdDataCache<I, M, ByteArrayVertexIdMessages<I, M>> { /** Class logger */ private static final Logger LOG = Logger.getLogger(SendMessageCache.class); @@ -80,7 +80,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable> @Override public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() { return new ByteArrayVertexIdMessages<I, M>( - getConf().getOutgoingMessageValueFactory()); + getConf().getOutgoingMessageValueFactory()); } /** @@ -92,8 +92,8 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable> * @param message Message to send to remote worker * @return Size of messages for the worker. */ - private int addMessage(WorkerInfo workerInfo, - int partitionId, I destVertexId, M message) { + public int addMessage(WorkerInfo workerInfo, + int partitionId, I destVertexId, M message) { return addData(workerInfo, partitionId, destVertexId, message); } http://git-wip-us.apache.org/repos/asf/giraph/blob/e26b51e0/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java new file mode 100644 index 0000000..2623812 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.ByteArrayVertexIdData; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.WritableComparable; + +import javax.annotation.concurrent.NotThreadSafe; + +/** + * An abstract structure for caching data indexed by vertex id, + * to be sent to workers in bulk. Not thread-safe. + * + * @param <I> Vertex id + * @param <T> Data + * @param <B> Specialization of {@link ByteArrayVertexIdData} for T + */ +@NotThreadSafe +@SuppressWarnings("unchecked") +public abstract class SendVertexIdDataCache<I extends WritableComparable, T, + B extends ByteArrayVertexIdData<I, T>> extends SendDataCache<I, B> { + /** + * Constructor. + * + * @param conf Giraph configuration + * @param serviceWorker Service worker + * @param maxRequestSize Maximum request size (in bytes) + * @param additionalRequestSize Additional request size (expressed as a + * ratio of the average request size) + */ + public SendVertexIdDataCache(ImmutableClassesGiraphConfiguration conf, + CentralizedServiceWorker<?, ?, ?> serviceWorker, + int maxRequestSize, + float additionalRequestSize) { + super(conf, serviceWorker, maxRequestSize, additionalRequestSize); + } + + /** + * Create a new {@link ByteArrayVertexIdData} specialized for the use case. + * + * @return A new instance of {@link ByteArrayVertexIdData} + */ + public abstract B createByteArrayVertexIdData(); + + /** + * Add data to the cache. + * + * @param workerInfo the remote worker destination + * @param partitionId the remote Partition this message belongs to + * @param destVertexId vertex id that is ultimate destination + * @param data Data to send to remote worker + * @return Size of messages for the worker. + */ + public int addData(WorkerInfo workerInfo, + int partitionId, I destVertexId, T data) { + // Get the data collection + B partitionData = getData(partitionId); + int originalSize = 0; + if (partitionData == null) { + partitionData = createByteArrayVertexIdData(); + partitionData.setConf(getConf()); + partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId())); + setData(partitionId, partitionData); + } else { + originalSize = partitionData.getSize(); + } + partitionData.add(destVertexId, data); + + // Update the size of cached, outgoing data per worker + return incrDataSize(workerInfo.getTaskId(), + partitionData.getSize() - originalSize); + } +}
