http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java new file mode 100644 index 0000000..1d8fc26 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.MutableEdge; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * A list of edges backed by a byte-array. 
+ * The same Edge object is reused when iterating over all edges, + * unless an alternative iterable is requested. + * It automatically optimizes for edges with no value, + * and also supports shallow copy from another instance. + * + * @param <I> Vertex id + * @param <E> Edge value + */ +public class ByteArrayEdges<I extends WritableComparable, E extends Writable> + implements Iterable<Edge<I, E>>, Writable { + /** Representative edge object. */ + private MutableEdge<I, E> representativeEdge; + /** Serialized edges. */ + private byte[] serializedEdges; + /** Number of bytes used in serializedEdges. */ + private int serializedEdgesBytesUsed; + /** Number of edges. */ + private int edgeCount; + /** Configuration. */ + private ImmutableClassesGiraphConfiguration<I, ?, E, ?> configuration; + + /** + * Constructor. + * Depending on the configuration, instantiates a representative edge with + * or without an edge value. + * + * @param conf Configuration + */ + public ByteArrayEdges(ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf) { + configuration = conf; + representativeEdge = configuration.createMutableEdge(); + ExtendedDataOutput extendedOutputStream = + configuration.createExtendedDataOutput(); + serializedEdges = extendedOutputStream.getByteArray(); + serializedEdgesBytesUsed = extendedOutputStream.getPos(); + } + + /** + * Constructor. + * Takes another instance of {@link ByteArrayEdges} and makes a shallow + * copy of it. + * + * @param edges {@link ByteArrayEdges} to copy + */ + public ByteArrayEdges(ByteArrayEdges<I, E> edges) { + representativeEdge = edges.representativeEdge; + serializedEdges = edges.serializedEdges; + serializedEdgesBytesUsed = edges.serializedEdgesBytesUsed; + edgeCount = edges.edgeCount; + configuration = edges.configuration; + } + + /** + * Append an edge to the serialized representation. 
+ * + * @param edge Edge to append + */ + public final void appendEdge(Edge<I, E> edge) { + ExtendedDataOutput extendedDataOutput = + configuration.createExtendedDataOutput( + serializedEdges, serializedEdgesBytesUsed); + try { + WritableUtils.writeEdge(extendedDataOutput, edge); + } catch (IOException e) { + throw new IllegalStateException("append: Failed to write to the new " + + "byte array"); + } + serializedEdges = extendedDataOutput.getByteArray(); + serializedEdgesBytesUsed = extendedDataOutput.getPos(); + ++edgeCount; + } + + /** + * Set all the edges. + * Note: when possible, use the constructor which takes another {@link + * ByteArrayEdges} instead of a generic {@link Iterable}. + * + * @param edges Iterable of edges + */ + public final void setEdges(Iterable<Edge<I, E>> edges) { + ExtendedDataOutput extendedOutputStream = + configuration.createExtendedDataOutput(); + if (edges != null) { + for (Edge<I, E> edge : edges) { + try { + WritableUtils.writeEdge(extendedOutputStream, edge); + } catch (IOException e) { + throw new IllegalStateException("setEdges: Failed to serialize " + + edge); + } + ++edgeCount; + } + } + serializedEdges = extendedOutputStream.getByteArray(); + serializedEdgesBytesUsed = extendedOutputStream.getPos(); + } + + /** + * Remove the first edge pointing to a target vertex. + * + * @param targetVertexId Target vertex id + * @return True if one such edge was found and removed. + */ + public final boolean removeFirstEdge(I targetVertexId) { + // Note that this is very expensive (deserializes all edges). 
+ ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator(); + int foundStartOffset = 0; + while (iterator.hasNext()) { + Edge<I, E> edge = iterator.next(); + if (edge.getTargetVertexId().equals(targetVertexId)) { + System.arraycopy(serializedEdges, iterator.extendedDataInput.getPos(), + serializedEdges, foundStartOffset, + serializedEdgesBytesUsed - iterator.extendedDataInput.getPos()); + serializedEdgesBytesUsed -= + iterator.extendedDataInput.getPos() - foundStartOffset; + --edgeCount; + return true; + } + foundStartOffset = iterator.extendedDataInput.getPos(); + } + + return false; + } + + /** + * Remove all edges pointing to a target vertex. + * + * @param targetVertexId Target vertex id + * @return The number of removed edges + */ + public final int removeAllEdges(I targetVertexId) { + // Note that this is very expensive (deserializes all edges). + ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator(); + int removedCount = 0; + List<Integer> foundStartOffsets = new LinkedList<Integer>(); + List<Integer> foundEndOffsets = new LinkedList<Integer>(); + int lastStartOffset = 0; + while (iterator.hasNext()) { + Edge<I, E> edge = iterator.next(); + if (edge.getTargetVertexId().equals(targetVertexId)) { + foundStartOffsets.add(lastStartOffset); + foundEndOffsets.add(iterator.extendedDataInput.getPos()); + ++removedCount; + } + lastStartOffset = iterator.extendedDataInput.getPos(); + } + foundStartOffsets.add(serializedEdgesBytesUsed); + + Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator(); + Integer foundStartOffset = foundStartOffsetIter.next(); + for (Integer foundEndOffset : foundEndOffsets) { + Integer nextFoundStartOffset = foundStartOffsetIter.next(); + System.arraycopy(serializedEdges, foundEndOffset, + serializedEdges, foundStartOffset, + nextFoundStartOffset - foundEndOffset); + serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset; + foundStartOffset = nextFoundStartOffset; + } + + edgeCount -= removedCount; + return 
removedCount; + } + + public final int getNumEdges() { + return edgeCount; + } + + /** + * Iterator that uses the representative edge (only one iterator allowed + * at a time). + */ + private final class ByteArrayEdgeIterator implements Iterator<Edge<I, E>> { + /** Input for processing the bytes */ + private final ExtendedDataInput extendedDataInput; + + /** Constructor. */ + ByteArrayEdgeIterator() { + extendedDataInput = configuration.createExtendedDataInput( + serializedEdges, 0, serializedEdgesBytesUsed); + } + + @Override + public boolean hasNext() { + return serializedEdges != null && extendedDataInput.available() > 0; + } + + @Override + public Edge<I, E> next() { + try { + WritableUtils.readEdge(extendedDataInput, representativeEdge); + } catch (IOException e) { + throw new IllegalStateException("next: Failed on pos " + + extendedDataInput.getPos() + " edge " + representativeEdge); + } + return representativeEdge; + } + + @Override + public void remove() { + throw new IllegalAccessError("remove: Not supported"); + } + } + + @Override + public final Iterator<Edge<I, E>> iterator() { + return new ByteArrayEdgeIterator(); + } + + /** + * Release and return the current representative edge. + * + * @return The released edge + */ + private Edge<I, E> releaseCurrentEdge() { + Edge<I, E> releasedEdge = representativeEdge; + representativeEdge = configuration.createMutableEdge(); + return releasedEdge; + } + + /** + * Get an iterable wrapper that creates new Edge objects on the fly. 
+ * + * @return Edge iteratable that creates new objects + */ + public final Iterable<Edge<I, E>> copyEdgeIterable() { + return Iterables.transform(this, + new Function<Edge<I, E>, Edge<I, E>>() { + @Override + public Edge<I, E> apply(Edge<I, E> input) { + return releaseCurrentEdge(); + } + }); + } + + @Override + public final void readFields(DataInput in) throws IOException { + serializedEdgesBytesUsed = in.readInt(); + // Only create a new buffer if the old one isn't big enough + if (serializedEdges == null || + serializedEdgesBytesUsed > serializedEdges.length) { + serializedEdges = new byte[serializedEdgesBytesUsed]; + } + in.readFully(serializedEdges, 0, serializedEdgesBytesUsed); + edgeCount = in.readInt(); + } + + @Override + public final void write(DataOutput out) throws IOException { + out.writeInt(serializedEdgesBytesUsed); + out.write(serializedEdges, 0, serializedEdgesBytesUsed); + out.writeInt(edgeCount); + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java new file mode 100644 index 0000000..bd464d5 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.utils; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Stores pairs of vertex id and generic data in a single byte array + * + * @param <I> Vertex id + * @param <T> Data + */ +public abstract class ByteArrayVertexIdData<I extends WritableComparable, T> + implements Writable, ImmutableClassesGiraphConfigurable { + /** Extended data output */ + private ExtendedDataOutput extendedDataOutput; + /** Configuration */ + private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> configuration; + + /** + * Create a new data object. + * + * @return Newly-created data object. + */ + public abstract T createData(); + + /** + * Write a data object to an {@link ExtendedDataOutput}. + * + * @param out {@link ExtendedDataOutput} + * @param data Data object to write + * @throws IOException + */ + public abstract void writeData(ExtendedDataOutput out, T data) + throws IOException; + + /** + * Read a data object's fields from an {@link ExtendedDataInput}. + * + * @param in {@link ExtendedDataInput} + * @param data Data object to fill in-place + * @throws IOException + */ + public abstract void readData(ExtendedDataInput in, T data) + throws IOException; + + /** + * Initialize the inner state. Must be called before {@code add()} is + * called. + */ + public void initialize() { + extendedDataOutput = configuration.createExtendedDataOutput(); + } + + /** + * Initialize the inner state, with a known size. Must be called before + * {@code add()} is called. 
+ * + * @param expectedSize Number of bytes to be expected + */ + public void initialize(int expectedSize) { + extendedDataOutput = configuration.createExtendedDataOutput(expectedSize); + } + + /** + * Add a vertex id and data pair to the collection. + * + * @param vertexId Vertex id + * @param data Data + */ + public void add(I vertexId, T data) { + try { + vertexId.write(extendedDataOutput); + writeData(extendedDataOutput, data); + } catch (IOException e) { + throw new IllegalStateException("add: IOException", e); + } + } + + /** + * Get the number of bytes used. + * + * @return Bytes used + */ + public int getSize() { + return extendedDataOutput.getPos(); + } + + /** + * Get the size of this object in serialized form. + * + * @return The size (in bytes) of the serialized object + */ + public int getSerializedSize() { + return 1 + 4 + getSize(); + } + + /** + * Check if the list is empty. + * + * @return Whether the list is empty + */ + public boolean isEmpty() { + return extendedDataOutput.getPos() == 0; + } + + /** + * Clear the list. + */ + public void clear() { + extendedDataOutput.reset(); + } + + /** + * Get the underlying byte-array. 
+ * + * @return The underlying byte-array + */ + public byte[] getByteArray() { + return extendedDataOutput.getByteArray(); + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return configuration; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeInt(extendedDataOutput.getPos()); + dataOutput.write(extendedDataOutput.getByteArray(), 0, + extendedDataOutput.getPos()); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + int size = dataInput.readInt(); + byte[] buf = new byte[size]; + dataInput.readFully(buf); + extendedDataOutput = configuration.createExtendedDataOutput(buf, size); + } + + /** + * Get an iterator over the pairs. + * + * @return Iterator + */ + public VertexIdDataIterator getVertexIdDataIterator() { + return new VertexIdDataIterator(); + } + + /** + * Special iterator that reuses vertex ids and data objects so that the + * lifetime of the object is only until next() is called. + * + * Vertex id ownership can be released if desired through + * releaseCurrentVertexId(). This optimization allows us to cut down + * on the number of objects instantiated and garbage collected. + * + * Not thread-safe. + */ + public class VertexIdDataIterator extends VertexIdIterator<I> { + /** Current data. */ + private T data; + + /** Default constructor. */ + public VertexIdDataIterator() { + super(extendedDataOutput, configuration); + } + + @Override + public void next() { + if (vertexId == null) { + vertexId = configuration.createVertexId(); + } + if (data == null) { + data = createData(); + } + try { + vertexId.readFields(extendedDataInput); + readData(extendedDataInput, data); + } catch (IOException e) { + throw new IllegalStateException("next: IOException", e); + } + } + + /** + * Get the current data. 
+ * + * @return Current data + */ + public T getCurrentData() { + return data; + } + } + +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java new file mode 100644 index 0000000..1cfd21e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Edge; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.IOException; + +/** + * Stores vertex id and edge pairs in a single byte array. 
+ * + * @param <I> Vertex id + * @param <E> Edge value + */ +@SuppressWarnings("unchecked") +public class ByteArrayVertexIdEdges<I extends WritableComparable, + E extends Writable> extends ByteArrayVertexIdData<I, Edge<I, E>> { + /** + * Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used + * to generate edge objects. + * + * @return Casted configuration + */ + @Override + public ImmutableClassesGiraphConfiguration<I, ?, E, ?> getConf() { + return (ImmutableClassesGiraphConfiguration<I, ?, E, ?>) super.getConf(); + } + + @Override + public Edge<I, E> createData() { + return getConf().createMutableEdge(); + } + + @Override + public void writeData(ExtendedDataOutput out, Edge<I, E> edge) + throws IOException { + WritableUtils.writeEdge(out, edge); + } + + @Override + public void readData(ExtendedDataInput in, Edge<I, E> edge) + throws IOException { + WritableUtils.readEdge(in, edge); + } + + /** + * Get an iterator over the pairs. + * + * @return Iterator + */ + public VertexIdEdgeIterator getVertexIdEdgeIterator() { + return new VertexIdEdgeIterator(); + } + + /** + * Special iterator that reuses vertex ids and edge objects so that the + * lifetime of the object is only until next() is called. 
+ */ + public class VertexIdEdgeIterator extends VertexIdDataIterator { + public Edge<I, E> getCurrentEdge() { + return getCurrentData(); + } + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java index dea4229..fd06783 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java @@ -17,115 +17,75 @@ */ package org.apache.giraph.utils; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + /** - * Stores vertex id and message pairs in a single byte array + * Stores vertex id and message pairs in a single byte array. * * @param <I> Vertex id * @param <M> Message data */ +@SuppressWarnings("unchecked") public class ByteArrayVertexIdMessages<I extends WritableComparable, - M extends Writable> implements Writable, - ImmutableClassesGiraphConfigurable { - /** Extended data output */ - private ExtendedDataOutput extendedDataOutput; - /** Configuration */ - private ImmutableClassesGiraphConfiguration<I, ?, ?, M> configuration; + M extends Writable> extends ByteArrayVertexIdData<I, M> { /** Add the message size to the stream? 
(Depends on the message store) */ private boolean useMessageSizeEncoding = false; /** - * Constructor for reflection - */ - public ByteArrayVertexIdMessages() { } - - /** * Set whether message sizes should be encoded. This should only be a * possibility when not combining. When combining, all messages need to be * deserializd right away, so this won't help. */ private void setUseMessageSizeEncoding() { - if (!configuration.useCombiner()) { - useMessageSizeEncoding = configuration.useMessageSizeEncoding(); + if (!getConf().useCombiner()) { + useMessageSizeEncoding = getConf().useMessageSizeEncoding(); } else { useMessageSizeEncoding = false; } } /** - * Initialize the inner state. Must be called before {@code add()} is - * called. + * Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used + * to generate message objects. + * + * @return Casted configuration */ - public void initialize() { - extendedDataOutput = configuration.createExtendedDataOutput(); - setUseMessageSizeEncoding(); + @Override + public ImmutableClassesGiraphConfiguration<I, ?, ?, M> getConf() { + return (ImmutableClassesGiraphConfiguration<I, ?, ?, M>) super.getConf(); } - /** - * Initialize the inner state, with a known size. Must be called before - * {@code add()} is called. - * - * @param expectedSize Number of bytes to be expected - */ - public void initialize(int expectedSize) { - extendedDataOutput = configuration.createExtendedDataOutput(expectedSize); - setUseMessageSizeEncoding(); + @Override + public M createData() { + return getConf().createMessageValue(); } - /** - * Add a vertex id and message pair to the collection. 
- * - * @param vertexId Vertex id - * @param message Message - */ - public void add(I vertexId, M message) { - try { - vertexId.write(extendedDataOutput); - // Write the size if configured this way, else, just the message - if (useMessageSizeEncoding) { - int pos = extendedDataOutput.getPos(); - extendedDataOutput.skipBytes(4); - message.write(extendedDataOutput); - extendedDataOutput.writeInt( - pos, extendedDataOutput.getPos() - pos - 4); - } else { - message.write(extendedDataOutput); - } - } catch (IOException e) { - throw new IllegalStateException("add: IOException", e); - } + @Override + public void writeData(ExtendedDataOutput out, M message) throws IOException { + message.write(out); } - /** - * Get the number of bytes used - * - * @return Number of bytes used - */ - public int getSize() { - return extendedDataOutput.getPos(); + @Override + public void readData(ExtendedDataInput in, M message) throws IOException { + message.readFields(in); } - /** - * Check if the list is empty. - * - * @return True iff there are no pairs in the list - */ - public boolean isEmpty() { - return extendedDataOutput.getPos() == 0; + @Override + public void initialize() { + super.initialize(); + setUseMessageSizeEncoding(); } - /** - * Clear the collection. - */ - public void clear() { - extendedDataOutput.reset(); + @Override + public void initialize(int expectedSize) { + super.initialize(expectedSize); + setUseMessageSizeEncoding(); } /** @@ -140,6 +100,16 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable, } /** + * Special iterator that reuses vertex ids and message objects so that the + * lifetime of the object is only until next() is called. + */ + public class VertexIdMessageIterator extends VertexIdDataIterator { + public M getCurrentMessage() { + return getCurrentData(); + } + } + + /** * Get specialized iterator that will instiantiate the vertex id and * message of this object. 
It will only produce message bytes, not actual * messages and expects a different encoding. @@ -155,128 +125,15 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable, } @Override - public void setConf(ImmutableClassesGiraphConfiguration configuration) { - this.configuration = configuration; - } - - @Override - public ImmutableClassesGiraphConfiguration getConf() { - return configuration; - } - - @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeBoolean(useMessageSizeEncoding); - dataOutput.writeInt(extendedDataOutput.getPos()); - dataOutput.write(extendedDataOutput.getByteArray(), 0, - extendedDataOutput.getPos()); + super.write(dataOutput); } @Override public void readFields(DataInput dataInput) throws IOException { useMessageSizeEncoding = dataInput.readBoolean(); - int size = dataInput.readInt(); - byte[] buf = new byte[size]; - dataInput.readFully(buf); - extendedDataOutput = configuration.createExtendedDataOutput(buf, size); - } - - /** - * Get the size of this object in serialized form. - * - * @return The size (in bytes) of serialized object - */ - public int getSerializedSize() { - return 1 + 4 + getSize(); - } - - /** - * Common implementation for VertexIdMessageIterator - * and VertexIdMessageBytesIterator - */ - public abstract class VertexIdIterator { - /** Reader of the serialized messages */ - protected final ExtendedDataInput extendedDataInput = - configuration.createExtendedDataInput( - extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos()); - /** Current vertex id */ - protected I vertexId; - - /** - * Returns true if the iteration has more elements. - * - * @return True if the iteration has more elements. - */ - public boolean hasNext() { - return extendedDataInput.available() > 0; - } - /** - * Moves to the next element in the iteration. - */ - public abstract void next(); - - /** - * Get the current vertex id. Ihis object's contents are only guaranteed - * until next() is called. 
To take ownership of this object call - * releaseCurrentVertexId() after getting a reference to this object. - * - * @return Current vertex id - */ - public I getCurrentVertexId() { - return vertexId; - } - /** - * The backing store of the current vertex id is now released. - * Further calls to getCurrentVertexId () without calling next() - * will return null. - * - * @return Current vertex id that was released - */ - public I releaseCurrentVertexId() { - I releasedVertexId = vertexId; - vertexId = null; - return releasedVertexId; - } - } - - /** - * Special iterator that reuses vertex ids and messages so that the - * lifetime of the object is only until next() is called. - * - * Vertex id ownership can be released if desired through - * releaseCurrentVertexId(). This optimization allows us to cut down - * on the number of objects instantiated and garbage collected. - * - * Not thread-safe. - */ - public class VertexIdMessageIterator extends VertexIdIterator { - /** Current message */ - private M message; - - @Override - public void next() { - if (vertexId == null) { - vertexId = configuration.createVertexId(); - } - if (message == null) { - message = configuration.createMessageValue(); - } - try { - vertexId.readFields(extendedDataInput); - message.readFields(extendedDataInput); - } catch (IOException e) { - throw new IllegalStateException("next: IOException", e); - } - } - - /** - * Get the current message - * - * @return Current message - */ - public M getCurrentMessage() { - return message; - } + super.readFields(dataInput); } /** @@ -290,7 +147,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable, * * Not thread-safe. 
*/ - public class VertexIdMessageBytesIterator extends VertexIdIterator { + public class VertexIdMessageBytesIterator extends VertexIdDataIterator { /** Last message offset */ private int messageOffset = -1; /** Number of bytes in the last message */ @@ -302,7 +159,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable, @Override public void next() { if (vertexId == null) { - vertexId = configuration.createVertexId(); + vertexId = getConf().createVertexId(); } try { @@ -326,8 +183,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable, public void writeCurrentMessageBytes( ExtendedDataOutput dataOutput) { try { - dataOutput.write( - extendedDataOutput.getByteArray(), messageOffset, messageBytes); + dataOutput.write(getByteArray(), messageOffset, messageBytes); } catch (IOException e) { throw new IllegalStateException("writeCurrentMessageBytes: Got " + "IOException", e); http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java new file mode 100644 index 0000000..0c9ee07 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.WritableComparable; + +/** + * Common implementation for VertexIdEdgeIterator, VertexIdMessageIterator + * and VertexIdMessageBytesIterator. + * + * @param <I> Vertex id + */ +public abstract class VertexIdIterator<I extends WritableComparable> { + /** Reader of the serialized edges */ + protected final ExtendedDataInput extendedDataInput; + + /** Current vertex id */ + protected I vertexId; + + /** + * Constructor. + * + * @param extendedDataOutput Extended data output + * @param configuration Configuration + */ + public VertexIdIterator( + ExtendedDataOutput extendedDataOutput, + ImmutableClassesGiraphConfiguration<I, ?, ?, ?> configuration) { + extendedDataInput = configuration.createExtendedDataInput( + extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos()); + } + + /** + * Returns true if the iteration has more elements. + * + * @return True if the iteration has more elements. + */ + public boolean hasNext() { + return extendedDataInput.available() > 0; + } + /** + * Moves to the next element in the iteration. + */ + public abstract void next(); + + /** + * Get the current vertex id. Ihis object's contents are only guaranteed + * until next() is called. To take ownership of this object call + * releaseCurrentVertexId() after getting a reference to this object. 
+ * + * @return Current vertex id + */ + public I getCurrentVertexId() { + return vertexId; + } + /** + * The backing store of the current vertex id is now released. + * Further calls to getCurrentVertexId () without calling next() + * will return null. + * + * @return Current vertex id that was released + */ + public I releaseCurrentVertexId() { + I releasedVertexId = vertexId; + vertexId = null; + return releasedVertexId; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index fefe9a0..ef94645 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -18,10 +18,12 @@ package org.apache.giraph.utils; +import org.apache.giraph.graph.Edge; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.giraph.zk.ZooKeeperExt.PathStat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -30,6 +32,7 @@ import org.apache.zookeeper.data.Stat; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; @@ -323,4 +326,36 @@ public class WritableUtils { e); } } + + /** + * Write an edge to an output stream. 
+ * + * @param out Data output + * @param edge Edge to write + * @param <I> Vertex id + * @param <E> Edge value + * @throws IOException + */ + public static <I extends WritableComparable, E extends Writable> + void writeEdge(DataOutput out, Edge<I, E> edge) + throws IOException { + edge.getTargetVertexId().write(out); + edge.getValue().write(out); + } + + /** + * Read an edge from an input stream. + * + * @param in Data input + * @param edge Edge to fill in-place + * @param <I> Vertex id + * @param <E> Edge value + * @throws IOException + */ + public static <I extends WritableComparable, E extends Writable> + void readEdge(DataInput in, Edge<I, E> edge) + throws IOException { + edge.getTargetVertexId().readFields(in); + edge.getValue().readFields(in); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java new file mode 100644 index 0000000..1e56b20 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.vertex; + +import org.apache.giraph.graph.Edge; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +/** + * A vertex whose edges are backed by a byte-array. + * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message data + */ +public abstract class ByteArrayVertex< + I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends ByteArrayVertexBase<I, V, E, M> { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(ByteArrayVertex.class); + + @Override + public final boolean addEdge(Edge<I, E> edge) { + // Note that this is very expensive (deserializes all edges + // in an addEdge() request). + // Hopefully the user set all the edges in setEdges(). + for (Edge<I, E> currentEdge : getEdges()) { + if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) { + LOG.warn("addEdge: Vertex=" + getId() + + ": already added an edge value for target vertex id " + + edge.getTargetVertexId()); + return false; + } + } + appendEdge(edge); + return true; + } + + @Override + public final int removeEdges(I targetVertexId) { + return removeFirstEdge(targetVertexId) ? 
1 : 0; + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java new file mode 100644 index 0000000..26c3f62 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.vertex; + +import org.apache.giraph.graph.Edge; +import org.apache.giraph.utils.ByteArrayEdges; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Common base class for byte-array backed vertices. 
+ * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message data + */ +public abstract class ByteArrayVertexBase< + I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends MutableVertex<I, V, E, M> { + /** Serialized edge list. */ + private ByteArrayEdges<I, E> edges; + + /** + * Append an edge to the serialized representation. + * + * @param edge Edge to append + */ + protected void appendEdge(Edge<I, E> edge) { + edges.appendEdge(edge); + } + + /** + * Remove the first edge pointing to a target vertex. + * + * @param targetVertexId Target vertex id + * @return True if one such edge was found and removed. + */ + protected boolean removeFirstEdge(I targetVertexId) { + return edges.removeFirstEdge(targetVertexId); + } + + /** + * Remove all edges pointing to a target vertex. + * + * @param targetVertexId Target vertex id + * @return The number of removed edges + */ + protected int removeAllEdges(I targetVertexId) { + return edges.removeAllEdges(targetVertexId); + } + + @Override + public final void setEdges(Iterable<Edge<I, E>> edges) { + // If the edge iterable is backed by a byte-array, + // we simply get a shallow copy of it. 
+ if (edges instanceof ByteArrayEdges) { + this.edges = new ByteArrayEdges<I, E>((ByteArrayEdges<I, E>) edges); + } else { + this.edges = new ByteArrayEdges<I, E>(getConf()); + this.edges.setEdges(edges); + } + } + + @Override + public final Iterable<Edge<I, E>> getEdges() { + return edges; + } + + @Override + public final int getNumEdges() { + return edges.getNumEdges(); + } + + @Override + public final void readFields(DataInput in) throws IOException { + I vertexId = getId(); + if (vertexId == null) { + vertexId = getConf().createVertexId(); + } + vertexId.readFields(in); + + V vertexValue = getValue(); + if (vertexValue == null) { + vertexValue = getConf().createVertexValue(); + } + vertexValue.readFields(in); + + initialize(vertexId, vertexValue); + + edges.readFields(in); + + readHaltBoolean(in); + } + + @Override + public final void write(DataOutput out) throws IOException { + getId().write(out); + getValue().write(out); + + edges.write(out); + + out.writeBoolean(isHalted()); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java index 9ae692f..882bbb8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java +++ b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java @@ -29,7 +29,7 @@ import java.util.Iterator; * User applications can subclass {@link EdgeListVertex}, which stores * the outbound edges in an ArrayList (less memory as the cost of expensive * random-access lookup). Good for static graphs. 
Not nearly as memory - * efficient as using RepresentativeVertex + ByteArrayPartition + * efficient as using ByteArrayVertex + ByteArrayPartition * (probably about 10x more), but not bad when keeping vertices as objects in * memory (SimplePartition). * http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphByteArrayVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphByteArrayVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphByteArrayVertex.java new file mode 100644 index 0000000..a50f48d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphByteArrayVertex.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.vertex; + +import org.apache.giraph.graph.Edge; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Similar to {@link ByteArrayVertex}, but allows for parallel edges. 
+ * + * Note: removeEdges() here removes all edges pointing to the target vertex + * and returns the number of edges that were removed. + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message data + */ +public abstract class MultiGraphByteArrayVertex<I extends + WritableComparable, V extends Writable, E extends Writable, + M extends Writable> extends ByteArrayVertexBase<I, V, E, M> { + @Override + public final boolean addEdge(Edge<I, E> edge) { + appendEdge(edge); + return true; + } + + @Override + public final int removeEdges(I targetVertexId) { + return removeAllEdges(targetVertexId); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index fa3ab49..3256a02 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -565,10 +565,8 @@ else[HADOOP_NON_SECURE]*/ } if (getConfiguration().hasEdgeInputFormat()) { - // Create vertices from added edges via vertex resolver. - // Doing this at the beginning of superstep 0 is not enough, - // because we want the vertex/edge stats to be accurate. - workerServer.resolveMutations(graphState); + // Move edges from temporary storage to their source vertices.
+ getServerData().getEdgeStore().moveEdgesToVertices(); } // Generate the partition stats for the input superstep and process http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index bdf9f57..c577cbb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -130,7 +130,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, "without a value! - " + readerEdge); } - graphState.getWorkerClientRequestProcessor().addEdgeRequest( + graphState.getWorkerClientRequestProcessor().sendEdgeRequest( sourceId, readerEdge); context.progress(); // do this before potential data transfer ++inputSplitEdgesLoaded; http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java index 2845c90..c42754f 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java @@ -103,10 +103,8 @@ public class RequestFailureTest { } // Send the request - SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable, - IntWritable> request = - new SendWorkerMessagesRequest<IntWritable, IntWritable, - IntWritable, IntWritable>(dataToSend); + SendWorkerMessagesRequest<IntWritable, IntWritable> request = + new 
SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend); return request; } http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index d779fe4..ef1920d 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -171,10 +171,8 @@ public class RequestTest { } // Send the request - SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable, - IntWritable> request = - new SendWorkerMessagesRequest<IntWritable, IntWritable, - IntWritable, IntWritable>(dataToSend); + SendWorkerMessagesRequest<IntWritable, IntWritable> request = + new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend); client.sendWritableRequest(workerInfo.getTaskId(), request); client.waitAllRequests(); http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java index 82dc283..04c7a3c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java +++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java @@ -161,6 +161,7 @@ public class MockUtils { createNewServerData(ImmutableClassesGiraphConfiguration conf, Mapper.Context context) { return new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>( + Mockito.mock(CentralizedServiceWorker.class), conf, ByteArrayMessagesPerVertexStore.newFactory( MockUtils.mockServiceGetVertexPartitionOwner(1), conf), 
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java index a5a3545..11fc8a5 100644 --- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java +++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java @@ -45,9 +45,9 @@ public class TestMultiGraphVertex { public void compute(Iterable<IntWritable> messages) throws IOException { } } - public static class MyMultiGraphRepresentativeVertex - extends MultiGraphRepresentativeVertex<IntWritable, IntWritable, - IntWritable, IntWritable> { + public static class MyMultiGraphByteArrayVertex + extends MultiGraphByteArrayVertex<IntWritable, IntWritable, + IntWritable, IntWritable> { @Override public void compute(Iterable<IntWritable> messages) throws IOException { } } @@ -55,7 +55,7 @@ public class TestMultiGraphVertex { @Before public void setUp() { vertexClasses.add(MyMultiGraphEdgeListVertex.class); - vertexClasses.add(MyMultiGraphRepresentativeVertex.class); + vertexClasses.add(MyMultiGraphByteArrayVertex.class); } @Test http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java index ca4ba1a..cbbaa1b 100644 --- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java +++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java @@ -80,10 +80,10 @@ public class TestMutableVertex { /** * Simple instantiable class that 
extends - * {@link org.apache.giraph.vertex.RepresentativeVertex}. + * {@link ByteArrayVertex}. */ - public static class IFDLRepresentativeVertex extends RepresentativeVertex<IntWritable, FloatWritable, DoubleWritable, - LongWritable> { + public static class IFDLRepresentativeVertex extends ByteArrayVertex<IntWritable, FloatWritable, DoubleWritable, + LongWritable> { @Override public void compute(Iterable<LongWritable> messages) throws IOException { } }
