Updated Branches: refs/heads/trunk 96fd05385 -> 67e0f11d2
GIRAPH-613 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6cf79dbe Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6cf79dbe Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6cf79dbe Branch: refs/heads/trunk Commit: 6cf79dbe2c4397732820c435df723fe50e9f3daf Parents: 96fd053 Author: Claudio Martella <[email protected]> Authored: Thu Apr 11 18:09:25 2013 +0200 Committer: Claudio Martella <[email protected]> Committed: Thu Apr 11 18:09:25 2013 +0200 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/conf/GiraphClasses.java | 2 +- .../main/java/org/apache/giraph/graph/Vertex.java | 25 +-- .../org/apache/giraph/graph/VertexMutations.java | 6 +- .../giraph/partition/ByteArrayPartition.java | 31 ++-- .../giraph/partition/DiskBackedPartitionStore.java | 9 +- .../apache/giraph/partition/SimplePartition.java | 9 +- .../org/apache/giraph/utils/WritableUtils.java | 182 +++++++++++++++ .../apache/giraph/graph/TestVertexAndEdges.java | 24 ++- 9 files changed, 231 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index babbb88..4a1e7ca 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-613: Remove Writable from the interfaces implemented by Vertex (claudio) + GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo) GIRAPH-615: Add support for multithreaded output (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java index 64f8bb1..95499bd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java @@ -134,7 +134,7 @@ public class GiraphClasses<I extends WritableComparable, } /** - * Contructor that reads classes from a Configuration object. + * Constructor that reads classes from a Configuration object. * * @param conf Configuration object to read from. */ http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java index fda6023..a1b1a87 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java @@ -35,13 +35,13 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; /** * Basic abstract class for writing a BSP application for computation. + * Giraph will checkpoint Vertex value and edges, hence all user data should + * be stored as part of the vertex value. * * @param <I> Vertex id * @param <V> Vertex data @@ -51,7 +51,7 @@ import java.util.Iterator; public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M> - implements WorkerAggregatorUsage, Writable { + implements WorkerAggregatorUsage { /** Vertex id. */ private I id; /** Vertex value. */ @@ -507,25 +507,6 @@ public abstract class Vertex<I extends WritableComparable, } @Override - public void readFields(DataInput in) throws IOException { - id = getConf().createVertexId(); - id.readFields(in); - value = getConf().createVertexValue(); - value.readFields(in); - edges = getConf().createVertexEdges(); - edges.readFields(in); - halt = in.readBoolean(); - } - - @Override - public void write(DataOutput out) throws IOException { - id.write(out); - value.write(out); - edges.write(out); - out.writeBoolean(halt); - } - - @Override public String toString() { return "Vertex(id=" + getId() + ",value=" + getValue() + ",#edges=" + getNumEdges() + ")"; http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java index ea50f25..75c0aef 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java @@ -87,8 +87,8 @@ public class VertexMutations<I extends WritableComparable, int addedVertexListSize = input.readInt(); for (int i = 0; i < addedVertexListSize; ++i) { - Vertex<I, V, E, M> vertex = conf.createVertex(); - vertex.readFields(input); + Vertex<I, V, E, M> vertex = + WritableUtils.readVertexFromDataInput(input, getConf()); addedVertexList.add(vertex); } removedVertexCount = input.readInt(); @@ -110,7 +110,7 @@ public class VertexMutations<I extends WritableComparable, public void write(DataOutput output) throws IOException { output.writeInt(addedVertexList.size()); for (Vertex<I, V, E, M> vertex : addedVertexList) { - vertex.write(output); + WritableUtils.writeVertexToDataOutput(output, vertex, getConf()); } output.writeInt(removedVertexCount); output.writeInt(addedEdgeList.size()); http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java index dd8c974..d2e7599 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java @@ -79,21 +79,22 @@ public class ByteArrayPartition<I extends WritableComparable, if (vertexData == null) { return null; } - WritableUtils.readFieldsFromByteArrayWithSize( - vertexData, representativeVertex, useUnsafeSerialization); + WritableUtils.reinitializeVertexFromByteArray( + vertexData, representativeVertex, useUnsafeSerialization, getConf()); return representativeVertex; } @Override public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) { byte[] vertexData = - WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization); + WritableUtils.writeVertexToByteArray( + vertex, useUnsafeSerialization, getConf()); byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData); if (oldVertexBytes == null) { return null; } else { - WritableUtils.readFieldsFromByteArrayWithSize( - oldVertexBytes, representativeVertex, useUnsafeSerialization); + WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes, + representativeVertex, useUnsafeSerialization, getConf()); return representativeVertex; } } @@ -104,8 +105,8 @@ public class ByteArrayPartition<I extends WritableComparable, if (vertexBytes == null) { return null; } - WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes, - representativeVertex, useUnsafeSerialization); + WritableUtils.reinitializeVertexFromByteArray(vertexBytes, + representativeVertex, useUnsafeSerialization, getConf()); return representativeVertex; } @@ -134,8 +135,8 @@ public class ByteArrayPartition<I extends WritableComparable, public long getEdgeCount() { long edges = 0; for (byte[] vertexBytes : vertexMap.values()) { - WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes, - representativeVertex, useUnsafeSerialization); + WritableUtils.reinitializeVertexFromByteArray(vertexBytes, + representativeVertex, useUnsafeSerialization, getConf()); edges += representativeVertex.getNumEdges(); } return edges; @@ -147,12 +148,12 @@ public class ByteArrayPartition<I extends WritableComparable, byte[] oldVertexData = vertexMap.get(vertex.getId()); if (oldVertexData != null) { vertexMap.put(vertex.getId(), - WritableUtils.writeToByteArrayWithSize( - vertex, oldVertexData, useUnsafeSerialization)); + WritableUtils.writeVertexToByteArray( + vertex, oldVertexData, useUnsafeSerialization, getConf())); } else { vertexMap.put(vertex.getId(), - WritableUtils.writeToByteArrayWithSize( - vertex, useUnsafeSerialization)); + WritableUtils.writeVertexToByteArray( + vertex, useUnsafeSerialization, getConf())); } } @@ -223,9 +224,9 @@ public class ByteArrayPartition<I extends WritableComparable, @Override public Vertex<I, V, E, M> next() { - WritableUtils.readFieldsFromByteArrayWithSize( + WritableUtils.reinitializeVertexFromByteArray( vertexDataIterator.next(), representativeVertex, - useUnsafeSerialization); + useUnsafeSerialization, getConf()); return representativeVertex; } http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java index 3525302..11e0a90 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java @@ -20,6 +20,7 @@ package org.apache.giraph.partition; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -370,8 +371,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable, DataInputStream inputStream = new DataInputStream( new BufferedInputStream(new FileInputStream(file))); for (int i = 0; i < numVertices; ++i) { - Vertex<I, V, E, M> vertex = conf.createVertex(); - vertex.readFields(inputStream); + Vertex<I, V , E, M> vertex = + WritableUtils.readVertexFromDataInput(inputStream, conf); partition.putVertex(vertex); } inputStream.close(); @@ -397,7 +398,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, DataOutputStream outputStream = new DataOutputStream( new BufferedOutputStream(new FileOutputStream(file))); for (Vertex<I, V, E, M> vertex : partition) { - vertex.write(outputStream); + WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf); } outputStream.close(); } @@ -418,7 +419,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, DataOutputStream outputStream = new DataOutputStream( new BufferedOutputStream(new FileOutputStream(file, true))); for (Vertex<I, V, E, M> vertex : partition) { - vertex.write(outputStream); + WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf); } outputStream.close(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java index 23e0f05..d6a46bd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java @@ -19,6 +19,7 @@ package org.apache.giraph.partition; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.Progressable; @@ -121,9 +122,9 @@ public class SimplePartition<I extends WritableComparable, } int vertices = input.readInt(); for (int i = 0; i < vertices; ++i) { - Vertex<I, V, E, M> vertex = getConf().createVertex(); progress(); - vertex.readFields(input); + Vertex<I, V, E, M> vertex = + WritableUtils.readVertexFromDataInput(input, getConf()); if (vertexMap.put(vertex.getId(), vertex) != null) { throw new IllegalStateException( "readFields: " + this + @@ -136,9 +137,9 @@ public class SimplePartition<I extends WritableComparable, public void write(DataOutput output) throws IOException { super.write(output); output.writeInt(vertexMap.size()); - for (Vertex vertex : vertexMap.values()) { + for (Vertex<I, V, E, M> vertex : vertexMap.values()) { progress(); - vertex.write(output); + WritableUtils.writeVertexToDataOutput(output, vertex, getConf()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index 6e7b87a..e3d79f7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -18,7 +18,10 @@ package org.apache.giraph.utils; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.VertexEdges; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.giraph.zk.ZooKeeperExt.PathStat; import org.apache.hadoop.conf.Configuration; @@ -328,6 +331,103 @@ public class WritableUtils { } /** + * Write vertex data to byte array with the first 4 bytes as the size of the + * entire buffer (including the size). + * + * @param vertex Vertex to write from. + * @param buffer Use this buffer instead + * @param unsafe Use unsafe serialization? + * @param conf Configuration + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + * @return Byte array with serialized object. + */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> byte[] writeVertexToByteArray( + Vertex<I, V, E, M> vertex, + byte[] buffer, + boolean unsafe, + ImmutableClassesGiraphConfiguration<I, V, E, M> conf) { + ExtendedDataOutput extendedDataOutput; + if (unsafe) { + extendedDataOutput = new UnsafeByteArrayOutputStream(buffer); + } else { + extendedDataOutput = new ExtendedByteArrayDataOutput(buffer); + } + try { + extendedDataOutput.writeInt(-1); + writeVertexToDataOutput(extendedDataOutput, vertex, conf); + extendedDataOutput.writeInt(0, extendedDataOutput.getPos()); + } catch (IOException e) { + throw new IllegalStateException("writeVertexToByteArray: " + + "IOException", e); + } + + return extendedDataOutput.getByteArray(); + } + + /** + * Write vertex data to byte array with the first 4 bytes as the size of the + * entire buffer (including the size). + * + * @param vertex Vertex to write from. + * @param unsafe Use unsafe serialization? + * @param conf Configuration + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + * @return Byte array with serialized object. + */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> byte[] writeVertexToByteArray( + Vertex<I, V, E, M> vertex, + boolean unsafe, + ImmutableClassesGiraphConfiguration<I, V, E, M> conf) { + return writeVertexToByteArray(vertex, null, unsafe, conf); + } + + /** + * Read vertex data from byteArray to a Writeable object, skipping the size. + * Serialization method is choosable. Assumes the vertex has already been + * initialized and contains values for Id, value, and edges. + * + * @param byteArray Byte array to find the fields in. + * @param vertex Vertex to fill in the fields. + * @param unsafe Use unsafe deserialization + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + * @param conf Configuration + * @return The vertex + */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> Vertex<I, V, E, M> + reinitializeVertexFromByteArray( + byte[] byteArray, + Vertex<I, V, E, M> vertex, + boolean unsafe, + ImmutableClassesGiraphConfiguration<I, V, E, M> conf) { + ExtendedDataInput extendedDataInput; + if (unsafe) { + extendedDataInput = new UnsafeByteArrayInputStream(byteArray); + } else { + extendedDataInput = new ExtendedByteArrayDataInput(byteArray); + } + try { + extendedDataInput.readInt(); + reinitializeVertexFromDataInput(extendedDataInput, vertex, conf); + } catch (IOException e) { + throw new IllegalStateException( + "readFieldsFromByteArrayWithSize: IOException", e); + } + return vertex; + } + + /** * Write an edge to an output stream. * * @param out Data output @@ -356,4 +456,86 @@ public class WritableUtils { edge.getTargetVertexId().readFields(in); edge.getValue().readFields(in); } + + /** + * Reads data from input stream to inizialize Vertex. Assumes the vertex has + * already been initialized and contains values for Id, value, and edges. + * + * @param input The input stream + * @param vertex The vertex to initialize + * @param conf Configuration + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + * @throws IOException + */ + @SuppressWarnings("unchecked") + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> void reinitializeVertexFromDataInput( + DataInput input, + Vertex<I, V, E, M> vertex, + ImmutableClassesGiraphConfiguration<I, V, E, M> conf) + throws IOException { + vertex.getId().readFields(input); + vertex.getValue().readFields(input); + ((VertexEdges<I, E>) vertex.getEdges()).readFields(input); + if (input.readBoolean()) { + vertex.voteToHalt(); + } else { + vertex.wakeUp(); + } + } + + /** + * Reads data from input stream to inizialize Vertex. + * + * @param input The input stream + * @param conf Configuration + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + * @return The vertex + * @throws IOException + */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> Vertex<I, V, E, M> + readVertexFromDataInput( + DataInput input, + ImmutableClassesGiraphConfiguration<I, V, E, M> conf) + throws IOException { + Vertex<I, V, E, M> vertex = conf.createVertex(); + I id = conf.createVertexId(); + V value = conf.createVertexValue(); + VertexEdges<I, E> edges = conf.createVertexEdges(); + vertex.initialize(id, value, edges); + reinitializeVertexFromDataInput(input, vertex, conf); + return vertex; + } + + /** + * Writes Vertex data to output stream. + * + * @param output the output stream + * @param vertex The vertex to serialize + * @param conf Configuration + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + * @throws IOException + */ + @SuppressWarnings("unchecked") + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> void writeVertexToDataOutput( + DataOutput output, + Vertex<I, V, E, M> vertex, + ImmutableClassesGiraphConfiguration<I, V, E, M> conf) + throws IOException { + vertex.getId().write(output); + vertex.getValue().write(output); + ((VertexEdges<I, E>) vertex.getEdges()).write(output); + output.writeBoolean(vertex.isHalted()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java index fb5b685..8a048fd 100644 --- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java +++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java @@ -370,7 +370,8 @@ public class TestVertexAndEdges { byte[] byteArray = null; for (int i = 0; i < REPS; ++i) { serializeNanosStart = SystemTime.get().getNanoseconds(); - byteArray = WritableUtils.writeToByteArray(vertex); + byteArray = WritableUtils.writeVertexToByteArray( + vertex, false, vertex.getConf()); serializeNanos += Times.getNanosecondsSince(SystemTime.get(), serializeNanosStart); } @@ -381,13 +382,14 @@ public class TestVertexAndEdges { " bytes / sec for " + edgesClass.getName()); Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> - readVertex = instantiateVertex(edgesClass); - + readVertex = buildVertex(edgesClass); + long deserializeNanosStart; long deserializeNanos = 0; for (int i = 0; i < REPS; ++i) { deserializeNanosStart = SystemTime.get().getNanoseconds(); - WritableUtils.readFieldsFromByteArray(byteArray, readVertex); + WritableUtils.reinitializeVertexFromByteArray(byteArray, readVertex, false, + readVertex.getConf()); deserializeNanos += Times.getNanosecondsSince(SystemTime.get(), deserializeNanosStart); } @@ -416,7 +418,7 @@ public class TestVertexAndEdges { serializeNanosStart = SystemTime.get().getNanoseconds(); outputStream = new DynamicChannelBufferOutputStream(32); - vertex.write(outputStream); + WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf()); serializeNanos += Times.getNanosecondsSince(SystemTime.get(), serializeNanosStart); } @@ -429,7 +431,7 @@ public class TestVertexAndEdges { " bytes / sec for " + edgesClass.getName()); Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> - readVertex = instantiateVertex(edgesClass); + readVertex = buildVertex(edgesClass); long deserializeNanosStart; long deserializeNanos = 0; @@ -438,7 +440,8 @@ public class TestVertexAndEdges { DynamicChannelBufferInputStream inputStream = new DynamicChannelBufferInputStream( outputStream.getDynamicChannelBuffer()); - readVertex.readFields(inputStream); + WritableUtils.reinitializeVertexFromDataInput( + inputStream, readVertex, readVertex.getConf()); deserializeNanos += Times.getNanosecondsSince(SystemTime.get(), deserializeNanosStart); outputStream.getDynamicChannelBuffer().readerIndex(0); @@ -470,7 +473,7 @@ public class TestVertexAndEdges { serializeNanosStart = SystemTime.get().getNanoseconds(); outputStream = new UnsafeByteArrayOutputStream(32); - vertex.write(outputStream); + WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf()); serializeNanos += Times.getNanosecondsSince(SystemTime.get(), serializeNanosStart); } @@ -485,7 +488,7 @@ public class TestVertexAndEdges { " bytes / sec for " + edgesClass.getName()); Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> - readVertex = instantiateVertex(edgesClass); + readVertex = buildVertex(edgesClass); long deserializeNanosStart; long deserializeNanos = 0; @@ -494,7 +497,8 @@ public class TestVertexAndEdges { UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream( outputStream.getByteArray(), 0, outputStream.getPos()); - readVertex.readFields(inputStream); + WritableUtils.reinitializeVertexFromDataInput( + inputStream, readVertex, readVertex.getConf()); deserializeNanos += Times.getNanosecondsSince(SystemTime.get(), deserializeNanosStart); }
