http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java new file mode 100644 index 0000000..092d963 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.io.DataInputOutput;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ * Each vertex id maps to one {@link DataInputOutput} into which the
+ * serialized messages for that vertex are appended.
+ *
+ * @param <M> Message type
+ */
+public class LongByteArrayMessageStore<M extends Writable>
+  extends LongAbstractMessageStore<M, DataInputOutput> {
+
+  /**
+   * Constructor; all arguments are passed straight to the superclass.
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service Service worker
+   * @param config Hadoop configuration
+   */
+  public LongByteArrayMessageStore(
+      MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+      ImmutableClassesGiraphConfiguration<LongWritable,
+          Writable, Writable> config) {
+    super(messageValueFactory, service, config);
+  }
+
+  /**
+   * Report whether this store uses pointer-list encoding.
+   *
+   * @return always false for this store
+   */
+  @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+
+  /**
+   * Get the DataInputOutput for a vertex id, creating if necessary.
+   * A newly created buffer is put into the partition map before returning.
+   * This method does no locking of its own.
+   *
+   * @param partitionMap Partition map to look in
+   * @param vertexId Id of the vertex
+   * @return DataInputOutput for this vertex id (created if necessary)
+   */
+  private DataInputOutput getDataInputOutput(
+      Long2ObjectOpenHashMap<DataInputOutput> partitionMap, long vertexId) {
+    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+    if (dataInputOutput == null) {
+      dataInputOutput = config.createMessagesInputOutput();
+      partitionMap.put(vertexId, dataInputOutput);
+    }
+    return dataInputOutput;
+  }
+
+  /**
+   * Append all messages in {@code messages} to the per-vertex buffers of the
+   * given partition, creating a buffer for a vertex on first use.
+   * The whole operation runs while holding the lock on that partition's map.
+   *
+   * @param partitionId Id of the partition whose map receives the messages
+   * @param messages Vertex ids and messages to add
+   * @throws IOException declared for the writes into the buffers
+   */
+  @Override
+  public void addPartitionMessages(int partitionId,
+      VertexIdMessages<LongWritable, M> messages) throws IOException {
+    // NOTE(review): the result of map.get(partitionId) is used without a null
+    // check; presumably the superclass pre-populates map for every partition
+    // that can be passed here - confirm.
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
+    synchronized (partitionMap) {
+      VertexIdMessageBytesIterator<LongWritable, M>
+          vertexIdMessageBytesIterator =
+          messages.getVertexIdMessageBytesIterator();
+      // Try to copy the message buffer over rather than
+      // doing a deserialization of a message just to know its size. This
+      // should be more efficient for complex objects where serialization is
+      // expensive. If this type of iterator is not available, fall back to
+      // deserializing/serializing the messages
+      if (vertexIdMessageBytesIterator != null) {
+        while (vertexIdMessageBytesIterator.hasNext()) {
+          vertexIdMessageBytesIterator.next();
+          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+              vertexIdMessageBytesIterator.getCurrentVertexId().get());
+          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
+              dataInputOutput.getDataOutput());
+        }
+      } else {
+        VertexIdMessageIterator<LongWritable, M>
+            iterator = messages.getVertexIdMessageIterator();
+        while (iterator.hasNext()) {
+          iterator.next();
+          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+              iterator.getCurrentVertexId().get());
+          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+              dataInputOutput.getDataOutput());
+        }
+      }
+    }
+  }
+
+  /**
+   * Does nothing in this implementation; the body is empty.
+   */
+  @Override
+  public void finalizeStore() {
+  }
+
+  /**
+   * Get the messages stored for a vertex.
+   *
+   * @param vertexId Id of the vertex
+   * @return An empty iterable when the vertex has no buffer in its partition
+   *         map, otherwise a MessagesIterable over the vertex's buffer
+   * @throws IOException declared by the signature; not thrown directly here
+   */
+  @Override
+  public Iterable<M> getVertexMessages(
+      LongWritable vertexId) throws IOException {
+    DataInputOutput dataInputOutput =
+        getPartitionMap(vertexId).get(vertexId.get());
+    if (dataInputOutput == null) {
+      return EmptyIterable.get();
+    } else {
+      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
+    }
+  }
+
+  /**
+   * Write one partition's messages: first the number of vertices as an int,
+   * then for every vertex its id as a long followed by its buffer.
+   *
+   * @param out Output to write to
+   * @param partitionId Id of the partition to write
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+    throws IOException {
+    // NOTE(review): unlike addPartitionMessages, this does not lock the
+    // partition map; presumably callers ensure there are no concurrent
+    // writers while a partition is written - confirm.
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+        map.get(partitionId);
+    out.writeInt(partitionMap.size());
+    ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
+        partitionMap.long2ObjectEntrySet().fastIterator();
+    while (iterator.hasNext()) {
+      Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
+      out.writeLong(entry.getLongKey());
+      entry.getValue().write(out);
+    }
+  }
+
+  /**
+   * Read one partition's messages in the layout produced by
+   * {@link #writePartition(DataOutput, int)} (int count, then id/buffer
+   * pairs) into a fresh map, and store that map under {@code partitionId}
+   * while holding the lock on {@code map}.
+   *
+   * @param in Input to read from
+   * @param partitionId Id of the partition being read
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    int size = in.readInt();
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+        new Long2ObjectOpenHashMap<DataInputOutput>(size);
+    while (size-- > 0) {
+      long vertexId = in.readLong();
+      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+      dataInputOutput.readFields(in);
+      partitionMap.put(vertexId, dataInputOutput);
+    }
+    synchronized (map) {
+      map.put(partitionId, partitionMap);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java new file mode 100644 index 0000000..32296ad --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PointerListMessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
+
+/**
+ * This stores messages in
+ * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
+ * and stores long pointers that point to serialized messages.
+ * Every vertex id maps to a list of such pointers; a pointer packs the
+ * buffer index into the upper 32 bits and the position inside that buffer
+ * into the lower bits (see {@link #addPartitionMessages}).
+ *
+ * @param <M> message type
+ */
+public class LongPointerListMessageStore<M extends Writable>
+  extends LongAbstractListMessageStore<M, LongArrayList>
+  implements MessageStore<LongWritable, M> {
+
+  /** Buffers of byte array outputs used to store messages - thread safe */
+  private final ExtendedByteArrayOutputBuffer bytesBuffer;
+
+  /**
+   * Constructor; also creates the byte buffer that holds the serialized
+   * messages.
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service Service worker
+   * @param config Hadoop configuration
+   */
+  public LongPointerListMessageStore(
+    MessageValueFactory<M> messageValueFactory,
+    CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+    ImmutableClassesGiraphConfiguration<LongWritable,
+    Writable, Writable> config) {
+    super(messageValueFactory, service, config);
+    bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
+  }
+
+  /**
+   * Report whether this store uses pointer-list encoding.
+   *
+   * @return always true for this store
+   */
+  @Override
+  public boolean isPointerListEncoding() {
+    return true;
+  }
+
+  /**
+   * Create the per-vertex container of message pointers.
+   *
+   * @return a new, empty LongArrayList
+   */
+  @Override
+  protected LongArrayList createList() {
+    return new LongArrayList();
+  }
+
+  /**
+   * Add messages to the store. A message is serialized into the byte buffer
+   * only when the iterator reports it as new; for every (vertex id, message)
+   * pair the pointer of the most recently serialized message is appended to
+   * that vertex's list, so consecutive target ids of the same message share
+   * one serialized copy.
+   *
+   * @param partitionId Partition id; not read by this implementation
+   * @param messages Vertex ids and messages to add
+   * @throws IOException if serializing a message fails
+   */
+  @Override
+  public void addPartitionMessages(int partitionId,
+    VertexIdMessages<LongWritable, M> messages) throws IOException {
+    VertexIdMessageIterator<LongWritable, M> iterator =
+        messages.getVertexIdMessageIterator();
+    // Kept across iterations on purpose: reused while isNewMessage() is false
+    long pointer = 0;
+    LongArrayList list;
+    while (iterator.hasNext()) {
+      iterator.next();
+      M msg = iterator.getCurrentMessage();
+      list = getList(iterator);
+      if (iterator.isNewMessage()) {
+        IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+        // Upper 32 bits: buffer index
+        pointer = indexAndDataOut.getIndex();
+        pointer <<= 32;
+        ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+        // Lower bits: position in the buffer, taken before the write so the
+        // pointer refers to the start of the serialized message
+        pointer += dataOutput.getPos();
+        msg.write(dataOutput);
+      }
+      synchronized (list) { // TODO - any better way?
+        list.add(pointer);
+      }
+    }
+  }
+
+  /**
+   * Get the messages stored for a vertex.
+   *
+   * @param vertexId Id of the vertex
+   * @return An empty iterable when the vertex has no pointer list, otherwise
+   *         a PointerListMessagesIterable over its pointers and the buffer
+   * @throws IOException declared by the signature; not thrown directly here
+   */
+  @Override
+  public Iterable<M> getVertexMessages(
+    LongWritable vertexId) throws IOException {
+    LongArrayList list = getPartitionMap(vertexId).get(
+        vertexId.get());
+    if (list == null) {
+      return EmptyIterable.get();
+    } else {
+      return new PointerListMessagesIterable<>(messageValueFactory,
+        list, bytesBuffer);
+    }
+  }
+
+  // FIXME -- complete these for check-pointing
+  // Until then writePartition writes nothing and readFieldsForPartition
+  // reads nothing: both bodies below are empty.
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+    throws IOException {
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in, int partitionId)
+    throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java
new file mode 100644
index 0000000..121d1db
--- /dev/null
+++
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Message store based off of primitives when I = LongWritable + */ +package org.apache.giraph.comm.messages.primitives.long_id; http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java index ef3f824..f762f46 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java @@ -22,9 +22,9 @@ import com.yammer.metrics.core.Gauge; import com.yammer.metrics.util.PercentGauge; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.SendOneMessageToManyCache; import 
org.apache.giraph.comm.SendEdgeCache; import org.apache.giraph.comm.SendMessageCache; -import org.apache.giraph.comm.SendMessageToAllCache; import org.apache.giraph.comm.SendMutationsCache; import org.apache.giraph.comm.SendPartitionCache; import org.apache.giraph.comm.ServerData; @@ -134,9 +134,9 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf); maxVerticesSizePerWorker = GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf); - if (this.configuration.isOneToAllMsgSendingEnabled()) { + if (this.configuration.useOneMessageToManyIdsEncoding()) { sendMessageCache = - new SendMessageToAllCache<I, Writable>(conf, serviceWorker, + new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker, this, maxMessagesSizePerWorker); } else { sendMessageCache = @@ -395,7 +395,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, @Override public void flush() throws IOException { // Execute the remaining sends messages (if any) - // including one-to-one and one-to-all messages. + // including individual and compact messages. 
sendMessageCache.flush(); // Execute the remaining sends vertices (if any) http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java index 22ecc0e..81c892d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java @@ -122,7 +122,7 @@ public class NettyWorkerServer<I extends WritableComparable, @Override public void prepareSuperstep() { - serverData.prepareSuperstep(); + serverData.prepareSuperstep(); // updates the current message-store resolveMutations(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java index 408295c..c7561ee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java @@ -38,9 +38,9 @@ public enum RequestType { SEND_WORKER_VERTICES_REQUEST(SendWorkerVerticesRequest.class), /** Sending a partition of messages for next superstep */ SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class), - /** Sending one-to-all messages to a worker for next superstep */ - SEND_WORKER_ONETOALL_MESSAGES_REQUEST( - SendWorkerOneToAllMessagesRequest.class), + /** Sending one message to many ids in a single request */ + SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST( + SendWorkerOneMessageToManyRequest.class), /** * 
Sending a partition of messages for current superstep * (used during partition exchange) http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java new file mode 100644 index 0000000..798ddfa --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.giraph.comm.requests;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Send a collection of ByteArrayOneMessageToManyIds messages to a worker.
+ * On the receiving side the data is either handed to the message store
+ * unchanged (pointer-list stores) or regrouped per partition first.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+@SuppressWarnings("unchecked")
+public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
+    M extends Writable> extends WritableRequest<I, Writable, Writable>
+    implements WorkerRequest<I, Writable, Writable> {
+  /** ByteArrayOneMessageToManyIds encoding of vertexId & messages */
+  protected ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds;
+
+  /**
+   * Constructor used for reflection only.
+   */
+  public SendWorkerOneMessageToManyRequest() { }
+
+  /**
+   * Constructor used to send request.
+   *
+   * @param oneMessageToManyIds ByteArrayOneMessageToManyIds
+   * @param conf ImmutableClassesGiraphConfiguration
+   */
+  public SendWorkerOneMessageToManyRequest(
+    ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds,
+    ImmutableClassesGiraphConfiguration conf) {
+    this.oneMessageToManyIds = oneMessageToManyIds;
+    setConf(conf);
+  }
+
+  /**
+   * Get the type of this request.
+   *
+   * @return RequestType.SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST
+   */
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST;
+  }
+
+  /**
+   * Deserialize the request payload into a newly created
+   * ByteArrayOneMessageToManyIds configured with this request's conf.
+   *
+   * @param input Input to read from
+   * @throws IOException if reading from {@code input} fails
+   */
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    oneMessageToManyIds = new ByteArrayOneMessageToManyIds<I, M>(
+      getConf().<M>getOutgoingMessageValueFactory());
+    oneMessageToManyIds.setConf(getConf());
+    oneMessageToManyIds.readFields(input);
+  }
+
+  /**
+   * Serialize the request payload.
+   *
+   * @param output Output to write to
+   * @throws IOException if writing to {@code output} fails
+   */
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    this.oneMessageToManyIds.write(output);
+  }
+
+  /**
+   * Get the serialized size of the request.
+   *
+   * @return the superclass's serialized size plus that of the payload
+   */
+  @Override
+  public int getSerializedSize() {
+    return super.getSerializedSize() +
+      this.oneMessageToManyIds.getSerializedSize();
+  }
+
+  /**
+   * Deliver the received messages to the incoming message store.
+   * Pointer-list stores receive the payload unchanged; for all other stores
+   * the payload is expanded into one ByteArrayVertexIdMessages per partition
+   * and each non-empty one is added separately.
+   *
+   * @param serverData Server data giving access to the incoming message
+   *                   store, the service worker and the partition store
+   * @throws IllegalStateException wrapping any IOException raised while
+   *                               adding the messages
+   */
+  @Override
+  public void doRequest(ServerData serverData) {
+    try {
+      MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
+      if (messageStore.isPointerListEncoding()) {
+        // if message store is pointer list based then send data as is
+        // -1 is passed as partition id because the payload is not split per
+        // partition here; presumably pointer-list stores ignore the
+        // partition id argument - confirm
+        messageStore.addPartitionMessages(-1, oneMessageToManyIds);
+      } else { // else split the data per partition and send individually
+        CentralizedServiceWorker<I, ?, ?> serviceWorker =
+            serverData.getServiceWorker();
+        // Get the initial size of ByteArrayVertexIdMessages per partition
+        // on this worker. To make sure every ByteArrayVertexIdMessages has
+        // enough space to store the messages, we divide the original
+        // ByteArrayOneMessageToManyIds message size by the number of partitions
+        // and double the size
+        // (Assume the major component in ByteArrayOneMessageToManyIds message
+        // is an id list. Now each target id has a copy of message,
+        // therefore we double the buffer size)
+        // to get the initial size of ByteArrayVertexIdMessages.
+        // Evaluated left to right with int arithmetic: (size / partitions) * 2.
+        // NOTE(review): a partition count of 0 would make this division throw
+        // ArithmeticException; presumably a worker always owns at least one
+        // partition when a request arrives - confirm.
+        int initialSize = oneMessageToManyIds.getSize() /
+            serverData.getPartitionStore().getNumPartitions() * 2;
+        // Create ByteArrayVertexIdMessages for
+        // message reformatting.
+        Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs =
+            new Int2ObjectOpenHashMap<>();
+
+        // Put data from ByteArrayOneMessageToManyIds
+        // to ByteArrayVertexIdMessages
+        VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+            oneMessageToManyIds.getVertexIdMessageIterator();
+        while (vertexIdMessageIterator.hasNext()) {
+          vertexIdMessageIterator.next();
+          M msg = vertexIdMessageIterator.getCurrentMessage();
+          I vertexId = vertexIdMessageIterator.getCurrentVertexId();
+          PartitionOwner owner =
+              serviceWorker.getVertexPartitionOwner(vertexId);
+          int partitionId = owner.getPartitionId();
+          ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs
+              .get(partitionId);
+          // First message for this partition: create and size its container
+          if (idMsgs == null) {
+            idMsgs = new ByteArrayVertexIdMessages<>(
+                getConf().<M>getOutgoingMessageValueFactory());
+            idMsgs.setConf(getConf());
+            idMsgs.initialize(initialSize);
+            partitionIdMsgs.put(partitionId, idMsgs);
+          }
+          idMsgs.add(vertexId, msg);
+        }
+
+        // Read ByteArrayVertexIdMessages and write to message store
+        for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
+            partitionIdMsgs.entrySet()) {
+          if (!idMsgs.getValue().isEmpty()) {
+            serverData.getIncomingMessageStore().addPartitionMessages(
+                idMsgs.getKey(), idMsgs.getValue());
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: Got IOException ", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
----------------------------------------------------------------------
diff --git
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java deleted file mode 100644 index 5f1ed53..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.comm.requests; - -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Map.Entry; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.ServerData; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.utils.ByteArrayOneToAllMessages; -import org.apache.giraph.utils.ByteArrayVertexIdMessages; -import org.apache.giraph.utils.ExtendedDataInput; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Send a collection of one-to-all messages to a worker. 
- * - * @param <I> Vertex id - * @param <M> Message data - */ -@SuppressWarnings("unchecked") -public class SendWorkerOneToAllMessagesRequest<I extends WritableComparable, - M extends Writable> extends WritableRequest<I, Writable, Writable> - implements WorkerRequest<I, Writable, Writable> { - /** The byte array of one-to-all messages */ - private ByteArrayOneToAllMessages<I, M> oneToAllMsgs; - - /** - * Constructor used for reflection only. - */ - public SendWorkerOneToAllMessagesRequest() { } - - /** - * Constructor used to send request. - * - * @param oneToAllMsgs A byte array of all one-to-all messages - * @param conf ImmutableClassesGiraphConfiguration - */ - public SendWorkerOneToAllMessagesRequest( - ByteArrayOneToAllMessages<I, M> oneToAllMsgs, - ImmutableClassesGiraphConfiguration conf) { - this.oneToAllMsgs = oneToAllMsgs; - setConf(conf); - } - - @Override - public RequestType getType() { - return RequestType.SEND_WORKER_ONETOALL_MESSAGES_REQUEST; - } - - @Override - public void readFieldsRequest(DataInput input) throws IOException { - oneToAllMsgs = new ByteArrayOneToAllMessages<I, M>( - getConf().<M>getOutgoingMessageValueFactory()); - oneToAllMsgs.setConf(getConf()); - oneToAllMsgs.readFields(input); - } - - @Override - public void writeRequest(DataOutput output) throws IOException { - this.oneToAllMsgs.write(output); - } - - @Override - public int getSerializedSize() { - return super.getSerializedSize() + this.oneToAllMsgs.getSerializedSize(); - } - - @Override - public void doRequest(ServerData serverData) { - CentralizedServiceWorker<I, ?, ?> serviceWorker = - serverData.getServiceWorker(); - // Get the initial size of ByteArrayVertexIdMessages per partition - // on this worker. To make sure every ByteArrayVertexIdMessages to have - // enough space to store the messages, we divide the original one-to-all - // message size by the number of partitions and double the size - // (Assume the major component in one-to-all message is a id list. 
- // Now each target id has a copy of message, - // therefore we double the buffer size) - // to get the initial size of ByteArrayVertexIdMessages. - int initialSize = oneToAllMsgs.getSize() / - serverData.getPartitionStore().getNumPartitions() * 2; - // Create ByteArrayVertexIdMessages for - // message reformatting. - Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> - partitionIdMsgs = - new Int2ObjectOpenHashMap<ByteArrayVertexIdMessages>(); - - // Put data from ByteArrayOneToAllMessages to ByteArrayVertexIdMessages - ExtendedDataInput reader = oneToAllMsgs.getOneToAllMessagesReader(); - I vertexId = getConf().createVertexId(); - M msg = oneToAllMsgs.createMessage(); - int idCount = 0; - int partitionId = 0; - try { - while (!reader.endOfInput()) { - msg.readFields(reader); - idCount = reader.readInt(); - for (int i = 0; i < idCount; i++) { - vertexId.readFields(reader); - PartitionOwner owner = - serviceWorker.getVertexPartitionOwner(vertexId); - partitionId = owner.getPartitionId(); - ByteArrayVertexIdMessages<I, M> idMsgs = - partitionIdMsgs.get(partitionId); - if (idMsgs == null) { - idMsgs = new ByteArrayVertexIdMessages<I, M>( - getConf().<M>getOutgoingMessageValueFactory()); - idMsgs.setConf(getConf()); - idMsgs.initialize(initialSize); - partitionIdMsgs.put(partitionId, idMsgs); - } - idMsgs.add(vertexId, msg); - } - } - } catch (IOException e) { - throw new RuntimeException("doRequest: Got IOException ", e); - } - // Read ByteArrayVertexIdMessages and write to message store - try { - for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs : - partitionIdMsgs.entrySet()) { - if (!idMsgs.getValue().isEmpty()) { - serverData.getIncomingMessageStore().addPartitionMessages( - idMsgs.getKey(), idMsgs.getValue()); - } - } - } catch (IOException e) { - throw new RuntimeException("doRequest: Got IOException.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index ee88b04..953f49f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -1176,23 +1176,18 @@ public class GiraphConfiguration extends Configuration } /** - * Enable communication optimization for one-to-all messages. - */ - public void enableOneToAllMsgSending() { - ONE_TO_ALL_MSG_SENDING.set(this, true); - } - - /** - * Return if one-to-all messsage sending is enabled. + * Return if oneMessageToManyIds encoding can be enabled * - * @return True if this option is enabled. + * @return True if this option is true. */ - public boolean isOneToAllMsgSendingEnabled() { - return ONE_TO_ALL_MSG_SENDING.isTrue(this); + public boolean useOneMessageToManyIdsEncoding() { + return MESSAGE_ENCODE_AND_STORE_TYPE.get(this) + .useOneMessageToManyIdsEncoding(); } /** * Get option whether to create a source vertex present only in edge input + * * @return CREATE_EDGE_SOURCE_VERTICES option */ public boolean getCreateSourceVertex() { http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 1879a25..ab0570f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -21,6 +21,7 @@ import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.aggregators.TextAggregatorWriter; import 
org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; import org.apache.giraph.comm.messages.MessageStoreFactory; import org.apache.giraph.edge.ByteArrayEdges; import org.apache.giraph.edge.EdgeStoreFactory; @@ -556,12 +557,6 @@ public interface GiraphConstants { new IntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB, "How big to make the encoder buffer?"); - /** Whether or not netty request encoder should use direct byte buffers */ - BooleanConfOption NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS = - new BooleanConfOption("giraph.nettyRequestEncoderUseDirectBuffers", - false, "Whether or not netty request encoder " + - "should use direct byte buffers"); - /** Netty client threads */ IntConfOption NETTY_CLIENT_THREADS = new IntConfOption("giraph.nettyClientThreads", 4, "Netty client threads"); @@ -1054,13 +1049,14 @@ public interface GiraphConstants { "edges every time."); /** - * This option will enable communication optimization for one-to-all - * message sending. For multiple target ids on the same machine, - * we only send one message to all the targets. 
+ * This option will tell which message encode & store enum to use when + * combining is not enabled */ - BooleanConfOption ONE_TO_ALL_MSG_SENDING = - new BooleanConfOption("giraph.oneToAllMsgSending", false, "Enable " + - "one-to-all message sending strategy"); + EnumConfOption<MessageEncodeAndStoreType> MESSAGE_ENCODE_AND_STORE_TYPE = + EnumConfOption.create("giraph.messageEncodeAndStoreType", + MessageEncodeAndStoreType.class, + MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION, + "Select the message_encode_and_store_type to use"); /** * This option can be used to specify if a source vertex present in edge http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java new file mode 100644 index 0000000..674b0b0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.utils; + +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.IOException; + +/** + * Stores a message and a list of target vertex ids. + * + * @param <I> Vertex id + * @param <M> Message data + */ +@SuppressWarnings("unchecked") +public class ByteArrayOneMessageToManyIds<I extends WritableComparable, + M extends Writable> extends ByteArrayVertexIdData<I, M> + implements VertexIdMessages<I, M> { + /** Message value class */ + private MessageValueFactory<M> messageValueFactory; + + /** + * Constructor. + * + * @param messageValueFactory Class for messages + */ + public ByteArrayOneMessageToManyIds( + MessageValueFactory<M> messageValueFactory) { + this.messageValueFactory = messageValueFactory; + } + + @Override + public M createData() { + return messageValueFactory.newInstance(); + } + + @Override + public void writeData(ExtendedDataOutput out, M message) throws IOException { + message.write(out); + } + + @Override + public void readData(ExtendedDataInput in, M message) throws IOException { + message.readFields(in); + } + + /** + * Add a message. + * The order is: the message>id count>ids . + * + * @param ids The byte array which holds target ids + * of this message on the worker + * @param idPos The end position of the ids + * information in the byte array above. 
+ * @param count The number of ids + * @param msg The message sent + */ + public void add(byte[] ids, int idPos, int count, M msg) { + try { + msg.write(extendedDataOutput); + extendedDataOutput.writeInt(count); + extendedDataOutput.write(ids, 0, idPos); + } catch (IOException e) { + throw new IllegalStateException("add: IOException", e); + } + } + + @Override + public void add(I vertexId, M data) { + throw new UnsupportedOperationException(); + } + + @Override + public void add(byte[] serializedId, int idPos, M data) { + throw new UnsupportedOperationException(); + } + + @Override + public VertexIdMessageBytesIterator<I, M> getVertexIdMessageBytesIterator() { + return null; + } + + @Override + public VertexIdMessageIterator<I, M> getVertexIdMessageIterator() { + return new OneMessageToManyIdsIterator<>(this); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java deleted file mode 100644 index f190c17..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.utils; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.factories.MessageValueFactory; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * Stores a message and a list of target vertex ids. - * - * @param <I> Vertex id - * @param <M> Message data - */ -@SuppressWarnings("unchecked") -public class ByteArrayOneToAllMessages< - I extends WritableComparable, M extends Writable> - implements Writable, ImmutableClassesGiraphConfigurable { - /** Extended data output */ - private ExtendedDataOutput extendedDataOutput; - /** Configuration */ - private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration; - /** Message value class */ - private MessageValueFactory<M> messageValueFactory; - - /** - * Constructor. - * - * @param messageValueFactory Class for messages - */ - public ByteArrayOneToAllMessages( - MessageValueFactory<M> messageValueFactory) { - this.messageValueFactory = messageValueFactory; - } - - /** - * Initialize the inner state. Must be called before {@code add()} is called. - */ - public void initialize() { - extendedDataOutput = configuration.createExtendedDataOutput(); - } - - /** - * Initialize the inner state, with a known size. Must be called before - * {@code add()} is called. 
- * - * @param expectedSize Number of bytes to be expected - */ - public void initialize(int expectedSize) { - extendedDataOutput = configuration.createExtendedDataOutput(expectedSize); - } - - @Override - public void setConf(ImmutableClassesGiraphConfiguration configuration) { - this.configuration = configuration; - } - - @Override - public ImmutableClassesGiraphConfiguration getConf() { - return this.configuration; - } - - /** - * Add a message. - * The order is: the message>id count>ids . - * - * @param ids The byte array which holds target ids - * of this message on the worker - * @param idPos The end position of the ids - * information in the byte array above. - * @param count The number of ids - * @param msg The message sent - */ - public void add(byte[] ids, int idPos, int count, M msg) { - try { - msg.write(extendedDataOutput); - extendedDataOutput.writeInt(count); - extendedDataOutput.write(ids, 0, idPos); - } catch (IOException e) { - throw new IllegalStateException("add: IOException", e); - } - } - - /** - * Create a message. - * - * @return A created message object. - */ - public M createMessage() { - return messageValueFactory.newInstance(); - } - - /** - * Get the number of bytes used. - * - * @return Bytes used - */ - public int getSize() { - return extendedDataOutput.getPos(); - } - - /** - * Get the size of ByteArrayOneToAllMessages after serialization. - * Here 4 is the size of an integer which represents the size of whole - * byte array. 
- * - * @return The size (in bytes) of the serialized object - */ - public int getSerializedSize() { - return 4 + getSize(); - } - - @Override - public void write(DataOutput dataOutput) throws IOException { - dataOutput.writeInt(extendedDataOutput.getPos()); - dataOutput.write(extendedDataOutput.getByteArray(), 0, - extendedDataOutput.getPos()); - } - - @Override - public void readFields(DataInput dataInput) throws IOException { - int size = dataInput.readInt(); - byte[] buf = new byte[size]; - dataInput.readFully(buf); - extendedDataOutput = configuration.createExtendedDataOutput(buf, size); - } - - /** - * Check if the byte array is empty. - * - * @return True if the position of the byte array is 0. - */ - public boolean isEmpty() { - return extendedDataOutput.getPos() == 0; - } - - /** - * Get the reader of this OneToAllMessages - * - * @return ExtendedDataInput - */ - public ExtendedDataInput getOneToAllMessagesReader() { - return configuration.createExtendedDataInput( - extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java index cefec0e..962bc75 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java @@ -39,6 +39,8 @@ public class ByteStructVertexIdDataIterator<I extends WritableComparable, T> extends ByteStructVertexIdIterator<I> implements VertexIdDataIterator<I, T> { /** VertexIdData to iterate over */ protected AbstractVertexIdData<I, T> vertexIdData; + /** Serialized size of the message object in bytestore 
*/ + protected int dataSize; /** Current data. */ private T data; @@ -63,13 +65,20 @@ public class ByteStructVertexIdDataIterator<I extends WritableComparable, T> } try { vertexId.readFields(extendedDataInput); + int initial = extendedDataInput.getPos(); vertexIdData.readData(extendedDataInput, data); + dataSize = extendedDataInput.getPos() - initial; } catch (IOException e) { throw new IllegalStateException("next: IOException", e); } } @Override + public int getCurrentDataSize() { + return dataSize; + } + + @Override public T getCurrentData() { return data; } http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java index b686211..dd91ea2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java @@ -46,4 +46,14 @@ public class ByteStructVertexIdMessageIterator<I extends WritableComparable, public M getCurrentMessage() { return getCurrentData(); } + + @Override + public int getCurrentMessageSize() { + return getCurrentDataSize(); + } + + @Override + public boolean isNewMessage() { + return true; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java new file mode 100644 index 0000000..80c3aee --- /dev/null +++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Wraps a list of byte array outputs and provides convenient + * utilities on top of it + */ +public class ExtendedByteArrayOutputBuffer { + /** + * This option sets the capacity of an + * {@link org.apache.giraph.utils.ExtendedDataOutput} instance created in + * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer} + */ + public static final IntConfOption CAPACITY_OF_DATAOUT_IN_BUFFER = + new IntConfOption("giraph.capacityOfDataOutInBuffer", + 1024 * GiraphConstants.ONE_KB, + "Set the capacity of dataoutputs in dataout buffer"); + + /** + * This option sets the maximum fraction of a + * {@link org.apache.giraph.utils.ExtendedDataOutput} instance (stored in + * {@link 
org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}) + * that can be filled + */ + public static final FloatConfOption FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER = + new FloatConfOption("giraph.fillingThresholdOfDataoutInBuffer", 0.98f, + "Set the maximum fraction of dataoutput capacity allowed to fill"); + + /** Maximum size allowed for one byte array output */ + private final int maxBufSize; + /** Stop writing to buffer after threshold has been reached */ + private final int threshold; + /** Giraph configuration */ + private final ImmutableClassesGiraphConfiguration<?, ? , ?> config; + + /** Map of index => byte array outputs */ + private final Int2ObjectOpenHashMap<ExtendedDataOutput> + bytearrayOutputs = new Int2ObjectOpenHashMap<>(); + /** Size of byte array outputs map */ + private final AtomicInteger mapSize = new AtomicInteger(0); + /** Thread local variable to get hold of a byte array output stream */ + private final ThreadLocal<IndexAndDataOut> threadLocal = + new ThreadLocal<IndexAndDataOut>() { + @Override + protected IndexAndDataOut initialValue() { + return newIndexAndDataOutput(); + } + }; + + /** + * Constructor + * + * @param config configuration + */ + public ExtendedByteArrayOutputBuffer( + ImmutableClassesGiraphConfiguration<?, ?, ?> config) { + this.config = config; + + maxBufSize = CAPACITY_OF_DATAOUT_IN_BUFFER.get(config); + threshold = (int) (FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER.get(config) * + maxBufSize); + } + + /** + * Return threadLocal indexAndDataOutput instance + * + * @return threadLocal indexAndDataOutput instance + */ + public IndexAndDataOut getIndexAndDataOut() { + IndexAndDataOut indexAndDataOut = threadLocal.get(); + if (indexAndDataOut.dataOutput.getPos() >= threshold) { + indexAndDataOut = newIndexAndDataOutput(); + threadLocal.set(indexAndDataOut); + } + return indexAndDataOut; + } + + /** + * Get dataoutput from bytearrayOutputs + * + * @param index index in bytearrayOutputs + * @return extendeddataoutput at given index + 
*/ + public ExtendedDataOutput getDataOutput(int index) { + return bytearrayOutputs.get(index); + } + + /** + * Holder for index & DataOutput objects + */ + public static class IndexAndDataOut { + /** Index */ + private final int index; + /** Dataouput instance */ + private final ExtendedDataOutput dataOutput; + + /** + * Constructor + * + * @param index index in bytearrayOutputs + * @param dataOutput dataoutput + */ + public IndexAndDataOut(int index, ExtendedDataOutput dataOutput) { + this.index = index; + this.dataOutput = dataOutput; + } + + public int getIndex() { + return index; + } + + public ExtendedDataOutput getDataOutput() { + return dataOutput; + } + } + + /** + * Create a new IndexAndDataOutput instance + * @return new IndexAndDataOutput instance + */ + private IndexAndDataOut newIndexAndDataOutput() { + int index = mapSize.getAndIncrement(); + ExtendedDataOutput output = config.createExtendedDataOutput( + maxBufSize); + synchronized (bytearrayOutputs) { + bytearrayOutputs.put(index, output); + } + return new IndexAndDataOut(index, output); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java index bc979af..0da9681 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java @@ -82,4 +82,3 @@ public interface ExtendedDataOutput extends DataOutput { */ void reset(); } - http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java new file mode 100644 index 0000000..f353b2d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.utils; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.IOException; + +/** + * VertexIdData iterator for + * {@link ByteArrayOneMessageToManyIds} + * + * @param <I> vertexId type + * @param <M> message type + */ +public class OneMessageToManyIdsIterator<I extends WritableComparable, + M extends Writable> implements VertexIdMessageIterator<I, M> { + /** VertexIdMessages object to iterate over */ + private final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages; + /** Reader of the serialized edges */ + private final ExtendedDataInput extendedDataInput; + + /** Current vertex Id*/ + private I vertexId; + /** Current message */ + private M msg; + /** Counts of ids left to read before next message */ + private int idsToRead = 0; + /** Size of message read */ + private int msgSize = 0; + /** Is current message newly read */ + private boolean newMessage; + + /** + * Constructor + * + * @param vertexIdMessages vertexId messages object to iterate over + */ + public OneMessageToManyIdsIterator( + final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages) { + this.vertexIdMessages = vertexIdMessages; + this.extendedDataInput = vertexIdMessages.getConf() + .createExtendedDataInput(vertexIdMessages.extendedDataOutput); + } + + @Override + public I getCurrentVertexId() { + return vertexId; + } + + @Override + public M getCurrentMessage() { + return getCurrentData(); + } + + @Override + public M getCurrentData() { + return msg; + } + + @Override + public M releaseCurrentData() { + M releasedData = msg; + msg = null; + return releasedData; + } + + @Override + public I releaseCurrentVertexId() { + I releasedVertexId = vertexId; + vertexId = null; + return releasedVertexId; + } + + @Override + public boolean hasNext() { + return extendedDataInput.available() > 0; + } + + /** + * Properly initialize vertexId & msg object before calling next() + */ + private void initialize() { + if 
(vertexId == null) { + vertexId = vertexIdMessages.getConf().createVertexId(); + } + if (msg == null) { + msg = vertexIdMessages.createData(); + } + } + + @Override + public void next() { + initialize(); + try { + if (idsToRead == 0) { + newMessage = true; // a new message is read + int initial = extendedDataInput.getPos(); + msg.readFields(extendedDataInput); + msgSize = extendedDataInput.getPos() - initial; + idsToRead = extendedDataInput.readInt(); + } else { + newMessage = false; // same as previous message + } + vertexId.readFields(extendedDataInput); + idsToRead -= 1; + } catch (IOException e) { + throw new IllegalStateException("next: IOException", e); + } + } + + @Override + public int getCurrentMessageSize() { + return getCurrentDataSize(); + } + + @Override + public int getCurrentDataSize() { + return msgSize; + } + + @Override + public boolean isNewMessage() { + return newMessage; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java index 1ab8de6..c5587e1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java @@ -55,7 +55,7 @@ public class UnsafeArrayReads extends UnsafeReads { UNSAFE.arrayBaseOffset(byte[].class); /** Byte buffer */ - private final byte[] buf; + protected byte[] buf; /** * Constructor http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java index 5f99846..39ab352 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java @@ -28,7 +28,7 @@ import java.io.UTFDataFormatException; public abstract class UnsafeReads implements ExtendedDataInput { /** Buffer length */ - protected final int bufLength; + protected int bufLength; /** Position in the buffer */ protected long pos = 0; http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java new file mode 100644 index 0000000..a75815a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.utils; + +/** + * UnsafeReusableByteArrayInput is a data structure to read from a + * byte buffer with a read pointer that can be moved to desired location + */ +public class UnsafeReusableByteArrayInput extends UnsafeArrayReads { + + /** + * Default Constructor + */ + public UnsafeReusableByteArrayInput() { + super(null, 0, 0); + } + + /** + * Initialize the object with all required parameters + * + * @param buf byte buffer + * @param offset offset in the buffer + * @param length length of the valid data + */ + public void initialize(byte[] buf, int offset, int length) { + this.buf = buf; + this.pos = offset; + this.bufLength = length; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java index 6aea8ea..80792a5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java @@ -41,6 +41,13 @@ public interface VertexIdDataIterator<I extends WritableComparable, T> T getCurrentData(); /** + * Get serialized size of current data + * + * @return serialized size of data + */ + int getCurrentDataSize(); + + /** * Release the current data object. 
* * @return Released data object http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java index c241cea..288f7ce 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java @@ -36,4 +36,18 @@ public interface VertexIdMessageIterator<I extends WritableComparable, * @return Current message */ M getCurrentMessage(); + + /** + * Get the serialized size of current message + * + * @return serialized size of current message + */ + int getCurrentMessageSize(); + + /** + * Return true of current message is new + * + * @return true if current message is new + */ + boolean isNewMessage(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index 8037db9..b56bab3 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -24,7 +24,7 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler; import org.apache.giraph.comm.requests.SendPartitionMutationsRequest; import org.apache.giraph.comm.requests.SendVertexRequest; import org.apache.giraph.comm.requests.SendWorkerMessagesRequest; -import org.apache.giraph.comm.requests.SendWorkerOneToAllMessagesRequest; +import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest; import 
org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -36,8 +36,8 @@ import org.apache.giraph.graph.VertexMutations; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStore; +import org.apache.giraph.utils.ByteArrayOneMessageToManyIds; import org.apache.giraph.utils.VertexIdMessages; -import org.apache.giraph.utils.ByteArrayOneToAllMessages; import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.IntNoOpComputation; @@ -196,10 +196,10 @@ public class RequestTest { } @Test - public void sendWorkerOneToAllMessagesRequest() throws IOException { + public void sendWorkerIndividualMessagesRequest() throws IOException { // Data to send - ByteArrayOneToAllMessages<IntWritable, IntWritable> - dataToSend = new ByteArrayOneToAllMessages<>(new + ByteArrayOneMessageToManyIds<IntWritable, IntWritable> + dataToSend = new ByteArrayOneMessageToManyIds<>(new TestMessageValueFactory<>(IntWritable.class)); dataToSend.setConf(conf); dataToSend.initialize(); @@ -211,8 +211,8 @@ public class RequestTest { dataToSend.add(output.getByteArray(), output.getPos(), 7, new IntWritable(1)); // Send the request - SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable> request = - new SendWorkerOneToAllMessagesRequest<>(dataToSend, conf); + SendWorkerOneMessageToManyRequest<IntWritable, IntWritable> request = + new SendWorkerOneMessageToManyRequest<>(dataToSend, conf); client.sendWritableRequest(workerInfo.getTaskId(), request); client.waitAllRequests(); http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java index d3c392e..5903eb8 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java @@ -25,7 +25,7 @@ import junit.framework.Assert; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.DoubleSumMessageCombiner; -import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore; +import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
