abdullah alamoudi has submitted this change and it was merged. Change subject: Support Sending Messages Alongside Frame Data ......................................................................
Support Sending Messages Alongside Frame Data This change supports sending messages alongside records. The tuple appender reserves 100 bytes for a message. Before sending the frame, it appends the message as the last tuple in the frame. The message is read from the task context's shared object, which is shared between the operators in the pipeline. The first use of this feature will be within feeds, to request acks for at-least-once semantics. Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28 Reviewed-on: https://asterix-gerrit.ics.uci.edu/604 Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java A 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java M hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java 19 files changed, 289 insertions(+), 78 deletions(-) Approvals: Michael Blow: Looks good to me, but someone else must approve Murtadha Hubail: Looks good to me, approved Jenkins: Verified diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java index eb6b888..473f3ae 100644 --- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java +++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java @@ -54,7 +54,7 @@ * nbytes the actual data. 
* If the tupleLength includes the field slot, please set the fieldCount = 0 */ - public static int calcSpaceInFrame(int fieldCount, int tupleLength) { + public static int calcRequiredSpace(int fieldCount, int tupleLength) { return 4 + fieldCount * 4 + tupleLength; } @@ -68,7 +68,7 @@ */ public static int calcAlignedFrameSizeToStore(int fieldCount, int tupleLength, int minFrameSize) { assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0; - return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize) + return (1 + (calcRequiredSpace(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize) * minFrameSize; } diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java index fd1d376..274d446 100644 --- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java +++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java @@ -41,4 +41,8 @@ public IDatasetPartitionManager getDatasetPartitionManager(); public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception; + + public void setSharedObject(Object sharedObject); + + public Object getSharedObject(); } \ No newline at end of file diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java index 81a0290..339eb9d 100644 --- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java +++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java @@ -21,9 +21,6 @@ import java.io.Serializable; import java.util.BitSet; -import org.json.JSONException; -import org.json.JSONObject; - import 
org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.IPartitionCollector; @@ -33,6 +30,8 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.ActivityCluster; +import org.json.JSONException; +import org.json.JSONObject; /** * Connector that connects operators in a Job. @@ -40,6 +39,7 @@ * @author vinayakb */ public interface IConnectorDescriptor extends Serializable { + /** * Gets the id of the connector. * @@ -68,7 +68,7 @@ */ public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException; + throws HyracksDataException; /** * Factory metod to create the receive side reader that reads data from this diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 61baf82..e99fea8 100644 --- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -95,6 +95,8 @@ private List<List<PartitionChannel>> inputChannelsFromConnectors; + private Object sharedObject; + public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor, NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) { this.joblet = joblet; @@ -383,4 +385,14 @@ public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception { this.ncs.sendApplicationMessageToCC(message, deploymentId); } + + @Override + public void setSharedObject(Object sharedObject) { + this.sharedObject 
= sharedObject; + } + + @Override + public Object getSharedObject() { + return sharedObject; + } } \ No newline at end of file diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java index fd71716..1553605 100644 --- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java +++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java @@ -29,6 +29,18 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.util.IntSerDeUtils; +/* + * Frame + * _____________________________________________ + * |[tuple1][tuple2][tuple3]......... | + * | . | + * | . | + * | . | + * | . | + * | . | + * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]| + * |_____________________________________________| + */ public class AbstractFrameAppender implements IFrameAppender { protected IFrame frame; protected byte[] array; // cached the getBuffer().array to speed up byte array access a little @@ -46,7 +58,7 @@ } protected boolean hasEnoughSpace(int fieldCount, int tupleLength) { - return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength) + return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, tupleLength) + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize()); } diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java index c9c51d3..136e231 100644 --- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java +++ 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java @@ -40,6 +40,10 @@ reset(frame, clear); } + /** + * append fieldSlots and bytes to the current frame + */ + @Override public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException { if (canHoldNewTuple(fieldSlots.length, length)) { for (int i = 0; i < fieldSlots.length; ++i) { @@ -50,27 +54,28 @@ IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); ++tupleCount; - IntSerDeUtils - .putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), + tupleCount); return true; } return false; } + @Override public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException { if (canHoldNewTuple(0, length)) { System.arraycopy(bytes, offset, getBuffer().array(), tupleDataEndOffset, length); tupleDataEndOffset += length; - IntSerDeUtils.putInt(array, - FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), + tupleDataEndOffset); ++tupleCount; - IntSerDeUtils - .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); return true; } return false; } + @Override public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException { if (canHoldNewTuple(fieldSlots.length, length)) { @@ -83,17 +88,16 @@ } System.arraycopy(bytes, offset, array, tupleDataEndOffset + effectiveSlots * 4, length); tupleDataEndOffset += effectiveSlots * 4 + length; - 
IntSerDeUtils.putInt(array, - FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); ++tupleCount; - IntSerDeUtils - .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); return true; } return false; } + @Override public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) throws HyracksDataException { int length = tEndOffset - tStartOffset; @@ -101,25 +105,25 @@ ByteBuffer src = tupleAccessor.getBuffer(); System.arraycopy(src.array(), tStartOffset, array, tupleDataEndOffset, length); tupleDataEndOffset += length; - IntSerDeUtils.putInt(array, - FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); ++tupleCount; - IntSerDeUtils - .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); return true; } return false; } + @Override public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException { int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex); int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex); return append(tupleAccessor, tStartOffset, tEndOffset); } - public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, - int tIndex1) throws HyracksDataException { + @Override + public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) + throws HyracksDataException { int startOffset0 = accessor0.getTupleStartOffset(tIndex0); int endOffset0 = 
accessor0.getTupleEndOffset(tIndex0); int length0 = endOffset0 - startOffset0; @@ -143,22 +147,22 @@ src1.getInt(startOffset1 + i * 4) + dataLen0); } // Copy data0 - System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 - + slotsLen1, dataLen0); + System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, + dataLen0); // Copy data1 - System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0 - + slotsLen1 + dataLen0, dataLen1); + System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, + tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1); tupleDataEndOffset += (length0 + length1); - IntSerDeUtils.putInt(array, - FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), + tupleDataEndOffset); ++tupleCount; - IntSerDeUtils - .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); return true; } return false; } + @Override public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1, int dataLen1) throws HyracksDataException { int startOffset0 = accessor0.getTupleStartOffset(tIndex0); @@ -176,21 +180,19 @@ System.arraycopy(src0.array(), startOffset0, array, tupleDataEndOffset, slotsLen0); // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0 for (int i = 0; i < fieldSlots1.length; ++i) { - IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4, - (fieldSlots1[i] + dataLen0)); + IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0)); } // Copy data0 - System.arraycopy(src0.array(), startOffset0 + slotsLen0, 
array, tupleDataEndOffset + slotsLen0 - + slotsLen1, dataLen0); + System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, + dataLen0); // Copy bytes1 - System.arraycopy(bytes1, offset1, array, - tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0, dataLen1); + System.arraycopy(bytes1, offset1, array, tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0, + dataLen1); tupleDataEndOffset += (length0 + length1); - IntSerDeUtils.putInt(array, - FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), + tupleDataEndOffset); ++tupleCount; - IntSerDeUtils - .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); return true; } return false; @@ -219,22 +221,21 @@ src1.getInt(startOffset1 + i * 4) + dataLen0); } // Copy bytes0 - System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, - dataLen0); + System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, dataLen0); // Copy data1 - System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0 - + slotsLen1 + dataLen0, dataLen1); + System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, + tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1); tupleDataEndOffset += (length0 + length1); - IntSerDeUtils.putInt(array, - FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), + tupleDataEndOffset); ++tupleCount; - IntSerDeUtils - .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + 
IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); return true; } return false; } + @Override public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) throws HyracksDataException { int fTargetSlotsLength = fields.length * 4; @@ -253,18 +254,17 @@ int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]); int fLen = accessor.getFieldEndOffset(tIndex, fields[i]) - accessor.getFieldStartOffset(tIndex, fields[i]); - System.arraycopy(accessor.getBuffer().array(), fSrcStart, array, tupleDataEndOffset - + fTargetSlotsLength + fStartOffset, fLen); + System.arraycopy(accessor.getBuffer().array(), fSrcStart, array, + tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen); fEndOffset += fLen; IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fEndOffset); fStartOffset = fEndOffset; } tupleDataEndOffset += length; - IntSerDeUtils.putInt(array, - FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), + tupleDataEndOffset); ++tupleCount; - IntSerDeUtils - .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); return true; } return false; diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java new file mode 100644 index 0000000..7100c11 --- /dev/null +++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.common.io; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.util.IntSerDeUtils; + +public class MessagingFrameTupleAppender extends FrameTupleAppender { + + public static final int MAX_MESSAGE_SIZE = 100; + private final IHyracksTaskContext ctx; + + public MessagingFrameTupleAppender(IHyracksTaskContext ctx) { + this.ctx = ctx; + } + + @Override + protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException { + if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) { + return true; + } + if (tupleCount == 0) { + frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE, + frame.getMinSize())); + reset(frame.getBuffer(), true); + return true; + } + return false; + } + + @Override + public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException { + if (tupleCount > 0) { + appendMessage(); + 
getBuffer().clear(); + outWriter.nextFrame(getBuffer()); + if (clearFrame) { + frame.reset(); + reset(getBuffer(), true); + } + } + } + + public void appendMessage() { + ByteBuffer message = (ByteBuffer) ctx.getSharedObject(); + System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit()); + tupleDataEndOffset += message.limit(); + IntSerDeUtils.putInt(getBuffer().array(), + FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset); + ++tupleCount; + IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); + } +} diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java index b808ac1..d2100bb 100644 --- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java +++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java @@ -26,7 +26,14 @@ + ((bytes[offset + 3] & 0xff) << 0); } + /** + * put integer value into the array bytes at the offset offset + * @param bytes byte array to put data in + * @param offset offset from the beginning of the array to write the {@code value} in + * @param value value to write to {@code bytes[offset]} + */ public static void putInt(byte[] bytes, int offset, int value) { + bytes[offset++] = (byte) (value >> 24); bytes[offset++] = (byte) (value >> 16); bytes[offset++] = (byte) (value >> 8); diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java index cf4808f..7d97507 100644 --- 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java @@ -44,7 +44,7 @@ } @Override - public boolean allProducersToAllConsumers(){ + public boolean allProducersToAllConsumers() { return true; } } diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java index 7856d6a..44d77ac 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java @@ -41,8 +41,8 @@ private ITuplePartitionComputerFactory tpcf; - public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf, - ILocalityMap localityMap) { + public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, + ITuplePartitionComputerFactory tpcf, ILocalityMap localityMap) { super(spec); this.localityMap = localityMap; this.tpcf = tpcf; @@ -60,7 +60,7 @@ @Override public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException { + throws HyracksDataException { return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(), nConsumerPartitions, localityMap, index); } diff --git 
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java index 22a4c1c..d5e4e20 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java @@ -35,7 +35,7 @@ public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor { private static final long serialVersionUID = 1L; - private ITuplePartitionComputerFactory tpcf; + protected ITuplePartitionComputerFactory tpcf; public MToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf) { super(spec); @@ -45,15 +45,13 @@ @Override public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException { - final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, - recordDesc, tpcf.createPartitioner()); - return hashWriter; + throws HyracksDataException { + return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner()); } @Override - public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, - int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { + public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index, + int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { BitSet expectedPartitions = new BitSet(nProducerPartitions); expectedPartitions.set(0, 
nProducerPartitions); NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions, @@ -61,4 +59,8 @@ NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader); return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader); } + + public ITuplePartitionComputerFactory getTuplePartitionComputerFactory() { + return tpcf; + } } \ No newline at end of file diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java new file mode 100644 index 0000000..e90d8b0 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.connectors; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.IPartitionWriterFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; + +public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitioningConnectorDescriptor { + + private static final long serialVersionUID = 1L; + + public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec, + ITuplePartitionComputerFactory tpcf) { + super(spec, tpcf); + } + + @Override + public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, + IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) + throws HyracksDataException { + return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, + tpcf.createPartitioner()); + } +} diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java index cfa0cf9..dde29c1 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java @@ -48,13 +48,13 @@ @Override public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException { + throws 
HyracksDataException { return edwFactory.createFrameWriter(index); } @Override - public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, - int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { + public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index, + int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { BitSet expectedPartitions = new BitSet(nProducerPartitions); expectedPartitions.set(index); NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions, @@ -69,8 +69,8 @@ OperatorDescriptorId consumer = ac.getConsumerActivity(getConnectorId()).getOperatorDescriptorId(); OperatorDescriptorId producer = ac.getProducerActivity(getConnectorId()).getOperatorDescriptorId(); - constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer), - new PartitionCountExpression(producer))); + constraintAcceptor.addConstraint( + new Constraint(new PartitionCountExpression(consumer), new PartitionCountExpression(producer))); } @Override diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java index 646883f..f84c3e4 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java @@ -51,7 +51,7 @@ for (int i = 0; i < consumerPartitionCount; ++i) { try { pWriters[i] = pwFactory.createFrameWriter(i); - appenders[i] = new FrameTupleAppender(); + appenders[i] = createTupleAppender(ctx); } catch (IOException e) { throw new HyracksDataException(e); } @@ -61,6 +61,10 @@ this.ctx = ctx; } + protected 
FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) { + return new FrameTupleAppender(); + } + @Override public void close() throws HyracksDataException { HyracksDataException closeException = null; diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java new file mode 100644 index 0000000..4055fb0 --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.connectors; + +import org.apache.hyracks.api.comm.IPartitionWriterFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; + +public class PartitionWithMessageDataWriter extends PartitionDataWriter { + + public PartitionWithMessageDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, + IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) + throws HyracksDataException { + super(ctx, consumerPartitionCount, pwFactory, recordDescriptor, tpc); + } + + @Override + protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) { + return new MessagingFrameTupleAppender(ctx); + } +} diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java index 0960927..d121ec4 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java @@ -26,7 +26,7 @@ import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import 
org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -52,7 +52,7 @@ } @Override - public ITupleParser createTupleParser(final IHyracksCommonContext ctx) { + public ITupleParser createTupleParser(final IHyracksTaskContext ctx) { return new ITupleParser() { @Override public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException { diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java index daf5104..f495b75 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java @@ -20,9 +20,9 @@ import java.io.Serializable; -import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface ITupleParserFactory extends Serializable { - public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException; + public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException; } diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java index eb2714f..4558cf9 100644 --- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java +++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java @@ -27,7 +27,7 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; -import 
org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -41,7 +41,7 @@ private static final long serialVersionUID = 1L; @Override - public ITupleParser createTupleParser(final IHyracksCommonContext ctx) { + public ITupleParser createTupleParser(final IHyracksTaskContext ctx) { return new ITupleParser() { @Override public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException { diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java index 6d954eb..9848ffb 100644 --- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java +++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java @@ -137,4 +137,13 @@ public ExecutorService getExecutorService() { return null; } + + @Override + public Object getSharedObject() { + return null; + } + + @Override + public void setSharedObject(Object sharedObject) { + } } \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/604 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28 Gerrit-PatchSet: 8 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Steven Jacobs <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: 
abdullah alamoudi <[email protected]>
