Updated Branches: refs/heads/trunk af1c39b43 -> 62c12fa0b
GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/62c12fa0 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/62c12fa0 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/62c12fa0 Branch: refs/heads/trunk Commit: 62c12fa0b899ee76962d6e3fa05298e2a23d5e68 Parents: af1c39b Author: Claudio Martella <[email protected]> Authored: Tue Feb 5 13:02:27 2013 +0100 Committer: Claudio Martella <[email protected]> Committed: Tue Feb 5 13:02:27 2013 +0100 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/bsp/CentralizedServiceWorker.java | 19 - .../giraph/comm/netty/NettyWorkerServer.java | 2 + .../org/apache/giraph/graph/ComputeCallable.java | 2 + .../giraph/partition/DiskBackedPartitionStore.java | 872 +++++++++++---- .../apache/giraph/partition/PartitionStore.java | 28 +- .../giraph/partition/SimplePartitionStore.java | 3 +- .../org/apache/giraph/worker/BspServiceWorker.java | 35 +- .../java/org/apache/giraph/comm/RequestTest.java | 7 +- .../giraph/partition/TestPartitionStores.java | 17 +- 10 files changed, 708 insertions(+), 279 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index aec8fdf..2d25746 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache (claudio) + GIRAPH-494: Make Edge an interface (nitay) GIRAPH-492: Saving vertices has no status report, making it hard to http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java index 30d4462..71f8f72 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -34,8 +34,6 @@ import org.apache.giraph.partition.PartitionStore; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.giraph.vertex.Vertex; -import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.worker.WorkerInfo; @@ -135,15 +133,6 @@ public interface CentralizedServiceWorker<I extends WritableComparable, List<PartitionStats> partitionStatsList); /** - * Get the partition that a vertex id would belong to. - * - * @param vertexId Id of the vertex that is used to find the correct - * partition. - * @return Correct partition if exists on this worker, null otherwise. - */ - Partition<I, V, E, M> getPartition(I vertexId); - - /** * Get the partition id that a vertex id would belong to. * * @param vertexId Vertex id @@ -176,14 +165,6 @@ public interface CentralizedServiceWorker<I extends WritableComparable, Iterable<? extends PartitionOwner> getPartitionOwners(); /** - * Look up a vertex on a worker given its vertex index. - * - * @param vertexId Vertex index to look for - * @return Vertex if it exists on this worker. - */ - Vertex<I, V, E, M> getVertex(I vertexId); - - /** * If desired by the user, vertex partitions are redistributed among * workers according to the chosen WorkerGraphPartitioner. 
* http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java index e2866fd..1b7cc54 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java @@ -184,6 +184,7 @@ public class NettyWorkerServer<I extends WritableComparable, } } } + service.getPartitionStore().putPartition(partition); } } // Resolve all graph mutations @@ -226,6 +227,7 @@ public class NettyWorkerServer<I extends WritableComparable, partition.removeVertex(originalVertex.getId()); } } + service.getPartitionStore().putPartition(partition); } if (!serverData.getVertexMutations().isEmpty()) { throw new IllegalStateException("resolveMutations: Illegally " + http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index 042fd47..a87561d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -157,6 +157,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, } catch (IOException e) { throw new IllegalStateException("call: Caught unexpected IOException," + " failing.", e); + } finally { + serviceWorker.getPartitionStore().putPartition(partition); } } 
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java index 09e5d75..844a229 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java @@ -18,69 +18,106 @@ package org.apache.giraph.partition; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.vertex.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.log4j.Logger; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import 
java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import com.google.common.collect.Sets; /** - * A partition store that can possibly spill to disk. + * Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis. + * Thread-safe, but expects the caller to synchronized between deletes, adds, + * puts and gets. * * @param <I> Vertex id * @param <V> Vertex data * @param <E> Edge data * @param <M> Message data */ +@SuppressWarnings("rawtypes") public class DiskBackedPartitionStore<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends PartitionStore<I, V, E, M> { /** Class logger. */ private static final Logger LOG = Logger.getLogger(DiskBackedPartitionStore.class); - /** Map of partitions kept in memory. */ - private final ConcurrentMap<Integer, Partition<I, V, E, M>> - inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>(); - /** Maximum number of partitions to keep in memory. */ - private int maxInMemoryPartitions; - /** Map of partitions kept out-of-core. The values are partition sizes. */ - private final ConcurrentMap<Integer, Integer> onDiskPartitions = - Maps.newConcurrentMap(); - /** Directory on the local file system for storing out-of-core partitions. */ - private final String basePath; - /** Configuration. */ + /** States the partition can be found in */ + private enum State { ACTIVE, INACTIVE, LOADING, OFFLOADING, ONDISK }; + /** Global lock to the whole partition */ + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + /** + * Global write lock. Must be hold to modify class state for read and write. + * Conditions are bond to this lock. 
+ */ + private final Lock wLock = lock.writeLock(); + /** The ids of the partitions contained in the store */ + private final Set<Integer> partitionIds = Sets.newHashSet(); + /** Partitions' states store */ + private final Map<Integer, State> states = Maps.newHashMap(); + /** Current active partitions, which have not been put back yet */ + private final Map<Integer, Partition<I, V, E, M>> active = Maps.newHashMap(); + /** Inactive partitions to re-activate or spill to disk to make space */ + private final Map<Integer, Partition<I, V, E, M>> inactive = + Maps.newLinkedHashMap(); + /** Ids of partitions stored on disk and number of vertices contained */ + private final Map<Integer, Integer> onDisk = Maps.newHashMap(); + /** Per-partition users counters (clearly only for active partitions) */ + private final Map<Integer, Integer> counters = Maps.newHashMap(); + /** These Conditions are used to partitions' change of state */ + private final Map<Integer, Condition> pending = Maps.newHashMap(); + /** + * Used to signal threads waiting to load partitions. Can be used when new + * inactive partitions are avaiable, or when free slots are available. + */ + private final Condition notEmpty = wLock.newCondition(); + /** Executors for users requests. Uses caller threads */ + private final ExecutorService pool = new DirectExecutorService(); + /** Giraph configuration */ private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf; - /** Slot for loading out-of-core partitions. */ - private Partition<I, V, E, M> loadedPartition; - /** Locks for accessing and modifying partitions. 
*/ - private final ConcurrentMap<Integer, Lock> partitionLocks = - Maps.newConcurrentMap(); - /** Context used to report progress */ - private final Mapper<?, ?, ?, ?>.Context context; + /** Mapper context */ + private final Context context; + /** Base path where the partition files are written to */ + private final String basePath; + /** Maximum number of slots */ + private final int maxInMemoryPartitions; + /** Number of slots used */ + private int inMemoryPartitions; /** - * Constructor. + * Constructor * * @param conf Configuration - * @param context Mapper context + * @param context Context */ public DiskBackedPartitionStore( ImmutableClassesGiraphConfiguration<I, V, E, M> conf, @@ -96,76 +133,233 @@ public class DiskBackedPartitionStore<I extends WritableComparable, GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT); } - /** - * Get the path to the file where a partition is stored. - * - * @param partitionId The partition - * @return The path to the given partition - */ - private String getPartitionPath(Integer partitionId) { - return basePath + "/partition-" + partitionId; + @Override + public Iterable<Integer> getPartitionIds() { + try { + return pool.submit(new Callable<Iterable<Integer>>() { + + @Override + public Iterable<Integer> call() throws Exception { + wLock.lock(); + try { + return Iterables.unmodifiableIterable(partitionIds); + } finally { + wLock.unlock(); + } + } + }).get(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getPartitionIds: cannot retrieve partition ids", e); + } catch (ExecutionException e) { + throw new IllegalStateException( + "getPartitionIds: cannot retrieve partition ids", e); + } } - /** - * Create a new lock for a partition, lock it, and return it. If already - * existing, return null. 
- * - * @param partitionId Partition id - * @return A newly created lock, or null if already present - */ - private Lock createLock(Integer partitionId) { - Lock lock = new ReentrantLock(true); - lock.lock(); - if (partitionLocks.putIfAbsent(partitionId, lock) != null) { - return null; + @Override + public boolean hasPartition(final Integer id) { + try { + return pool.submit(new Callable<Boolean>() { + + @Override + public Boolean call() throws Exception { + wLock.lock(); + try { + return partitionIds.contains(id); + } finally { + wLock.unlock(); + } + } + }).get(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "hasPartition: cannot check partition", e); + } catch (ExecutionException e) { + throw new IllegalStateException( + "hasPartition: cannot check partition", e); + } + } + + @Override + public int getNumPartitions() { + try { + return pool.submit(new Callable<Integer>() { + + @Override + public Integer call() throws Exception { + wLock.lock(); + try { + return partitionIds.size(); + } finally { + wLock.unlock(); + } + } + }).get(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getNumPartitions: cannot retrieve partition ids", e); + } catch (ExecutionException e) { + throw new IllegalStateException( + "getNumPartitions: cannot retrieve partition ids", e); + } + } + + @Override + public Partition<I, V, E, M> getPartition(Integer id) { + try { + return pool.submit(new GetPartition(id)).get(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getPartition: cannot retrieve partition " + id, e); + } catch (ExecutionException e) { + throw new IllegalStateException( + "getPartition: cannot retrieve partition " + id, e); + } + } + + @Override + public void putPartition(Partition<I, V, E, M> partition) { + Integer id = partition.getId(); + try { + pool.submit(new PutPartition(id, partition)).get(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "putPartition: 
cannot put back partition " + id, e); + } catch (ExecutionException e) { + throw new IllegalStateException( + "putPartition: cannot put back partition " + id, e); } - return lock; + } + + @Override + public void deletePartition(Integer id) { + try { + pool.submit(new DeletePartition(id)).get(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "deletePartition: cannot delete partition " + id, e); + } catch (ExecutionException e) { + throw new IllegalStateException( + "deletePartition: cannot delete partition " + id, e); + } + } + + @Override + public Partition<I, V, E, M> removePartition(Integer id) { + Partition<I, V, E, M> partition = getPartition(id); + // we put it back, so the partition can turn INACTIVE and be deleted. + putPartition(partition); + deletePartition(id); + return partition; + } + + @Override + public void addPartition(Partition<I, V, E, M> partition) { + Integer id = partition.getId(); + try { + pool.submit(new AddPartition(partition.getId(), partition)).get(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "addPartition: cannot add partition " + id, e); + } catch (ExecutionException e) { + throw new IllegalStateException( + "addPartition: cannot add partition " + id, e); + } + } + + @Override + public void shutdown() { + try { + pool.shutdown(); + try { + if (!pool.awaitTermination(120, TimeUnit.SECONDS)) { + pool.shutdownNow(); + } + } catch (InterruptedException e) { + pool.shutdownNow(); + } + } finally { + for (Integer id : onDisk.values()) { + deletePartitionFile(id); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(partitionIds.toString()); + sb.append("\nActive\n"); + for (Entry<Integer, Partition<I, V, E, M>> e : active.entrySet()) { + sb.append(e.getKey() + ":" + e.getValue() + "\n"); + } + sb.append("Inactive\n"); + for (Entry<Integer, Partition<I, V, E, M>> e : inactive.entrySet()) { + sb.append(e.getKey() + ":" + 
e.getValue() + "\n"); + } + sb.append("OnDisk\n"); + for (Entry<Integer, Integer> e : onDisk.entrySet()) { + sb.append(e.getKey() + ":" + e.getValue() + "\n"); + } + sb.append("Counters\n"); + for (Entry<Integer, Integer> e : counters.entrySet()) { + sb.append(e.getKey() + ":" + e.getValue() + "\n"); + } + sb.append("Pending\n"); + for (Entry<Integer, Condition> e : pending.entrySet()) { + sb.append(e.getKey() + "\n"); + } + return sb.toString(); } /** - * Get the lock for a partition id. + * Increment the number of active users for a partition. Caller should hold + * the global write lock. * - * @param partitionId Partition id - * @return The lock + * @param id The id of the counter to increment + * @return The new value */ - private Lock getLock(Integer partitionId) { - return partitionLocks.get(partitionId); + private Integer incrementCounter(Integer id) { + Integer count = counters.get(id); + if (count == null) { + count = 0; + } + counters.put(id, ++count); + return count; } /** - * Write a partition to disk. + * Decrement the number of active users for a partition. Caller should hold + * the global write lock. * - * @param partition The partition object to write - * @throws java.io.IOException + * @param id The id of the counter to decrement + * @return The new value */ - private void writePartition(Partition<I, V, E, M> partition) - throws IOException { - File file = new File(getPartitionPath(partition.getId())); - file.getParentFile().mkdirs(); - file.createNewFile(); - DataOutputStream outputStream = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(file))); - for (Vertex<I, V, E, M> vertex : partition) { - vertex.write(outputStream); + private Integer decrementCounter(Integer id) { + Integer count = counters.get(id); + if (count == null) { + throw new IllegalStateException("no counter for partition " + id); } - outputStream.close(); + counters.put(id, --count); + return count; } /** - * Read a partition from disk. 
+ * Load a partition from disk. It deletes the file after the load. * - * @param partitionId Id of the partition to read - * @return The partition object + * @param id The id of the partition to load + * @param numVertices The number of vertices contained on disk + * @return The partition * @throws IOException */ - private Partition<I, V, E, M> readPartition(Integer partitionId) + private Partition<I, V, E, M> loadPartition(Integer id, int numVertices) throws IOException { Partition<I, V, E, M> partition = - conf.createPartition(partitionId, context); - File file = new File(getPartitionPath(partitionId)); + conf.createPartition(id, context); + File file = new File(getPartitionPath(id)); DataInputStream inputStream = new DataInputStream( new BufferedInputStream(new FileInputStream(file))); - int numVertices = onDiskPartitions.get(partitionId); for (int i = 0; i < numVertices; ++i) { Vertex<I, V, E, M> vertex = conf.createVertex(); vertex.readFields(inputStream); @@ -177,16 +371,22 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** - * Append some vertices of another partition to an out-of-core partition. + * Write a partition to disk. 
* - * @param partition Partition to add + * @param partition The partition to offload * @throws IOException */ - private void appendPartitionOutOfCore(Partition<I, V, E, M> partition) + private void offloadPartition(Partition<I, V, E, M> partition) throws IOException { + if (LOG.isInfoEnabled()) { + LOG.info("offloadPartition: writing partition " + partition.getId() + + " to disk."); + } File file = new File(getPartitionPath(partition.getId())); + file.getParentFile().mkdirs(); + file.createNewFile(); DataOutputStream outputStream = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(file, true))); + new BufferedOutputStream(new FileOutputStream(file))); for (Vertex<I, V, E, M> vertex : partition) { vertex.write(outputStream); } @@ -194,171 +394,407 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** - * Load an out-of-core partition in memory. + * Append a partition on disk at the end of the file. Expects the caller + * to hold the global lock. 
* - * @param partitionId Partition id + * @param partition The partition + * @throws IOException */ - private void loadPartition(Integer partitionId) { - if (loadedPartition != null) { - if (loadedPartition.getId() == partitionId) { - return; - } - if (LOG.isInfoEnabled()) { - LOG.info("loadPartition: moving partition " + loadedPartition.getId() + - " out of core with size " + loadedPartition.getVertexCount()); - } - try { - writePartition(loadedPartition); - onDiskPartitions.put(loadedPartition.getId(), - (int) loadedPartition.getVertexCount()); - loadedPartition = null; - } catch (IOException e) { - throw new IllegalStateException("loadPartition: failed writing " + - "partition " + loadedPartition.getId() + " to disk", e); - } - } - if (LOG.isInfoEnabled()) { - LOG.info("loadPartition: loading partition " + partitionId + - " in memory"); - } - try { - loadedPartition = readPartition(partitionId); - } catch (IOException e) { - throw new IllegalStateException("loadPartition: failed reading " + - "partition " + partitionId + " from disk"); + private void addToOOCPartition(Partition<I, V, E, M> partition) + throws IOException { + Integer id = partition.getId(); + Integer count = onDisk.get(id); + onDisk.put(id, count + (int) partition.getVertexCount()); + File file = new File(getPartitionPath(id)); + DataOutputStream outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file, true))); + for (Vertex<I, V, E, M> vertex : partition) { + vertex.write(outputStream); } + outputStream.close(); } /** - * Add a new partition without requiring a lock. + * Delete a partition file * - * @param partition Partition to be added + * @param id The id of the partition owning the file. 
*/ - private void addPartitionNoLock(Partition<I, V, E, M> partition) { - synchronized (inMemoryPartitions) { - if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) { - inMemoryPartitions.put(partition.getId(), partition); + public void deletePartitionFile(Integer id) { + File file = new File(getPartitionPath(id)); + file.delete(); + } - return; - } + /** + * Get the path to the file where a partition is stored. + * + * @param partitionId The partition + * @return The path to the given partition + */ + private String getPartitionPath(Integer partitionId) { + return basePath + "/partition-" + partitionId; + } + + /** + * Task that gets a partition from the store + */ + private class GetPartition implements Callable<Partition<I, V, E, M>> { + /** Partition id */ + private Integer id; + + /** + * Constructor + * + * @param id Partition id + */ + public GetPartition(Integer id) { + this.id = id; } - try { - writePartition(partition); - onDiskPartitions.put(partition.getId(), - (int) partition.getVertexCount()); - } catch (IOException e) { - throw new IllegalStateException("addPartition: failed writing " + - "partition " + partition.getId() + "to disk"); + + /** + * Removes and returns the last recently used entry. + * + * @return The last recently used entry. 
+ */ + private Entry<Integer, Partition<I, V, E, M>> getLRUEntry() { + Iterator<Entry<Integer, Partition<I, V, E, M>>> i = + inactive.entrySet().iterator(); + Entry<Integer, Partition<I, V, E, M>> lruEntry = i.next(); + i.remove(); + return lruEntry; } - } - @Override - public void addPartition(Partition<I, V, E, M> partition) { - if (inMemoryPartitions.containsKey(partition.getId())) { - Partition<I, V, E, M> existingPartition = - inMemoryPartitions.get(partition.getId()); - existingPartition.addPartition(partition); - } else if (onDiskPartitions.containsKey(partition.getId())) { - Lock lock = getLock(partition.getId()); - lock.lock(); - if (loadedPartition != null && loadedPartition.getId() == - partition.getId()) { - loadedPartition.addPartition(partition); - } else { + @Override + public Partition<I, V, E, M> call() throws Exception { + Partition<I, V, E, M> partition = null; + + while (partition == null) { + wLock.lock(); try { - appendPartitionOutOfCore(partition); - onDiskPartitions.put(partition.getId(), - onDiskPartitions.get(partition.getId()) + - (int) partition.getVertexCount()); - } catch (IOException e) { - throw new IllegalStateException("addPartition: failed " + - "writing vertices to partition " + partition.getId() + " on disk", - e); + State pState = states.get(id); + switch (pState) { + case ONDISK: + Entry<Integer, Partition<I, V, E, M>> lru = null; + states.put(id, State.LOADING); + int numVertices = onDisk.remove(id); + /* + * Wait until we have space in memory or inactive data for a switch + */ + while (inMemoryPartitions >= maxInMemoryPartitions && + inactive.size() == 0) { + notEmpty.await(); + } + /* + * we have to make some space first + */ + if (inMemoryPartitions >= maxInMemoryPartitions) { + lru = getLRUEntry(); + states.put(lru.getKey(), State.OFFLOADING); + pending.get(lru.getKey()).signalAll(); + } else { // there is space, just add it to the in-memory partitions + inMemoryPartitions++; + } + /* + * do IO without contention, the 
threads interested to these + * partitions will subscribe to the relative Condition. + */ + wLock.unlock(); + if (lru != null) { + offloadPartition(lru.getValue()); + } + partition = loadPartition(id, numVertices); + wLock.lock(); + /* + * update state and signal the pending threads + */ + if (lru != null) { + states.put(lru.getKey(), State.ONDISK); + onDisk.put(lru.getKey(), (int) lru.getValue().getVertexCount()); + pending.get(lru.getKey()).signalAll(); + } + active.put(id, partition); + states.put(id, State.ACTIVE); + pending.get(id).signalAll(); + incrementCounter(id); + break; + case INACTIVE: + partition = inactive.remove(id); + active.put(id, partition); + states.put(id, State.ACTIVE); + incrementCounter(id); + break; + case ACTIVE: + partition = active.get(id); + incrementCounter(id); + break; + case LOADING: + pending.get(id).await(); + break; + case OFFLOADING: + pending.get(id).await(); + break; + default: + throw new IllegalStateException( + "illegal state " + pState + " for partition " + id); + } + } finally { + wLock.unlock(); } } - lock.unlock(); - } else { - Lock lock = createLock(partition.getId()); - if (lock != null) { - addPartitionNoLock(partition); - lock.unlock(); - } else { - // Another thread is already creating the partition, - // so we make sure it's done before repeating the call. 
- lock = getLock(partition.getId()); - lock.lock(); - lock.unlock(); - addPartition(partition); - } + return partition; } } - @Override - public Partition<I, V, E, M> getPartition(Integer partitionId) { - if (inMemoryPartitions.containsKey(partitionId)) { - return inMemoryPartitions.get(partitionId); - } else if (onDiskPartitions.containsKey(partitionId)) { - loadPartition(partitionId); - return loadedPartition; - } else { - throw new IllegalStateException("getPartition: partition " + - partitionId + " does not exist"); + /** + * Task that puts a partition back to the store + */ + private class PutPartition implements Callable<Void> { + /** Partition id */ + private Integer id; + + /** + * Constructor + * + * @param id The partition id + * @param partition The partition + */ + public PutPartition(Integer id, Partition<I, V, E, M> partition) { + this.id = id; } - } - @Override - public Partition<I, V, E, M> removePartition(Integer partitionId) { - partitionLocks.remove(partitionId); - if (onDiskPartitions.containsKey(partitionId)) { - Partition<I, V, E, M> partition; - if (loadedPartition != null && loadedPartition.getId() == partitionId) { - partition = loadedPartition; - loadedPartition = null; - } else { - try { - partition = readPartition(partitionId); - } catch (IOException e) { - throw new IllegalStateException("removePartition: failed reading " + - "partition " + partitionId + " from disk", e); + @Override + public Void call() throws Exception { + wLock.lock(); + try { + if (decrementCounter(id) == 0) { + inactive.put(id, active.remove(id)); + states.put(id, State.INACTIVE); + pending.get(id).signalAll(); + notEmpty.signal(); } + return null; + } finally { + wLock.unlock(); } - onDiskPartitions.remove(partitionId); - return partition; - } else { - return inMemoryPartitions.remove(partitionId); } } - @Override - public void deletePartition(Integer partitionId) { - partitionLocks.remove(partitionId); - if (inMemoryPartitions.containsKey(partitionId)) { - 
inMemoryPartitions.remove(partitionId); - } else { - if (loadedPartition != null && loadedPartition.getId() == partitionId) { - loadedPartition = null; - } else { - File file = new File(getPartitionPath(partitionId)); - file.delete(); + /** + * Task that adds a partition to the store + */ + private class AddPartition implements Callable<Void> { + /** Partition id */ + private Integer id; + /** Partition */ + private Partition<I, V, E, M> partition; + + /** + * Constructor + * + * @param id The partition id + * @param partition The partition + */ + public AddPartition(Integer id, Partition<I, V, E, M> partition) { + this.id = id; + this.partition = partition; + } + + @Override + public Void call() throws Exception { + + wLock.lock(); + try { + if (partitionIds.contains(id)) { + Partition<I, V, E, M> existing = null; + boolean isOOC = false; + boolean done = false; + while (!done) { + State pState = states.get(id); + switch (pState) { + case ONDISK: + isOOC = true; + done = true; + break; + /* + * just add data to the in-memory partitions, + * concurrency should be managed by the caller. 
+ */ + case INACTIVE: + existing = inactive.get(id); + done = true; + break; + case ACTIVE: + existing = active.get(id); + done = true; + break; + case LOADING: + pending.get(id).await(); + break; + case OFFLOADING: + pending.get(id).await(); + break; + default: + throw new IllegalStateException( + "illegal state " + pState + " for partition " + id); + } + } + if (isOOC) { + addToOOCPartition(partition); + } else { + existing.addPartition(partition); + } + } else { + Condition newC = wLock.newCondition(); + pending.put(id, newC); + partitionIds.add(id); + if (inMemoryPartitions < maxInMemoryPartitions) { + inMemoryPartitions++; + states.put(id, State.INACTIVE); + inactive.put(id, partition); + notEmpty.signal(); + } else { + states.put(id, State.OFFLOADING); + onDisk.put(id, (int) partition.getVertexCount()); + wLock.unlock(); + offloadPartition(partition); + wLock.lock(); + states.put(id, State.ONDISK); + newC.signalAll(); + } + } + return null; + } finally { + wLock.unlock(); } - onDiskPartitions.remove(partitionId); } } - @Override - public boolean hasPartition(Integer partitionId) { - return partitionLocks.containsKey(partitionId); - } + /** + * Task that deletes a partition to the store + */ + private class DeletePartition implements Callable<Void> { + /** Partition id */ + private Integer id; - @Override - public Iterable<Integer> getPartitionIds() { - return Iterables.concat(inMemoryPartitions.keySet(), - onDiskPartitions.keySet()); - } + /** + * Constructor + * + * @param id The partition id + */ + public DeletePartition(Integer id) { + this.id = id; + } - @Override - public int getNumPartitions() { - return partitionLocks.size(); + @Override + public Void call() throws Exception { + boolean done = false; + + wLock.lock(); + try { + while (!done) { + State pState = states.get(id); + switch (pState) { + case ONDISK: + onDisk.remove(id); + deletePartitionFile(id); + done = true; + break; + case INACTIVE: + inactive.remove(id); + inMemoryPartitions--; + 
notEmpty.signal(); + done = true; + break; + case ACTIVE: + pending.get(id).await(); + break; + case LOADING: + pending.get(id).await(); + break; + case OFFLOADING: + pending.get(id).await(); + break; + default: + throw new IllegalStateException( + "illegal state " + pState + " for partition " + id); + } + } + partitionIds.remove(id); + states.remove(id); + counters.remove(id); + pending.remove(id).signalAll(); + return null; + } finally { + wLock.unlock(); + } + } } + /** + * Direct Executor that executes tasks within the calling threads. + */ + private class DirectExecutorService extends AbstractExecutorService { + /** Executor state */ + private volatile boolean shutdown = false; + + /** + * Constructor + */ + public DirectExecutorService() { } + + /** + * Execute the task in the calling thread. + * + * @param task Task to execute + */ + public void execute(Runnable task) { + task.run(); + } + + /** + * Shutdown the executor. + */ + public void shutdown() { + this.shutdown = true; + } + + /** + * Shutdown the executor and return the current queue (empty). + * + * @return The list of awaiting tasks + */ + public List<Runnable> shutdownNow() { + this.shutdown = true; + return Collections.emptyList(); + } + + /** + * Return current shutdown state. + * + * @return Shutdown state + */ + public boolean isShutdown() { + return shutdown; + } + + /** + * Return current termination state. + * + * @return Termination state + */ + public boolean isTerminated() { + return shutdown; + } + + /** + * Do nothing and return shutdown state. 
+ * + * @param timeout Timeout + * @param unit Time unit + * @return Shutdown state + */ + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + return shutdown; + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java index 3e8dda9..4206ce3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java @@ -18,8 +18,6 @@ package org.apache.giraph.partition; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -43,7 +41,8 @@ public abstract class PartitionStore<I extends WritableComparable, public abstract void addPartition(Partition<I, V, E, M> partition); /** - * Get a partition. + * Get a partition. Note: user has to put it back to the store through + * {@link #putPartition(Partition)} after use. * * @param partitionId Partition id * @return The requested partition @@ -51,6 +50,14 @@ public abstract class PartitionStore<I extends WritableComparable, public abstract Partition<I, V, E, M> getPartition(Integer partitionId); /** + * Put a partition back to the store. Use this method to put a partition + * back after it has been retrieved through {@link #getPartition(Integer)}. + * + * @param partition Partition + */ + public abstract void putPartition(Partition<I, V, E, M> partition); + + /** + * Remove a partition and return it. 
* * @param partitionId Partition id @@ -99,18 +106,7 @@ public abstract class PartitionStore<I extends WritableComparable, } /** - * Return all the stored partitions as an Iterable. Note that this may force - * out-of-core partitions to be loaded into memory if using out-of-core. - * - * @return The partition objects + * Called at the end of the computation. */ - public Iterable<Partition<I, V, E, M>> getPartitions() { - return Iterables.transform(getPartitionIds(), - new Function<Integer, Partition<I, V, E, M>>() { - @Override - public Partition<I, V, E, M> apply(Integer partitionId) { - return getPartition(partitionId); - } - }); - } + public void shutdown() { } } http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java index 7bd0bb1..74cc3a7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java @@ -101,5 +101,6 @@ public class SimplePartitionStore<I extends WritableComparable, return partitions.size(); } - + @Override + public void putPartition(Partition<I, V, E, M> partition) { } } http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index f542344..a48c5ea 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -546,8 +546,9 @@ else[HADOOP_NON_SECURE]*/ // if necessary List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>(); - for (Partition<I, V, E, M> partition : - getPartitionStore().getPartitions()) { + for (Integer partitionId : getPartitionStore().getPartitionIds()) { + Partition<I, V, E, M> partition = + getPartitionStore().getPartition(partitionId); PartitionStats partitionStats = new PartitionStats(partition.getId(), partition.getVertexCount(), @@ -555,6 +556,7 @@ else[HADOOP_NON_SECURE]*/ partition.getEdgeCount(), 0); partitionStatsList.add(partitionStats); + getPartitionStore().putPartition(partition); } workerGraphPartitioner.finalizePartitionStats( partitionStatsList, getPartitionStore()); @@ -894,8 +896,9 @@ else[HADOOP_NON_SECURE]*/ long nextPrintMsecs = System.currentTimeMillis() + 15000; int partitionIndex = 0; int numPartitions = getPartitionStore().getNumPartitions(); - for (Partition<I, V, E, M> partition : - getPartitionStore().getPartitions()) { + for (Integer partitionId : getPartitionStore().getPartitionIds()) { + Partition<I, V, E, M> partition = + getPartitionStore().getPartition(partitionId); for (Vertex<I, V, E, M> vertex : partition) { getContext().progress(); vertexWriter.writeVertex(vertex); @@ -914,6 +917,7 @@ else[HADOOP_NON_SECURE]*/ nextPrintVertices = verticesWritten + 250000; } } + getPartitionStore().putPartition(partition); getContext().progress(); ++partitionIndex; } @@ -928,6 +932,7 @@ else[HADOOP_NON_SECURE]*/ workerClient.closeConnections(); setCachedSuperstep(getSuperstep() - 1); saveVertices(finishedSuperstepStats.getLocalVertexCount()); + getPartitionStore().shutdown(); // All worker processes should denote they are done by adding special // znode. 
Once the number of znodes equals the number of partitions // for workers and masters, the master will clean up the ZooKeeper @@ -1017,8 +1022,9 @@ else[HADOOP_NON_SECURE]*/ getFs().create(verticesFilePath); ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream(); DataOutput metadataOutput = new DataOutputStream(metadataByteStream); - for (Partition<I, V, E, M> partition : - getPartitionStore().getPartitions()) { + for (Integer partitionId : getPartitionStore().getPartitionIds()) { + Partition<I, V, E, M> partition = + getPartitionStore().getPartition(partitionId); long startPos = verticesOutputStream.getPos(); partition.write(verticesOutputStream); // write messages @@ -1037,6 +1043,7 @@ else[HADOOP_NON_SECURE]*/ (verticesOutputStream.getPos() - startPos) + ", partition = " + partition.toString()); } + getPartitionStore().putPartition(partition); getContext().progress(); } // Metadata is buffered and written at the end since it's small and @@ -1388,11 +1395,6 @@ else[HADOOP_NON_SECURE]*/ } @Override - public Partition<I, V, E, M> getPartition(I vertexId) { - return getPartitionStore().getPartition(getPartitionId(vertexId)); - } - - @Override public Integer getPartitionId(I vertexId) { PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId); return partitionOwner.getPartitionId(); @@ -1404,17 +1406,6 @@ else[HADOOP_NON_SECURE]*/ } @Override - public Vertex<I, V, E, M> getVertex(I vertexId) { - PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId); - if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) { - return getPartitionStore().getPartition( - partitionOwner.getPartitionId()).getVertex(vertexId); - } else { - return null; - } - } - - @Override public ServerData<I, V, E, M> getServerData() { return workerServer.getServerData(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index 7187928..d779fe4 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -137,12 +137,15 @@ public class RequestTest { serverData.getPartitionStore(); assertTrue(partitionStore.hasPartition(partitionId)); int total = 0; + Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition2 = + partitionStore.getPartition(partitionId); for (Vertex<IntWritable, IntWritable, - IntWritable, IntWritable> vertex : - partitionStore.getPartition(partitionId)) { + IntWritable, IntWritable> vertex : partition2) { total += vertex.getId().get(); } + partitionStore.putPartition(partition2); assertEquals(total, 45); + partitionStore.shutdown(); } @Test http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index b02ed3a..b4cddf6 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -34,6 +34,8 @@ import org.junit.Test; import com.google.common.collect.Iterables; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -80,6 +82,7 @@ public class TestPartitionStores { partitionStore = new SimplePartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>(conf, context); 
testReadWrite(partitionStore, conf); + partitionStore.shutdown(); } @Test @@ -137,11 +140,13 @@ public class TestPartitionStores { partitionStore = new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>(conf, context); testReadWrite(partitionStore, conf); + partitionStore.shutdown(); conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, 2); partitionStore = new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>(conf, context); testReadWrite(partitionStore, conf); + partitionStore.shutdown(); } /** @@ -185,16 +190,26 @@ public class TestPartitionStores { Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 = partitionStore.getPartition(1); + partitionStore.putPartition(partition1); Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition2 = partitionStore.getPartition(2); + partitionStore.putPartition(partition2); Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition3 = partitionStore.removePartition(3); Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition4 = partitionStore.getPartition(4); + partitionStore.putPartition(partition4); assertEquals(3, partitionStore.getNumPartitions()); assertEquals(3, Iterables.size(partitionStore.getPartitionIds())); - assertEquals(3, Iterables.size(partitionStore.getPartitions())); + int partitionsNumber = 0; + for (Integer partitionId : partitionStore.getPartitionIds()) { + Partition<IntWritable, IntWritable, NullWritable, IntWritable> p = + partitionStore.getPartition(partitionId); + partitionStore.putPartition(p); + partitionsNumber++; + } + assertEquals(3, partitionsNumber); assertTrue(partitionStore.hasPartition(1)); assertTrue(partitionStore.hasPartition(2)); assertFalse(partitionStore.hasPartition(3));
