Updated Branches: refs/heads/trunk f34e9b7c8 -> 01b353334
GIRAPH-493: Remove EdgeWithSource (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/01b35333 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/01b35333 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/01b35333 Branch: refs/heads/trunk Commit: 01b353334417fe9d511c239d013dedd822c8235a Parents: f34e9b7 Author: Nitay Joffe <[email protected]> Authored: Wed Jan 30 00:46:01 2013 -0500 Committer: Nitay Joffe <[email protected]> Committed: Tue Feb 5 17:20:26 2013 -0500 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/graph/EdgeWithSource.java | 65 --------------- .../main/java/org/apache/giraph/io/EdgeReader.java | 16 +++- .../io/formats/PseudoRandomEdgeInputFormat.java | 20 +++-- .../giraph/io/formats/TextEdgeInputFormat.java | 49 +++++++++--- .../giraph/worker/EdgeInputSplitsCallable.java | 17 ++-- .../io/hcatalog/HCatalogEdgeInputFormat.java | 28 ++++-- 7 files changed, 92 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index b18c417..cc0ca34 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-493: Remove EdgeWithSource (nitay) + GIRAPH-429: Number of input split threads set to 1 less than necessary (majakabiljo) GIRAPH-498: We should check input splits status from zookeeeper once per worker, http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java deleted file mode 100644 index 84232c9..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * A pair of source vertex id and Edge object (that is, - * all the information about an edge). - * - * @param <I> Vertex id - * @param <E> Edge data - */ -public class EdgeWithSource<I extends WritableComparable, E extends Writable> { - /** Source vertex id. */ - private final I sourceVertexId; - /** Edge. */ - private final Edge<I, E> edge; - - /** - * Constructor. - * - * @param sourceVertexId Source vertex id - * @param edge Edge - */ - public EdgeWithSource(I sourceVertexId, Edge<I, E> edge) { - this.sourceVertexId = sourceVertexId; - this.edge = edge; - } - - /** - * Get the source vertex id. - * - * @return Source vertex id. - */ - public I getSourceVertexId() { - return sourceVertexId; - } - - /** - * Get the edge object. - * - * @return The edge. - */ - public Edge<I, E> getEdge() { - return edge; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java index 9642fab..ed6fad1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java @@ -18,7 +18,7 @@ package org.apache.giraph.io; -import org.apache.giraph.graph.EdgeWithSource; +import org.apache.giraph.graph.Edge; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -48,6 +48,7 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> { IOException, InterruptedException; /** + * Read the next edge. * * @return false iff there are no more edges * @throws IOException @@ -56,6 +57,16 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> { boolean nextEdge() throws IOException, InterruptedException; /** + * Get the current edge source id. + * + * @return Current edge source id which has been read. + * nextEdge() should be called first. + * @throws IOException + * @throws InterruptedException + */ + I getCurrentSourceId() throws IOException, InterruptedException; + + /** * Get the current edge. * * @return the current edge which has been read. @@ -63,8 +74,7 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> { * @throws IOException * @throws InterruptedException */ - EdgeWithSource<I, E> getCurrentEdge() throws IOException, - InterruptedException; + Edge<I, E> getCurrentEdge() throws IOException, InterruptedException; /** * Close this {@link EdgeReader} to future operations. http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java index 9196f18..116f45e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java @@ -18,13 +18,12 @@ package org.apache.giraph.io.formats; -import com.google.common.collect.Sets; import org.apache.giraph.bsp.BspInputSplit; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.DefaultEdge; +import org.apache.giraph.graph.Edge; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; -import org.apache.giraph.graph.EdgeWithSource; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; @@ -32,6 +31,8 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; +import com.google.common.collect.Sets; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -149,8 +150,8 @@ public class PseudoRandomEdgeInputFormat } @Override - public EdgeWithSource<LongWritable, DoubleWritable> getCurrentEdge() - throws IOException, InterruptedException { + public LongWritable getCurrentSourceId() throws IOException, + InterruptedException { if (currentVertexEdgesRead == edgesPerVertex) { ++verticesRead; currentVertexId = new LongWritable(-1); @@ -165,7 +166,12 @@ public class PseudoRandomEdgeInputFormat random.setSeed(currentVertexId.get()); currentVertexDestVertices.clear(); } + return currentVertexId; + } + @Override + public Edge<LongWritable, DoubleWritable> getCurrentEdge() + throws IOException, InterruptedException { LongWritable destVertexId; do { destVertexId = @@ -178,11 +184,9 @@ public class PseudoRandomEdgeInputFormat LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " + "" + destVertexId + ")"); } - return new EdgeWithSource<LongWritable, DoubleWritable>( - currentVertexId, - new DefaultEdge<LongWritable, DoubleWritable>( + return new DefaultEdge<LongWritable, DoubleWritable>( destVertexId, - new DoubleWritable(random.nextDouble()))); + new DoubleWritable(random.nextDouble())); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java index 45ea61a..a8ebfda 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java @@ -20,9 +20,9 @@ package org.apache.giraph.io.formats; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.DefaultEdge; +import org.apache.giraph.graph.Edge; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; -import org.apache.giraph.graph.EdgeWithSource; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -152,14 +152,19 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable, */ protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader { @Override - public final EdgeWithSource<I, E> getCurrentEdge() throws IOException, + public final I getCurrentSourceId() throws IOException, + InterruptedException { + Text line = getRecordReader().getCurrentValue(); + return getSourceVertexId(line); + } + + @Override + public final Edge<I, E> getCurrentEdge() throws IOException, InterruptedException { Text line = getRecordReader().getCurrentValue(); - I sourceVertexId = getSourceVertexId(line); I targetVertexId = getTargetVertexId(line); E edgeValue = getValue(line); - return new EdgeWithSource<I, E>(sourceVertexId, - new DefaultEdge<I, E>(targetVertexId, edgeValue)); + return new DefaultEdge<I, E>(targetVertexId, edgeValue); } @Override @@ -214,20 +219,42 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable, */ protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends TextEdgeReader { + /** Generic type holding processed line */ + private T processedLine; + @Override - public final EdgeWithSource<I, E> getCurrentEdge() throws IOException, + public I getCurrentSourceId() throws IOException, InterruptedException { + T processed = processCurrentLine(); + return getSourceVertexId(processed); + } + + @Override + public final Edge<I, E> getCurrentEdge() throws IOException, InterruptedException { - Text line = getRecordReader().getCurrentValue(); - T processed = preprocessLine(line); - I sourceVertexId = getSourceVertexId(processed); + T processed = processCurrentLine(); I targetVertexId = getTargetVertexId(processed); E edgeValue = getValue(processed); - return new EdgeWithSource<I, E>(sourceVertexId, - new DefaultEdge<I, E>(targetVertexId, edgeValue)); + return new DefaultEdge<I, E>(targetVertexId, edgeValue); + } + + /** + * Process the current line to the user's type. + * + * @return T processed line + * @throws IOException on I/O error + * @throws InterruptedException on interruption + */ + private T processCurrentLine() throws IOException, InterruptedException { + if (processedLine == null) { + Text line = getRecordReader().getCurrentValue(); + processedLine = preprocessLine(line); + } + return processedLine; } @Override public final boolean nextEdge() throws IOException, InterruptedException { + processedLine = null; return getRecordReader().nextKeyValue(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 80c341c..3e2dc66 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -19,11 +19,11 @@ package org.apache.giraph.worker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.io.EdgeInputFormat; -import org.apache.giraph.io.EdgeReader; -import org.apache.giraph.graph.EdgeWithSource; +import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeReader; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphMetricsRegistry; import org.apache.giraph.utils.LoggerUtils; @@ -114,25 +114,26 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, edgeReader.initialize(inputSplit, context); long inputSplitEdgesLoaded = 0; while (edgeReader.nextEdge()) { - EdgeWithSource<I, E> readerEdge = edgeReader.getCurrentEdge(); - if (readerEdge.getSourceVertexId() == null) { + I sourceId = edgeReader.getCurrentSourceId(); + Edge<I, E> readerEdge = edgeReader.getCurrentEdge(); + if (sourceId == null) { throw new IllegalArgumentException( "readInputSplit: Edge reader returned an edge " + "without a source vertex id! - " + readerEdge); } - if (readerEdge.getEdge().getTargetVertexId() == null) { + if (readerEdge.getTargetVertexId() == null) { throw new IllegalArgumentException( "readInputSplit: Edge reader returned an edge " + "without a target vertex id! - " + readerEdge); } - if (readerEdge.getEdge().getValue() == null) { + if (readerEdge.getValue() == null) { throw new IllegalArgumentException( "readInputSplit: Edge reader returned an edge " + "without a value! - " + readerEdge); } graphState.getWorkerClientRequestProcessor().addEdgeRequest( - readerEdge.getSourceVertexId(), readerEdge.getEdge()); + sourceId, readerEdge); context.progress(); // do this before potential data transfer ++inputSplitEdgesLoaded; http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java index c92cc34..fe0ddd5 100644 --- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java +++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java @@ -19,8 +19,8 @@ package org.apache.giraph.io.hcatalog; import org.apache.giraph.graph.DefaultEdge; +import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeNoValue; -import org.apache.giraph.graph.EdgeWithSource; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; import org.apache.hadoop.io.NullWritable; @@ -193,13 +193,17 @@ public abstract class HCatalogEdgeInputFormat< protected abstract E getEdgeValue(HCatRecord record); @Override - public EdgeWithSource<I, E> getCurrentEdge() throws IOException, + public I getCurrentSourceId() throws IOException, InterruptedException { + HCatRecord record = getRecordReader().getCurrentValue(); + return getSourceVertexId(record); + } + + @Override + public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException { HCatRecord record = getRecordReader().getCurrentValue(); - return new EdgeWithSource<I, E>( - getSourceVertexId(record), - new DefaultEdge<I, E>(getTargetVertexId(record), - getEdgeValue(record))); + return new DefaultEdge<I, E>(getTargetVertexId(record), + getEdgeValue(record)); } } @@ -236,12 +240,16 @@ public abstract class HCatalogEdgeInputFormat< protected abstract I getTargetVertexId(HCatRecord record); @Override - public EdgeWithSource<I, NullWritable> getCurrentEdge() throws IOException, + public I getCurrentSourceId() throws IOException, InterruptedException { + HCatRecord record = getRecordReader().getCurrentValue(); + return getSourceVertexId(record); + } + + @Override + public Edge<I, NullWritable> getCurrentEdge() throws IOException, InterruptedException { HCatRecord record = getRecordReader().getCurrentValue(); - return new EdgeWithSource<I, NullWritable>( - getSourceVertexId(record), - new EdgeNoValue<I>(getTargetVertexId(record))); + return new EdgeNoValue<I>(getTargetVertexId(record)); } } }
