Updated Branches: refs/heads/trunk 8db290147 -> d6200bdd8
GIRAPH-702: Fix multithreaded output (majakabiljo)

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d6200bdd
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d6200bdd
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d6200bdd

Branch: refs/heads/trunk
Commit: d6200bdd8327c94a1b171123ae14157bafacbd6f
Parents: 8db2901
Author: Maja Kabiljo <[email protected]>
Authored: Fri Jun 28 10:40:16 2013 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Fri Jun 28 10:40:16 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 ++
 .../apache/giraph/worker/BspServiceWorker.java  | 22 ++++++++++++++---
 .../giraph/hive/output/HiveOutputTest.java      | 26 +++++++++++++++++++-
 3 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/d6200bdd/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 69d2bee..60cedbc 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-702: Fix multithreaded output (majakabiljo)
+
   GIRAPH-676: A short tutorial on getting started with Giraph (boshmaf via claudio)
 
   GIRAPH-698: Expose Computation to a user (aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d6200bdd/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 342e2b2..89b6f9e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -83,6 +83,7 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import net.iharder.Base64;
 
@@ -95,10 +96,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -920,13 +924,20 @@ public class BspServiceWorker<I extends WritableComparable,
       return;
     }
 
+    final int numPartitions = getPartitionStore().getNumPartitions();
     int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
-        getPartitionStore().getNumPartitions());
+        numPartitions);
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
         "saveVertices: Starting to save " + numLocalVertices + " vertices " +
         "using " + numThreads + " threads");
     final VertexOutputFormat<I, V, E> vertexOutputFormat =
         getConfiguration().createWrappedVertexOutputFormat();
+
+    final Queue<Integer> partitionIdQueue =
+        (numPartitions == 0) ? new LinkedList<Integer>() :
+            new ArrayBlockingQueue<Integer>(numPartitions);
+    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
       @Override
       public Callable<Void> newCallable(int callableId) {
@@ -937,14 +948,19 @@ public class BspServiceWorker<I extends WritableComparable,
                 vertexOutputFormat.createVertexWriter(getContext());
             vertexWriter.setConf(getConfiguration());
             vertexWriter.initialize(getContext());
-            long verticesWritten = 0;
             long nextPrintVertices = 0;
             long nextPrintMsecs = System.currentTimeMillis() + 15000;
             int partitionIndex = 0;
             int numPartitions = getPartitionStore().getNumPartitions();
-            for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+            while (!partitionIdQueue.isEmpty()) {
+              Integer partitionId = partitionIdQueue.poll();
+              if (partitionId == null) {
+                break;
+              }
+
               Partition<I, V, E> partition =
                   getPartitionStore().getPartition(partitionId);
+              long verticesWritten = 0;
               for (Vertex<I, V, E> vertex : partition) {
                 vertexWriter.writeVertex(vertex);
                 ++verticesWritten;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d6200bdd/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
index 1f92176..4d4d976 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.hive.output;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.hive.GiraphHiveTestBase;
 import org.apache.giraph.hive.Helpers;
@@ -43,6 +44,8 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
+import junit.framework.Assert;
+
 public class HiveOutputTest extends GiraphHiveTestBase {
   private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
 
@@ -88,6 +91,25 @@ public class HiveOutputTest extends GiraphHiveTestBase {
     verifyRecords(inputDesc);
   }
 
+  @Test
+  public void testHiveMultithreadedOutput() throws Exception
+  {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 BIGINT) ");
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexOutputFormatThreadSafe(true);
+    conf.setNumOutputThreads(2);
+    GiraphConstants.USER_PARTITION_COUNT.set(conf, 4);
+    runJob(tableName, conf);
+
+    HiveInputDescription inputDesc = new HiveInputDescription();
+    inputDesc.getTableDesc().setTableName(tableName);
+
+    verifyRecords(inputDesc);
+  }
+
   private void runJob(String tableName, GiraphConfiguration conf) throws Exception {
     String[] edges = new String[] {
         "1 2",
@@ -116,7 +138,9 @@ public class HiveOutputTest extends GiraphHiveTestBase {
 
     // Records are in an unknown sort order so we grab their values here
     for (HiveReadableRecord record : records) {
-      data.put(record.getLong(0), record.getLong(1));
+      if (data.put(record.getLong(0), record.getLong(1)) != null) {
+        Assert.fail("Id " + record.getLong(0) + " appears twice in the output");
+      }
     }
 
     assertEquals(3, data.size());
