Updated Branches: refs/heads/trunk d31949a11 -> 294b73583
Rest of GIRAPH-701. Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/294b7358 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/294b7358 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/294b7358 Branch: refs/heads/trunk Commit: 294b73583fc0dcd548ae95a841829f9ad2ab3ed0 Parents: d31949a Author: Avery Ching <[email protected]> Authored: Sat Jul 20 14:04:43 2013 -0700 Committer: Avery Ching <[email protected]> Committed: Sat Jul 20 14:04:43 2013 -0700 ---------------------------------------------------------------------- .../giraph/comm/SendMessageToAllCache.java | 308 +++++++++++++++++++ .../SendWorkerOneToAllMessagesRequest.java | 155 ++++++++++ .../giraph/utils/ByteArrayOneToAllMessages.java | 168 ++++++++++ 3 files changed, 631 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/294b7358/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java new file mode 100644 index 0000000..54234c5 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; +import org.apache.giraph.comm.requests.SendWorkerMessagesRequest; +import org.apache.giraph.comm.requests.SendWorkerOneToAllMessagesRequest; +import org.apache.giraph.comm.requests.WritableRequest; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.utils.ByteArrayOneToAllMessages; +import org.apache.giraph.utils.ByteArrayVertexIdMessages; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.PairList; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +/** + * Aggregates the messages to be sent to workers so they can be sent + * in bulk. Not thread-safe. 
+ * + * @param <I> Vertex id + * @param <M> Message data + */ +public class SendMessageToAllCache<I extends WritableComparable, + M extends Writable> extends SendMessageCache<I, M> { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(SendMessageToAllCache.class); + /** Cache serialized messages for each worker */ + private final ByteArrayOneToAllMessages<I, M>[] oneToAllMsgCache; + /** Tracking one-to-all message sizes for each worker */ + private final int[] oneToAllMsgSizes; + /** Reused byte array to serialize target ids on each worker */ + private final ExtendedDataOutput[] idSerializer; + /** Reused int array to count target id distribution */ + private final int[] idCounter; + /** + * Reused int array to record the partition id + * of the first target vertex id found on the worker. + */ + private final int[] firstPartitionMap; + /** The WorkerInfo list */ + private final WorkerInfo[] workerInfoList; + + /** + * Constructor + * + * @param conf Giraph configuration + * @param serviceWorker Service worker + * @param processor NettyWorkerClientRequestProcessor + * @param maxMsgSize Max message size sent to a worker + */ + public SendMessageToAllCache(ImmutableClassesGiraphConfiguration conf, + CentralizedServiceWorker<?, ?, ?> serviceWorker, + NettyWorkerClientRequestProcessor<I, ?, ?> processor, + int maxMsgSize) { + super(conf, serviceWorker, processor, maxMsgSize); + int numWorkers = getNumWorkers(); + oneToAllMsgCache = new ByteArrayOneToAllMessages[numWorkers]; + oneToAllMsgSizes = new int[numWorkers]; + idSerializer = new ExtendedDataOutput[numWorkers]; + // InitialBufferSizes is also initialized based on the number of workers. + // As a result, initialBufferSizes is the same as idSerializer in length + int initialBufferSize = 0; + for (int i = 0; i < this.idSerializer.length; i++) { + initialBufferSize = getSendWorkerInitialBufferSize(i); + if (initialBufferSize > 0) { + // InitialBufferSizes is from super class. 
+ // Each element is for one worker. + idSerializer[i] = conf.createExtendedDataOutput(initialBufferSize); + } + } + idCounter = new int[numWorkers]; + firstPartitionMap = new int[numWorkers]; + // Get worker info list. + workerInfoList = new WorkerInfo[numWorkers]; + // Remember there could be null in the array. + for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) { + workerInfoList[workerInfo.getTaskId()] = workerInfo; + } + } + + /** + * Reset ExtendedDataOutput array for id serialization + * in next "one-to-all" message sending. + */ + private void resetIdSerializers() { + for (int i = 0; i < this.idSerializer.length; i++) { + if (idSerializer[i] != null) { + idSerializer[i].reset(); + } + } + } + + /** + * Reset id counter for next "one-to-all" message sending. + */ + private void resetIdCounter() { + Arrays.fill(idCounter, 0); + } + + /** + * Add message with multiple ids to + * one-to-all message cache. + * + * @param workerInfo The remote worker destination + * @param ids A byte array to hold serialized vertex ids + * @param idPos The end position of ids + * information in the byte array above + * @param count The number of target ids + * @param message Message to send to remote worker + * @return The size of messages for the worker. 
+ */ + private int addOneToAllMessage( + WorkerInfo workerInfo, byte[] ids, int idPos, int count, M message) { + // Get the data collection + ByteArrayOneToAllMessages<I, M> workerData = + oneToAllMsgCache[workerInfo.getTaskId()]; + if (workerData == null) { + workerData = new ByteArrayOneToAllMessages<I, M>( + getConf().getOutgoingMessageValueFactory()); + workerData.setConf(getConf()); + workerData.initialize(getSendWorkerInitialBufferSize( + workerInfo.getTaskId())); + oneToAllMsgCache[workerInfo.getTaskId()] = workerData; + } + workerData.add(ids, idPos, count, message); + // Update the size of cached, outgoing data per worker + oneToAllMsgSizes[workerInfo.getTaskId()] = + workerData.getSize(); + return oneToAllMsgSizes[workerInfo.getTaskId()]; + } + + /** + * Gets the one-to-all + * messages for a worker and removes it from the cache. + * Here the ByteArrayOneToAllMessages returned could be null. + * But when invoking this method, we also check if the data size sent + * to this worker is above the threshold. Therefore, it doesn't matter + * if the result is null or not. + * + * @param workerInfo The target worker where one-to-all messages + * go to. + * @return ByteArrayOneToAllMessages that belong to the workerInfo + */ + private ByteArrayOneToAllMessages<I, M> + removeWorkerOneToAllMessages(WorkerInfo workerInfo) { + ByteArrayOneToAllMessages<I, M> workerData = + oneToAllMsgCache[workerInfo.getTaskId()]; + if (workerData != null) { + oneToAllMsgCache[workerInfo.getTaskId()] = null; + oneToAllMsgSizes[workerInfo.getTaskId()] = 0; + } + return workerData; + } + + /** + * Gets all the one-to-all + * messages and removes them from the cache. 
+ * + * @return All vertex messages for all workers + */ + private PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>> + removeAllOneToAllMessages() { + PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>> allData = + new PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>>(); + allData.initialize(oneToAllMsgCache.length); + for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) { + ByteArrayOneToAllMessages<I, M> workerData = + removeWorkerOneToAllMessages(workerInfo); + if (workerData != null && !workerData.isEmpty()) { + allData.add(workerInfo, workerData); + } + } + return allData; + } + + @Override + public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) { + // This is going to be reused through every message sending + resetIdSerializers(); + resetIdCounter(); + // Count messages + int currentMachineId = 0; + PartitionOwner owner = null; + WorkerInfo workerInfo = null; + I vertexId = null; + while (vertexIdIterator.hasNext()) { + vertexId = vertexIdIterator.next(); + owner = getServiceWorker().getVertexPartitionOwner(vertexId); + workerInfo = owner.getWorkerInfo(); + currentMachineId = workerInfo.getTaskId(); + // Serialize this target vertex id + try { + vertexId.write(idSerializer[currentMachineId]); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to serialize the target vertex id."); + } + idCounter[currentMachineId]++; + // Record the partition id of the first target vertex found on this worker. + // If idCounter shows there is only one target on this worker + // then this is the partition number of the target vertex. 
+ if (idCounter[currentMachineId] == 1) { + firstPartitionMap[currentMachineId] = owner.getPartitionId(); + } + } + // Add the message to the cache + int idSerializerPos = 0; + int workerMessageSize = 0; + byte[] serializedId = null; + WritableRequest writableRequest = null; + for (int i = 0; i < idCounter.length; i++) { + if (idCounter[i] == 1) { + serializedId = idSerializer[i].getByteArray(); + idSerializerPos = idSerializer[i].getPos(); + // Add the message to the cache + workerMessageSize = addMessage(workerInfoList[i], + firstPartitionMap[i], serializedId, idSerializerPos, message); + + if (LOG.isTraceEnabled()) { + LOG.trace("sendMessageToAllRequest: Send bytes (" + + message.toString() + ") to one target in worker " + + workerInfoList[i]); + } + ++totalMsgsSentInSuperstep; + if (workerMessageSize >= maxMessagesSizePerWorker) { + PairList<Integer, ByteArrayVertexIdMessages<I, M>> + workerMessages = removeWorkerMessages(workerInfoList[i]); + writableRequest = + new SendWorkerMessagesRequest<I, M>(workerMessages); + totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); + clientProcessor.doRequest(workerInfoList[i], writableRequest); + // Notify sending + getServiceWorker().getGraphTaskManager().notifySentMessages(); + } + } else if (idCounter[i] > 1) { + serializedId = idSerializer[i].getByteArray(); + idSerializerPos = idSerializer[i].getPos(); + workerMessageSize = addOneToAllMessage( + workerInfoList[i], serializedId, idSerializerPos, idCounter[i], + message); + + if (LOG.isTraceEnabled()) { + LOG.trace("sendMessageToAllRequest: Send bytes (" + + message.toString() + ") to all targets in worker" + + workerInfoList[i]); + } + totalMsgsSentInSuperstep += idCounter[i]; + if (workerMessageSize >= maxMessagesSizePerWorker) { + ByteArrayOneToAllMessages<I, M> workerOneToAllMessages = + removeWorkerOneToAllMessages(workerInfoList[i]); + writableRequest = + new SendWorkerOneToAllMessagesRequest<I, M>( + workerOneToAllMessages, getConf()); + 
totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); + clientProcessor.doRequest(workerInfoList[i], writableRequest); + // Notify sending + getServiceWorker().getGraphTaskManager().notifySentMessages(); + } + } + } + } + + @Override + public void flush() { + super.flush(); + PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>> + remainingOneToAllMessageCache = + removeAllOneToAllMessages(); + PairList<WorkerInfo, + ByteArrayOneToAllMessages<I, M>>.Iterator + oneToAllMsgIterator = remainingOneToAllMessageCache.getIterator(); + while (oneToAllMsgIterator.hasNext()) { + oneToAllMsgIterator.next(); + WritableRequest writableRequest = + new SendWorkerOneToAllMessagesRequest<I, M>( + oneToAllMsgIterator.getCurrentSecond(), getConf()); + totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize(); + clientProcessor.doRequest( + oneToAllMsgIterator.getCurrentFirst(), writableRequest); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/294b7358/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java new file mode 100644 index 0000000..8745adb --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.requests; + +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.ServerData; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.utils.ByteArrayOneToAllMessages; +import org.apache.giraph.utils.ByteArrayVertexIdMessages; +import org.apache.giraph.utils.ExtendedDataInput; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Send a collection of one-to-all messages to a worker. + * + * @param <I> Vertex id + * @param <M> Message data + */ +@SuppressWarnings("unchecked") +public class SendWorkerOneToAllMessagesRequest<I extends WritableComparable, + M extends Writable> extends WritableRequest<I, Writable, Writable> + implements WorkerRequest<I, Writable, Writable> { + /** The byte array of one-to-all messages */ + private ByteArrayOneToAllMessages<I, M> oneToAllMsgs; + + /** + * Constructor used for reflection only. + */ + public SendWorkerOneToAllMessagesRequest() { } + + /** + * Constructor used to send request. 
+ * + * @param oneToAllMsgs A byte array of all one-to-all messages + * @param conf ImmutableClassesGiraphConfiguration + */ + public SendWorkerOneToAllMessagesRequest( + ByteArrayOneToAllMessages<I, M> oneToAllMsgs, + ImmutableClassesGiraphConfiguration conf) { + this.oneToAllMsgs = oneToAllMsgs; + setConf(conf); + } + + @Override + public RequestType getType() { + return RequestType.SEND_WORKER_ONETOALL_MESSAGES_REQUEST; + } + + @Override + public void readFieldsRequest(DataInput input) throws IOException { + oneToAllMsgs = new ByteArrayOneToAllMessages<I, M>( + getConf().<M>getOutgoingMessageValueFactory()); + oneToAllMsgs.setConf(getConf()); + oneToAllMsgs.readFields(input); + } + + @Override + public void writeRequest(DataOutput output) throws IOException { + this.oneToAllMsgs.write(output); + } + + @Override + public int getSerializedSize() { + return super.getSerializedSize() + this.oneToAllMsgs.getSerializedSize(); + } + + @Override + public void doRequest(ServerData serverData) { + CentralizedServiceWorker<I, ?, ?> serviceWorker = + serverData.getServiceWorker(); + // Get the initial size of ByteArrayVertexIdMessages per partition + // on this worker. To make sure every ByteArrayVertexIdMessages has + // enough space to store the messages, we divide the original one-to-all + // message size by the number of partitions and double the size + // (Assume the major component in a one-to-all message is an id list. + // Now each target id has a copy of the message, + // therefore we double the buffer size) + // to get the initial size of ByteArrayVertexIdMessages. + int initialSize = oneToAllMsgs.getSize() / + serverData.getPartitionStore().getNumPartitions() * 2; + // Create ByteArrayVertexIdMessages for + // message reformatting. 
+ Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> + partitionIdMsgs = + new Int2ObjectOpenHashMap<ByteArrayVertexIdMessages>(); + + // Put data from ByteArrayOneToAllMessages to ByteArrayVertexIdMessages + ExtendedDataInput reader = oneToAllMsgs.getOneToAllMessagesReader(); + I vertexId = getConf().createVertexId(); + M msg = oneToAllMsgs.createMessage(); + int idCount = 0; + int partitionId = 0; + try { + while (reader.available() != 0) { + msg.readFields(reader); + idCount = reader.readInt(); + for (int i = 0; i < idCount; i++) { + vertexId.readFields(reader); + PartitionOwner owner = + serviceWorker.getVertexPartitionOwner(vertexId); + partitionId = owner.getPartitionId(); + ByteArrayVertexIdMessages<I, M> idMsgs = + partitionIdMsgs.get(partitionId); + if (idMsgs == null) { + idMsgs = new ByteArrayVertexIdMessages<I, M>( + getConf().<M>getOutgoingMessageValueFactory()); + idMsgs.setConf(getConf()); + idMsgs.initialize(initialSize); + partitionIdMsgs.put(partitionId, idMsgs); + } + idMsgs.add(vertexId, msg); + } + } + } catch (IOException e) { + throw new RuntimeException("doRequest: Got IOException ", e); + } + // Read ByteArrayVertexIdMessages and write to message store + try { + for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs : + partitionIdMsgs.entrySet()) { + if (!idMsgs.getValue().isEmpty()) { + serverData.getIncomingMessageStore().addPartitionMessages( + idMsgs.getKey(), idMsgs.getValue()); + } + } + } catch (IOException e) { + throw new RuntimeException("doRequest: Got IOException.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/294b7358/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java new file mode 100644 index 0000000..8a6fc12 --- /dev/null +++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Stores a message and a list of target vertex ids. + * + * @param <I> Vertex id + * @param <M> Message data + */ +@SuppressWarnings("unchecked") +public class ByteArrayOneToAllMessages< + I extends WritableComparable, M extends Writable> + implements Writable, ImmutableClassesGiraphConfigurable { + /** Extended data output */ + private ExtendedDataOutput extendedDataOutput; + /** Configuration */ + private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration; + /** Message value class */ + private MessageValueFactory<M> messageValueFactory; + + /** + * Constructor. 
+ * + * @param messageValueFactory Class for messages + */ + public ByteArrayOneToAllMessages( + MessageValueFactory<M> messageValueFactory) { + this.messageValueFactory = messageValueFactory; + } + + /** + * Initialize the inner state. Must be called before {@code add()} is called. + */ + public void initialize() { + extendedDataOutput = configuration.createExtendedDataOutput(); + } + + /** + * Initialize the inner state, with a known size. Must be called before + * {@code add()} is called. + * + * @param expectedSize Number of bytes to be expected + */ + public void initialize(int expectedSize) { + extendedDataOutput = configuration.createExtendedDataOutput(expectedSize); + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return this.configuration; + } + + /** + * Add a message. + * The order is: the message>id count>ids . + * + * @param ids The byte array which holds target ids + * of this message on the worker + * @param idPos The end position of the ids + * information in the byte array above. + * @param count The number of ids + * @param msg The message sent + */ + public void add(byte[] ids, int idPos, int count, M msg) { + try { + msg.write(extendedDataOutput); + extendedDataOutput.writeInt(count); + extendedDataOutput.write(ids, 0, idPos); + } catch (IOException e) { + throw new IllegalStateException("add: IOException", e); + } + } + + /** + * Create a message. + * + * @return A created message object. + */ + public M createMessage() { + return messageValueFactory.createMessageValue(); + } + + /** + * Get the number of bytes used. + * + * @return Bytes used + */ + public int getSize() { + return extendedDataOutput.getPos(); + } + + /** + * Get the size of ByteArrayOneToAllMessages after serialization. + * Here 4 is the size of an integer which represents the size of whole + * byte array. 
+ * + * @return The size (in bytes) of the serialized object + */ + public int getSerializedSize() { + return 4 + getSize(); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeInt(extendedDataOutput.getPos()); + dataOutput.write(extendedDataOutput.getByteArray(), 0, + extendedDataOutput.getPos()); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + int size = dataInput.readInt(); + byte[] buf = new byte[size]; + dataInput.readFully(buf); + extendedDataOutput = configuration.createExtendedDataOutput(buf, size); + } + + /** + * Check if the byte array is empty. + * + * @return True if the position of the byte array is 0. + */ + public boolean isEmpty() { + return extendedDataOutput.getPos() == 0; + } + + /** + * Get the reader of this OneToAllMessages + * + * @return ExtendedDataInput + */ + public ExtendedDataInput getOneToAllMessagesReader() { + return configuration.createExtendedDataInput( + extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos()); + } +}
