Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java?rev=1414361&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java Tue Nov 27 20:01:38 2012 @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import java.util.Iterator; +import org.apache.giraph.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; + +/** + * This iterable is designed to deserialize a byte array on the fly to + * provide new copies of writable objects when desired. It does not reuse + * objects, and instead creates a new one for every next(). 
+ * + * @param <T> Type that extends Writable that will be iterated + */
public abstract class ByteArrayIterable<T extends Writable> implements
    Iterable<T> {
  /** Configuration used to build the data input of every iterator */
  protected final ImmutableClassesGiraphConfiguration configuration;
  /** Byte array holding the serialized objects */
  protected final byte[] buf;
  /** Offset to start in buf */
  protected final int off;
  /** Length handed to the data input together with buf and off */
  protected final int length;

  /**
   * Remembers the configuration and the region of the byte array so that
   * every call to {@link #iterator()} can start reading from the same place.
   *
   * @param configuration Configuration
   * @param buf Buffer
   * @param off Offset to start in the buffer
   * @param length Length of the buffer
   */
  public ByteArrayIterable(ImmutableClassesGiraphConfiguration configuration,
      byte[] buf, int off, int length) {
    this.configuration = configuration;
    this.buf = buf;
    this.off = off;
    this.length = length;
  }

  /**
   * Factory for the objects handed out by the iterators. Subclasses must be
   * able to create the writable object.
   *
   * @return New writable
   */
  protected abstract T createWritable();

  /**
   * Creates a fresh iterator over the internal byte array. The iterator
   * obtains its writables from this iterable's {@link #createWritable()}.
   *
   * @return Iterator over the serialized objects
   */
  @Override
  public Iterator<T> iterator() {
    return new ByteArrayIterator<T>(configuration, buf, off, length) {
      @Override
      protected T createWritable() {
        // Delegate to the enclosing iterable's factory method
        return ByteArrayIterable.this.createWritable();
      }
    };
  }
}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java?rev=1414361&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java Tue Nov 27 20:01:38 2012 @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.giraph.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; + +/** + * This iterator is designed to deserialize a byte array on the fly to + * provide new copies of writable objects when desired. It does not reuse + * objects, and instead creates a new one for every next(). 
+ * + * @param <T> Type that extends Writable that will be iterated + */
public abstract class ByteArrayIterator<T extends Writable> implements
    Iterator<T> {
  /** Data input that the writables are deserialized from */
  protected final ExtendedDataInput extendedDataInput;

  /**
   * Constructor. Wraps the given region of the byte array in an extended
   * data input created by the configuration.
   *
   * @param configuration Configuration
   * @param buf Buffer
   * @param off Offset to start in the buffer
   * @param length Length of the buffer
   */
  public ByteArrayIterator(
      ImmutableClassesGiraphConfiguration configuration,
      byte[] buf, int off, int length) {
    extendedDataInput =
        configuration.createExtendedDataInput(buf, off, length);
  }

  /**
   * Checks whether the data input still reports available bytes.
   *
   * @return True if at least one more byte can be read
   */
  @Override
  public boolean hasNext() {
    return extendedDataInput.available() > 0;
  }

  /**
   * Creates a new writable and fills it from the data input.
   *
   * @return Newly created writable holding the next element
   * @throws IllegalStateException if deserializing the element fails with
   *         an IOException
   */
  @Override
  public T next() {
    T writable = createWritable();
    try {
      writable.readFields(extendedDataInput);
    } catch (IOException e) {
      throw new IllegalStateException("next: readFields got IOException", e);
    }
    return writable;
  }

  /**
   * Removal is not supported by this iterator.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void remove() {
    // The Iterator contract asks for UnsupportedOperationException here;
    // IllegalAccessError is a LinkageError and would escape callers that
    // handle the documented exception.
    throw new UnsupportedOperationException("remove: Not supported");
  }

  /**
   * Must be able to create the writable object
   *
   * @return New writable
   */
  protected abstract T createWritable();
}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java?rev=1414361&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java Tue Nov 27 20:01:38 2012 @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.giraph.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +
/**
 * Stores vertex id and message pairs in a single byte array. Pairs are
 * appended with {@code add()} and read back through one of the two inner
 * iterators.
 *
 * @param <I> Vertex id
 * @param <M> Message data
 */
public class ByteArrayVertexIdMessages<I extends WritableComparable,
    M extends Writable> implements Writable,
    ImmutableClassesGiraphConfigurable {
  /** Extended data output */
  private ExtendedDataOutput extendedDataOutput;
  /** Configuration */
  private ImmutableClassesGiraphConfiguration<I, ?, ?, M> configuration;
  /** Add the message size to the stream? (Depends on the message store) */
  private boolean useMessageSizeEncoding = false;

  /**
   * Constructor for reflection
   */
  public ByteArrayVertexIdMessages() { }

  /**
   * Set whether message sizes should be encoded.  This should only be a
   * possibility when not combining.  When combining, all messages need to be
   * deserialized right away, so this won't help.
   */
  private void setUseMessageSizeEncoding() {
    if (!configuration.useCombiner()) {
      useMessageSizeEncoding = configuration.useMessageSizeEncoding();
    } else {
      useMessageSizeEncoding = false;
    }
  }

  /**
   * Initialize the inner state. Must be called before {@code add()} is
   * called.
   */
  public void initialize() {
    extendedDataOutput = configuration.createExtendedDataOutput();
    setUseMessageSizeEncoding();
  }

  /**
   * Initialize the inner state, with a known size. Must be called before
   * {@code add()} is called.
   *
   * @param expectedSize Number of bytes to be expected
   */
  public void initialize(int expectedSize) {
    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
    setUseMessageSizeEncoding();
  }

  /**
   * Add a vertex id and message pair to the collection.
   *
   * @param vertexId Vertex id
   * @param message Message
   * @throws IllegalStateException if writing the pair fails with an
   *         IOException
   */
  public void add(I vertexId, M message) {
    try {
      vertexId.write(extendedDataOutput);
      // Write the size if configured this way, else, just the message
      if (useMessageSizeEncoding) {
        // Reserve 4 bytes for the size, write the message, then back-fill
        // the reserved int with the number of bytes the message took
        int pos = extendedDataOutput.getPos();
        extendedDataOutput.skipBytes(4);
        message.write(extendedDataOutput);
        extendedDataOutput.writeInt(
            pos, extendedDataOutput.getPos() - pos - 4);
      } else {
        message.write(extendedDataOutput);
      }
    } catch (IOException e) {
      throw new IllegalStateException("add: IOException", e);
    }
  }

  /**
   * Get the number of bytes used
   *
   * @return Number of bytes used
   */
  public int getSize() {
    return extendedDataOutput.getPos();
  }

  /**
   * Check if the list is empty.
   *
   * @return True iff there are no pairs in the list
   */
  public boolean isEmpty() {
    return extendedDataOutput.getPos() == 0;
  }

  /**
   * Clear the collection.
   */
  public void clear() {
    extendedDataOutput.reset();
  }

  /**
   * Get specialized iterator that will instantiate the vertex id and
   * message of this object.
   *
   * @return Special iterator that reuses vertex ids and messages unless
   *         specified
   */
  public VertexIdMessageIterator getVertexIdMessageIterator() {
    return new VertexIdMessageIterator();
  }

  /**
   * Get specialized iterator that will instantiate the vertex id and
   * message of this object.  It will only produce message bytes, not actual
   * messages and expects a different encoding.
   *
   * @return Special iterator that reuses vertex ids (unless released) and
   *         copies message bytes, or null if message size encoding is not
   *         in use for this object
   */
  public VertexIdMessageBytesIterator getVertexIdMessageBytesIterator() {
    if (!useMessageSizeEncoding) {
      return null;
    }
    return new VertexIdMessageBytesIterator();
  }

  @Override
  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
    this.configuration = configuration;
  }

  @Override
  public ImmutableClassesGiraphConfiguration getConf() {
    return configuration;
  }

  /**
   * Writes the size-encoding flag, the number of bytes used and then the
   * used bytes themselves.
   *
   * @param dataOutput Output to write to
   * @throws IOException if writing to dataOutput fails
   */
  @Override
  public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeBoolean(useMessageSizeEncoding);
    dataOutput.writeInt(extendedDataOutput.getPos());
    dataOutput.write(extendedDataOutput.getByteArray(), 0,
        extendedDataOutput.getPos());
  }

  /**
   * Reads the fields in the order {@code write()} emits them: flag, byte
   * count, bytes.  The configuration is used to wrap the bytes read.
   *
   * @param dataInput Input to read from
   * @throws IOException if reading from dataInput fails
   */
  @Override
  public void readFields(DataInput dataInput) throws IOException {
    useMessageSizeEncoding = dataInput.readBoolean();
    int size = dataInput.readInt();
    byte[] buf = new byte[size];
    dataInput.readFully(buf);
    extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
  }

  /**
   * Common implementation for VertexIdMessageIterator
   * and VertexIdMessageBytesIterator
   */
  public abstract class VertexIdIterator {
    /**
     * Reader of the serialized messages, created over the bytes written to
     * the enclosing object at the time this iterator is constructed
     */
    protected final ExtendedDataInput extendedDataInput =
        configuration.createExtendedDataInput(
            extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
    /** Current vertex id */
    protected I vertexId;

    /**
     * Returns true if the iteration has more elements.
     *
     * @return True if the iteration has more elements.
     */
    public boolean hasNext() {
      return extendedDataInput.available() > 0;
    }
    /**
     * Moves to the next element in the iteration.
     */
    public abstract void next();

    /**
     * Get the current vertex id.  This object's contents are only guaranteed
     * until next() is called.  To take ownership of this object call
     * releaseCurrentVertexId() after getting a reference to this object.
     *
     * @return Current vertex id
     */
    public I getCurrentVertexId() {
      return vertexId;
    }
    /**
     * The backing store of the current vertex id is now released.
     * Further calls to getCurrentVertexId() without calling next()
     * will return null.
     *
     * @return Current vertex id that was released
     */
    public I releaseCurrentVertexId() {
      I releasedVertexId = vertexId;
      vertexId = null;
      return releasedVertexId;
    }
  }

  /**
   * Special iterator that reuses vertex ids and messages so that the
   * lifetime of the object is only until next() is called.
   *
   * Vertex id ownership can be released if desired through
   * releaseCurrentVertexId().  This optimization allows us to cut down
   * on the number of objects instantiated and garbage collected.
   *
   * Not thread-safe.
   */
  public class VertexIdMessageIterator extends VertexIdIterator {
    /** Current message */
    private M message;

    @Override
    public void next() {
      // A new vertex id is only created on first use or after the previous
      // one was handed over through releaseCurrentVertexId()
      if (vertexId == null) {
        vertexId = configuration.createVertexId();
      }
      if (message == null) {
        message = configuration.createMessageValue();
      }
      try {
        vertexId.readFields(extendedDataInput);
        message.readFields(extendedDataInput);
      } catch (IOException e) {
        throw new IllegalStateException("next: IOException", e);
      }
    }

    /**
     * Get the current message
     *
     * @return Current message
     */
    public M getCurrentMessage() {
      return message;
    }
  }

  /**
   * Special iterator that reuses vertex ids and message bytes so that the
   * lifetime of the object is only until next() is called.
   *
   * Vertex id ownership can be released if desired through
   * releaseCurrentVertexId().  This optimization allows us to cut down
   * on the number of objects instantiated and garbage collected.  Messages
   * can only be copied to an ExtendedDataOutput object
   *
   * Not thread-safe.
   */
  public class VertexIdMessageBytesIterator extends VertexIdIterator {
    /** Last message offset */
    private int messageOffset = -1;
    /** Number of bytes in the last message */
    private int messageBytes = -1;

    /**
     * Moves to the next element in the iteration.
     */
    @Override
    public void next() {
      if (vertexId == null) {
        vertexId = configuration.createVertexId();
      }

      try {
        vertexId.readFields(extendedDataInput);
        // The message is not deserialized: read its size prefix, remember
        // where its bytes start and skip over them
        messageBytes = extendedDataInput.readInt();
        messageOffset = extendedDataInput.getPos();
        if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
          throw new IllegalStateException("next: Failed to skip " +
              messageBytes);
        }
      } catch (IOException e) {
        throw new IllegalStateException("next: IOException", e);
      }
    }

    /**
     * Write the current message to an ExtendedDataOutput object
     *
     * @param dataOutput Where the current message will be written to
     */
    public void writeCurrentMessageBytes(
        ExtendedDataOutput dataOutput) {
      try {
        // Copies straight from the enclosing object's byte array using the
        // offset and size recorded by next()
        dataOutput.write(
            extendedDataOutput.getByteArray(), messageOffset, messageBytes);
      } catch (IOException e) {
        throw new IllegalStateException("writeCurrentMessageBytes: Got " +
            "IOException", e);
      }
    }
  }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java Tue Nov 27 20:01:38 2012 @@ -18,17 +18,23 @@ package org.apache.giraph.utils; +import com.google.common.collect.Iterables; import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.ConcurrentMap; +import org.apache.log4j.Logger; /** Helper methods for Collections */ public class CollectionUtils { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(CollectionUtils.class); + /** Do not instantiate.
*/ private CollectionUtils() { } /** - * If map already has value associated with the key it adds values to that - * value, otherwise it will put values to the map. + * If map already has a value associated with the key it adds values to that + * value, otherwise it will put values to the map. Do not reuse values. * * @param key Key under which we are adding values * @param values Values we want to add @@ -52,4 +58,61 @@ public class CollectionUtils { } return currentValues; } + + /** + * Helper method to check if iterables are equal. Supports the case + * where the iterable next() returns a reused object. We do assume that + * iterator() produces the objects in the same order across repeated calls, + * if the object doesn't change. This is very expensive (n^2) and should + * be used for testing only. + * + * @param first First iterable + * @param second Second iterable + * @param <T> Type to compare + * @return True if equal, false otherwise + */ + public static <T> boolean isEqual(Iterable<T> first, Iterable<T> second) { + // Relies on elements from the iterator arriving in the same order. + // For every element in first, check elements on the second iterable by + // marking the ones seen that have been found. Then ensure that all + // the elements of the second have been seen as well. 
+ int firstSize = Iterables.size(first); + int secondSize = Iterables.size(second); + boolean[] usedSecondArray = new boolean[secondSize]; + Iterator<T> firstIterator = first.iterator(); + while (firstIterator.hasNext()) { + T firstValue = firstIterator.next(); + boolean foundFirstValue = false; + Iterator<T> secondIterator = second.iterator(); + for (int i = 0; i < usedSecondArray.length; ++i) { + T secondValue = secondIterator.next(); + if (!usedSecondArray[i]) { + if (firstValue.equals(secondValue)) { + usedSecondArray[i] = true; + foundFirstValue = true; + break; + } + } + } + + if (!foundFirstValue) { + LOG.error("isEqual: Couldn't find element from first (" + firstValue + + ") in second " + second + "(size=" + secondSize + ")"); + return false; + } + } + + Iterator<T> secondIterator = second.iterator(); + for (int i = 0; i < usedSecondArray.length; ++i) { + T secondValue = secondIterator.next(); + if (!usedSecondArray[i]) { + LOG.error("isEqual: Element " + secondValue + " (index " + i + + ") in second " + second + "(size=" + secondSize + + ") not found in " + first + " (size=" + firstSize + ")"); + return false; + } + } + + return true; + } } Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java Tue Nov 27 20:01:38 2012 @@ -27,6 +27,20 @@ import java.util.NoSuchElementException; * @param <M> Message data */ public class EmptyIterable<M> implements Iterable<M>, Iterator<M> { + /** Singleton empty iterable */ + private static final EmptyIterable<Object> EMPTY_ITERABLE = + new EmptyIterable<Object>(); + + /** + * Get the 
singleton empty iterable + * + * @param <T> Type of the empty iterable + * @return Empty singleton iterable + */ + public static <T> Iterable<T> emptyIterable() { + return (Iterable<T>) EMPTY_ITERABLE; + } + @Override public Iterator<M> iterator() { return this; Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java Tue Nov 27 20:01:38 2012 @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Arrays; /** * Adds some functionality to ByteArrayOutputStream, @@ -140,6 +141,14 @@ public class ExtendedByteArrayDataOutput } @Override + public void skipBytes(int bytesToSkip) { + if ((count + bytesToSkip) > buf.length) { + buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + bytesToSkip)); + } + count += bytesToSkip; + } + + @Override public void writeInt(int position, int value) { if (position + 4 > count) { throw new IndexOutOfBoundsException( Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java Tue Nov 27 20:01:38 2012 
@@ -24,6 +24,13 @@ import java.io.DataOutput; */ public interface ExtendedDataOutput extends DataOutput { /** + * Skip some number of bytes. + * + * @param bytesToSkip Number of bytes to skip + */ + void skipBytes(int bytesToSkip); + + /** * In order to write a size as a first part of an data output, it is * useful to be able to write an int at an arbitrary location in the stream * Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java?rev=1414361&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java Tue Nov 27 20:01:38 2012 @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.utils; + +import java.util.Iterator; +import org.apache.giraph.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +
/**
 * Iterable whose iterators hand out representative objects: an object
 * returned by an iterator has a lifetime only until next() is called
 * again on that iterator.
 *
 * @param <T> Type that extends Writable that will be iterated
 */
public abstract class RepresentativeByteArrayIterable<T extends Writable>
    extends ByteArrayIterable<T> {
  /**
   * Passes the configuration and the region of the byte array on to
   * ByteArrayIterable.
   *
   * @param configuration Configuration
   * @param buf Buffer
   * @param off Offset to start in the buffer
   * @param length Length of the buffer
   */
  public RepresentativeByteArrayIterable(
      ImmutableClassesGiraphConfiguration configuration,
      byte[] buf, int off, int length) {
    super(configuration, buf, off, length);
  }

  /**
   * Creates a fresh iterator over the internal byte array. The iterator
   * obtains its writable from this iterable's createWritable().
   *
   * @return Iterator over the serialized objects
   */
  @Override
  public Iterator<T> iterator() {
    return new RepresentativeByteArrayIterator<T>(
        configuration, buf, off, length) {
      @Override
      protected T createWritable() {
        // Delegate to the enclosing iterable's factory method
        return RepresentativeByteArrayIterable.this.createWritable();
      }
    };
  }
}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java?rev=1414361&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java Tue Nov 27 20:01:38 2012 @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.giraph.utils; + +import java.io.IOException; +import org.apache.giraph.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +
/**
 * The objects provided by this iterator have lifetimes only until next() is
 * called.  In that sense, the object provided is only a representative
 * object: every call to next() deserializes into the same instance.
 *
 * @param <T> Type that extends Writable that will be iterated
 */
public abstract class RepresentativeByteArrayIterator<T extends
    Writable> extends ByteArrayIterator<T> {
  /**
   * Representative writable.  Created on the first call to next() rather
   * than in a field initializer, so that createWritable() never runs while
   * a subclass is still being constructed.
   */
  private T representativeWritable;

  /**
   * Constructor
   *
   * @param configuration Configuration
   * @param buf buffer to read from
   * @param off Offset into the buffer to start from
   * @param length Length of the buffer
   */
  public RepresentativeByteArrayIterator(
      ImmutableClassesGiraphConfiguration configuration,
      byte[] buf, int off, int length) {
    super(configuration, buf, off, length);
  }

  /**
   * Deserializes the next element into the representative writable.
   *
   * @return The representative writable, now holding the next element
   * @throws IllegalStateException if deserializing the element fails with
   *         an IOException
   */
  @Override
  public T next() {
    if (representativeWritable == null) {
      representativeWritable = createWritable();
    }
    try {
      representativeWritable.readFields(extendedDataInput);
    } catch (IOException e) {
      throw new IllegalStateException("next: readFields got IOException", e);
    }
    return representativeWritable;
  }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java Tue Nov 27 20:01:38 2012 @@ -212,6 +212,14 @@ public class UnsafeByteArrayOutputStream } @Override + public void skipBytes(int bytesToSkip) { + if ((pos + bytesToSkip) > buf.length) { + buf = Arrays.copyOf(buf,
Math.max(buf.length << 1, pos + bytesToSkip)); + } + pos += bytesToSkip; + } + + @Override public void writeInt(int pos, int value) { if (pos + SIZE_OF_INT > this.pos) { throw new IndexOutOfBoundsException( Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Tue Nov 27 20:01:38 2012 @@ -20,7 +20,6 @@ package org.apache.giraph.comm; import com.google.common.collect.Lists; import java.io.IOException; -import java.util.Collection; import org.apache.giraph.GiraphConfiguration; import org.apache.giraph.ImmutableClassesGiraphConfiguration; import org.apache.giraph.comm.netty.NettyClient; @@ -30,7 +29,7 @@ import org.apache.giraph.comm.requests.S import org.apache.giraph.comm.requests.WritableRequest; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.WorkerInfo; -import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection; +import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.MockUtils; import org.apache.giraph.utils.PairList; import org.apache.hadoop.io.IntWritable; @@ -82,14 +81,14 @@ public class RequestFailureTest { private WritableRequest getRequest() { // Data to send final int partitionId = 0; - PairList<Integer, ByteArrayVertexIdMessageCollection<IntWritable, - IntWritable>> + PairList<Integer, ByteArrayVertexIdMessages<IntWritable, + IntWritable>> dataToSend = new PairList<Integer, - ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>>(); + ByteArrayVertexIdMessages<IntWritable, IntWritable>>(); dataToSend.initialize(); - 
ByteArrayVertexIdMessageCollection<IntWritable, - IntWritable> vertexIdMessages = - new ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>(); + ByteArrayVertexIdMessages<IntWritable, + IntWritable> vertexIdMessages = + new ByteArrayVertexIdMessages<IntWritable, IntWritable>(); vertexIdMessages.setConf(conf); vertexIdMessages.initialize(); dataToSend.add(partitionId, vertexIdMessages); @@ -116,7 +115,7 @@ public class RequestFailureTest { int messageSum = 0; for (IntWritable vertexId : vertices) { keySum += vertexId.get(); - Collection<IntWritable> messages = + Iterable<IntWritable> messages = serverData.getIncomingMessageStore().getVertexMessages(vertexId); synchronized (messages) { for (IntWritable message : messages) { Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Nov 27 20:01:38 2012 @@ -21,7 +21,6 @@ package org.apache.giraph.comm; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.io.IOException; -import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -40,7 +39,7 @@ import org.apache.giraph.graph.VertexMut import org.apache.giraph.graph.WorkerInfo; import org.apache.giraph.graph.partition.Partition; import org.apache.giraph.graph.partition.PartitionStore; -import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection; +import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.MockUtils; import org.apache.giraph.utils.PairList; import org.apache.hadoop.io.IntWritable; @@ 
-144,15 +143,15 @@ public class RequestTest { @Test public void sendWorkerMessagesRequest() throws IOException { // Data to send - PairList<Integer, ByteArrayVertexIdMessageCollection<IntWritable, - IntWritable>> + PairList<Integer, ByteArrayVertexIdMessages<IntWritable, + IntWritable>> dataToSend = new PairList<Integer, - ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>>(); + ByteArrayVertexIdMessages<IntWritable, IntWritable>>(); dataToSend.initialize(); int partitionId = 0; - ByteArrayVertexIdMessageCollection<IntWritable, - IntWritable> vertexIdMessages = - new ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>(); + ByteArrayVertexIdMessages<IntWritable, + IntWritable> vertexIdMessages = + new ByteArrayVertexIdMessages<IntWritable, IntWritable>(); vertexIdMessages.setConf(conf); vertexIdMessages.initialize(); dataToSend.add(partitionId, vertexIdMessages); @@ -182,7 +181,7 @@ public class RequestTest { int messageSum = 0; for (IntWritable vertexId : vertices) { keySum += vertexId.get(); - Collection<IntWritable> messages = + Iterable<IntWritable> messages = serverData.getIncomingMessageStore().getVertexMessages(vertexId); synchronized (messages) { for (IntWritable message : messages) { Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java Tue Nov 27 20:01:38 2012 @@ -18,19 +18,38 @@ package org.apache.giraph.comm; -import org.apache.commons.collections.CollectionUtils; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; 
+import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; import org.apache.giraph.GiraphConfiguration; import org.apache.giraph.ImmutableClassesGiraphConfiguration; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.messages.BasicMessageStore; -import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore; -import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition; +import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore; import org.apache.giraph.comm.messages.DiskBackedMessageStore; +import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition; import org.apache.giraph.comm.messages.FlushableMessageStore; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.comm.messages.MessageStoreFactory; import org.apache.giraph.comm.messages.SequentialFileMessageStore; import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.utils.ByteArrayVertexIdMessages; +import org.apache.giraph.utils.CollectionUtils; import org.apache.giraph.utils.MockUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; @@ -42,25 +61,6 @@ import org.junit.Test; import static org.junit.Assert.assertTrue; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import 
java.io.FileOutputStream; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; - /** Test for different types of message stores */ public class TestMessageStores { private static String directory; @@ -69,6 +69,11 @@ public class TestMessageStores { private static CentralizedServiceWorker<IntWritable, IntWritable, IntWritable, IntWritable> service; + /** + * Pseudo-random number generator with the same seed (to help with + * debugging) + */ + private static final Random RANDOM = new Random(101); private static class IntVertex extends EdgeListVertex<IntWritable, IntWritable, IntWritable, IntWritable> { @@ -122,27 +127,68 @@ public class TestMessageStores { SortedMap<IntWritable, Collection<IntWritable>> allMessages = new TreeMap<IntWritable, Collection<IntWritable>>(); for (int v = 0; v < testData.numVertices; v++) { - int messageNum = (int) (Math.random() * testData.maxNumberOfMessages); + int messageNum = + (int) (RANDOM.nextFloat() * testData.maxNumberOfMessages); Collection<IntWritable> vertexMessages = Lists.newArrayList(); for (int m = 0; m < messageNum; m++) { vertexMessages.add( - new IntWritable((int) (Math.random() * testData.maxMessage))); + new IntWritable((int) (RANDOM.nextFloat() * testData.maxMessage))); } IntWritable vertexId = - new IntWritable((int) (Math.random() * testData.maxId)); + new IntWritable((int) (RANDOM.nextFloat() * testData.maxId)); allMessages.put(vertexId, vertexMessages); } return allMessages; } - private void putNTimes - (MessageStore<IntWritable, IntWritable> messageStore, - Map<IntWritable, Collection<IntWritable>> messages, - TestData testData) throws IOException { + /** + * Used for testing only + */ + private static class InputMessageStore extends + ByteArrayMessagesPerVertexStore<IntWritable, IntWritable> { + + /** + * Constructor + * + * @param service Service worker + * @param config
Hadoop configuration + */ + InputMessageStore( + CentralizedServiceWorker<IntWritable, ?, ?, IntWritable> service, + ImmutableClassesGiraphConfiguration<IntWritable, ?, ?, + IntWritable> config, + Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException { + super(service, config); + // Adds all the messages to the store + for (Map.Entry<IntWritable, Collection<IntWritable>> entry : + inputMap.entrySet()) { + int partitionId = getPartitionId(entry.getKey()); + ByteArrayVertexIdMessages<IntWritable, IntWritable> + byteArrayVertexIdMessages = + new ByteArrayVertexIdMessages<IntWritable, IntWritable>(); + byteArrayVertexIdMessages.setConf(config); + byteArrayVertexIdMessages.initialize(); + for (IntWritable message : entry.getValue()) { + byteArrayVertexIdMessages.add(entry.getKey(), message); + } + try { + addPartitionMessages(partitionId, byteArrayVertexIdMessages); + } catch (IOException e) { + throw new IllegalStateException("Got IOException", e); + } + } + } + } + + private void putNTimes( + MessageStore<IntWritable, IntWritable> messageStore, + Map<IntWritable, Collection<IntWritable>> messages, + TestData testData) throws IOException { for (int n = 0; n < testData.numTimes; n++) { SortedMap<IntWritable, Collection<IntWritable>> batch = createRandomMessages(testData); - messageStore.addMessages(batch); + messageStore.addMessages(new InputMessageStore(service, config, + batch)); for (Entry<IntWritable, Collection<IntWritable>> entry : batch.entrySet()) { if (messages.containsKey(entry.getKey())) { @@ -161,12 +207,14 @@ public class TestMessageStores { TreeSet<I> vertexIds = Sets.newTreeSet(); Iterables.addAll(vertexIds, messageStore.getDestinationVertices()); for (I vertexId : vertexIds) { - Collection<M> expected = expectedMessages.get(vertexId); + Iterable<M> expected = expectedMessages.get(vertexId); if (expected == null) { return false; } - Collection<M> actual = messageStore.getVertexMessages(vertexId); - if 
(!CollectionUtils.isEqualCollection(expected, actual)) { + Iterable<M> actual = messageStore.getVertexMessages(vertexId); + if (!CollectionUtils.isEqual(expected, actual)) { + System.err.println("equalMessages: For vertexId " + vertexId + + " expected " + expected + ", but got " + actual); return false; } } @@ -213,10 +261,10 @@ public class TestMessageStores { } @Test - public void testCollectionOfMessagesPeVertexStore() { + public void testByteArrayMessagesPerVertexStore() { try { testMessageStore( - CollectionOfMessagesPerVertexStore.newFactory(service, config), + ByteArrayMessagesPerVertexStore.newFactory(service, config), testData); } catch (IOException e) { e.printStackTrace(); Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1414361&r1=1414360&r2=1414361&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java Tue Nov 27 20:01:38 2012 @@ -22,7 +22,7 @@ import org.apache.giraph.ImmutableClasse import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.WorkerClientRequestProcessor; -import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore; +import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.partition.BasicPartitionOwner; @@ -162,7 +162,7 @@ public class MockUtils { Mapper.Context context) { return new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>( conf, - CollectionOfMessagesPerVertexStore.newFactory( + ByteArrayMessagesPerVertexStore.newFactory( 
MockUtils.mockServiceGetVertexPartitionOwner(1), conf), context); }
