abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/604

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
---
M 
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
M 
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
A 
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
M 
algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M 
hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M 
hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
M 
hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
A 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java.orig
M 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M 
hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
M 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A 
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M 
hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
19 files changed, 608 insertions(+), 73 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/04/604/1

diff --git 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
index b95d279..f4dbee7 100644
--- 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
+++ 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -51,14 +50,21 @@
     private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
 
     private final boolean bulkload;
+    private final boolean feed;
 
     public InsertDeleteOperator(IDataSource<?> dataSource, 
Mutable<ILogicalExpression> payloadExpr,
             List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, 
boolean bulkload) {
+        this(dataSource, payloadExpr, primaryKeyExprs, operation, bulkload, 
false);
+    }
+
+    public InsertDeleteOperator(IDataSource<?> dataSource, 
Mutable<ILogicalExpression> payloadExpr,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, 
boolean bulkload, boolean feed) {
         this.dataSource = dataSource;
         this.payloadExpr = payloadExpr;
         this.primaryKeyExprs = primaryKeyExprs;
         this.operation = operation;
         this.bulkload = bulkload;
+        this.feed = feed;
     }
 
     @Override
@@ -68,7 +74,8 @@
     }
 
     @Override
-    public boolean 
acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) 
throws AlgebricksException {
+    public boolean 
acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
         boolean changed = false;
         changed = transform.transform(payloadExpr);
         for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
@@ -120,7 +127,7 @@
 
     public boolean isBulkload() {
         return bulkload;
-       }
+    }
 
     public void 
setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> 
additionalFilteringExpressions) {
         this.additionalFilteringExpressions = additionalFilteringExpressions;
@@ -130,4 +137,8 @@
         return additionalFilteringExpressions;
     }
 
+    public boolean isFeed() {
+        return feed;
+    }
+
 }
diff --git 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 0ff1e47..0bfc417 100644
--- 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -47,8 +47,8 @@
 
 public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
 
-    private List<LogicalVariable> hashFields;
-    private INodeDomain domain;
+    protected List<LogicalVariable> hashFields;
+    protected INodeDomain domain;
 
     public HashPartitionExchangePOperator(List<LogicalVariable> hashFields, 
INodeDomain domain) {
         this.hashFields = hashFields;
diff --git 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
new file mode 100644
index 0000000..4ddf4a9
--- /dev/null
+++ 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
@@ -0,0 +1,45 @@
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import 
org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import 
org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import 
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
+
+public class HashPartitionExchangeWithMessagePOperator extends 
HashPartitionExchangePOperator {
+
+    public HashPartitionExchangeWithMessagePOperator(List<LogicalVariable> 
hashFields, INodeDomain domain) {
+        super(hashFields, domain);
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, TargetConstraint> 
createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext 
context) throws AlgebricksException {
+        int[] keys = new int[hashFields.size()];
+        IBinaryHashFunctionFactory[] hashFunctionFactories = new 
IBinaryHashFunctionFactory[hashFields.size()];
+        int i = 0;
+        IBinaryHashFunctionFactoryProvider hashFunProvider = 
context.getBinaryHashFunctionFactoryProvider();
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        for (LogicalVariable v : hashFields) {
+            keys[i] = opSchema.findVariable(v);
+            hashFunctionFactories[i] = 
hashFunProvider.getBinaryHashFunctionFactory(env.getVarType(v));
+            ++i;
+        }
+        ITuplePartitionComputerFactory tpcf = new 
FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
+        IConnectorDescriptor conn = new 
MToNPartitioningWithMessageConnectorDescriptor(spec, tpcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+    }
+}
diff --git 
a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
 
b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 8bf1ad5..d105af4 100644
--- 
a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ 
b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -46,6 +46,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
@@ -54,6 +55,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangeWithMessagePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -570,7 +572,11 @@
                         }
                     }
                     if (!propWasSet) {
-                        pop = new HashPartitionExchangePOperator(varList, 
domain);
+                        if (op instanceof InsertDeleteOperator && 
((InsertDeleteOperator) op).isFeed()) {
+                            pop = new 
HashPartitionExchangeWithMessagePOperator(varList, domain);
+                        } else {
+                            pop = new HashPartitionExchangePOperator(varList, 
domain);
+                        }
                     }
                     break;
                 }
diff --git 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
index eb6b888..473f3ae 100644
--- 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
+++ 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
@@ -54,7 +54,7 @@
      * nbytes the actual data.
      * If the tupleLength includes the field slot, please set the fieldCount = 0
      */
-    public static int calcSpaceInFrame(int fieldCount, int tupleLength) {
+    public static int calcRequiredSpace(int fieldCount, int tupleLength) {
         return 4 + fieldCount * 4 + tupleLength;
     }
 
@@ -68,7 +68,7 @@
      */
     public static int calcAlignedFrameSizeToStore(int fieldCount, int 
tupleLength, int minFrameSize) {
         assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0;
-        return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + 
FrameConstants.META_DATA_LEN - 1) / minFrameSize)
+        return (1 + (calcRequiredSpace(fieldCount, tupleLength) + 
FrameConstants.META_DATA_LEN - 1) / minFrameSize)
                 * minFrameSize;
     }
 
diff --git 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index fd1d376..3b48bfd 100644
--- 
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ 
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.context;
 
+import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -41,4 +42,12 @@
     public IDatasetPartitionManager getDatasetPartitionManager();
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId 
deploymendId) throws Exception;
+
+    public void setSharedObject(HashMap<String, Object> sharedObject);
+
+    public HashMap<String, Object> getSharedObject();
+
+    public void setObject(String name, Object object);
+
+    public Object getObject(String name);
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
 
b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index ba71b0c..9e9a960 100644
--- 
a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ 
b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -76,7 +76,7 @@
             }
         }
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Result state cleanup instance successfully 
completed.");
+            //LOGGER.info("Result state cleanup instance successfully 
completed.");
         }
     }
 }
diff --git 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 61baf82..3518601 100644
--- 
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -95,6 +95,8 @@
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
+    private HashMap<String, Object> taskSharedObject;
+
     public Task(Joblet joblet, TaskAttemptId taskId, String displayName, 
ExecutorService executor,
             NodeControllerService ncs, List<List<PartitionChannel>> 
inputChannelsFromConnectors) {
         this.joblet = joblet;
@@ -383,4 +385,27 @@
     public void sendApplicationMessageToCC(byte[] message, DeploymentId 
deploymentId) throws Exception {
         this.ncs.sendApplicationMessageToCC(message, deploymentId);
     }
+
+    @Override
+    public void setObject(String name, Object object) {
+        if (taskSharedObject == null) {
+            taskSharedObject = new HashMap<>();
+        }
+        taskSharedObject.put(name, object);
+    }
+
+    @Override
+    public Object getObject(String name) {
+        return taskSharedObject.get(name);
+    }
+
+    @Override
+    public void setSharedObject(HashMap<String, Object> sharedObject) {
+        this.taskSharedObject = sharedObject;
+    }
+
+    @Override
+    public HashMap<String, Object> getSharedObject() {
+        return taskSharedObject;
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index fd71716..1553605 100644
--- 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -29,6 +29,18 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 
+/*
+ * Frame
+ *  _____________________________________________
+ * |[tuple1][tuple2][tuple3].........            |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]|
+ * |_____________________________________________|
+ */
 public class AbstractFrameAppender implements IFrameAppender {
     protected IFrame frame;
     protected byte[] array; // cached the getBuffer().array to speed up byte 
array access a little
@@ -46,7 +58,7 @@
     }
 
     protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
-        return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, 
tupleLength)
+        return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, 
tupleLength)
                 + tupleCount * FrameConstants.SIZE_LEN <= 
FrameHelper.getTupleCountOffset(frame.getFrameSize());
     }
 
diff --git 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java.orig
 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java.orig
new file mode 100644
index 0000000..4c1626a
--- /dev/null
+++ 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java.orig
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.comm.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.FrameConstants;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/*
+ * Frame
+ *  _____________________________________________
+ * |[tuple1][tuple2][tuple3].........            |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]|
+ * |_____________________________________________|
+ */
+public class AbstractFrameAppender implements IFrameAppender {
+    protected IFrame frame;
+    protected byte[] array; // cached the getBuffer().array to speed up byte 
array access a little
+
+    protected int tupleCount;
+    protected int tupleDataEndOffset;
+
+    @Override
+    public void reset(IFrame frame, boolean clear) throws HyracksDataException 
{
+        this.frame = frame;
+        if (clear) {
+            this.frame.reset();
+        }
+        reset(getBuffer(), clear);
+    }
+
+    protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
+<<<<<<< HEAD
+        return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, 
tupleLength)
+=======
+        return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, 
tupleLength)
+>>>>>>> Add Feed Messages to Frames
+                + tupleCount * FrameConstants.SIZE_LEN <= 
FrameHelper.getTupleCountOffset(frame.getFrameSize());
+    }
+
+    protected void reset(ByteBuffer buffer, boolean clear) {
+        array = buffer.array();
+        if (clear) {
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), 0);
+            tupleCount = 0;
+            tupleDataEndOffset = FrameConstants.TUPLE_START_OFFSET;
+        } else {
+            tupleCount = IntSerDeUtils.getInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()));
+            tupleDataEndOffset = tupleCount == 0 ? 
FrameConstants.TUPLE_START_OFFSET
+                    : IntSerDeUtils.getInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize())
+                            - tupleCount * FrameConstants.SIZE_LEN);
+        }
+    }
+
+    @Override
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return frame.getBuffer();
+    }
+
+    @Override
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws 
HyracksDataException {
+        getBuffer().clear();
+        if (getTupleCount() > 0) {
+            outWriter.nextFrame(getBuffer());
+        }
+        if (clearFrame) {
+            frame.reset();
+            reset(getBuffer(), true);
+        }
+    }
+
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws 
HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength)) {
+            return true;
+        }
+        if (tupleCount == 0) {
+            
frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, 
dataLength, frame.getMinSize()));
+            reset(frame.getBuffer(), true);
+            return true;
+        }
+        return false;
+    }
+
+}
diff --git 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index c9c51d3..136e231 100644
--- 
a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ 
b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -40,6 +40,10 @@
         reset(frame, clear);
     }
 
+    /**
+     * append fieldSlots and bytes to the current frame
+     */
+    @Override
     public boolean append(int[] fieldSlots, byte[] bytes, int offset, int 
length) throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
             for (int i = 0; i < fieldSlots.length; ++i) {
@@ -50,27 +54,28 @@
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(getBuffer().array(), 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(getBuffer().array(), 
FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(byte[] bytes, int offset, int length) throws 
HyracksDataException {
         if (canHoldNewTuple(0, length)) {
             System.arraycopy(bytes, offset, getBuffer().array(), 
tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int 
offset, int length)
             throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
@@ -83,17 +88,16 @@
             }
             System.arraycopy(bytes, offset, array, tupleDataEndOffset + 
effectiveSlots * 4, length);
             tupleDataEndOffset += effectiveSlots * 4 + length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1),
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, 
int tEndOffset)
             throws HyracksDataException {
         int length = tEndOffset - tStartOffset;
@@ -101,25 +105,25 @@
             ByteBuffer src = tupleAccessor.getBuffer();
             System.arraycopy(src.array(), tStartOffset, array, 
tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1),
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) 
throws HyracksDataException {
         int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
         int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
         return append(tupleAccessor, tStartOffset, tEndOffset);
     }
 
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, 
IFrameTupleAccessor accessor1,
-            int tIndex1) throws HyracksDataException {
+    @Override
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, 
IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
@@ -143,22 +147,22 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, 
tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, 
tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, 
tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, 
dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, 
int[] fieldSlots1, byte[] bytes1,
             int offset1, int dataLen1) throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
@@ -176,21 +180,19 @@
             System.arraycopy(src0.array(), startOffset0, array, 
tupleDataEndOffset, slotsLen0);
             // Copy fieldSlots1 with the following transformation: newSlotIdx 
= oldSlotIdx + dataLen0
             for (int i = 0; i < fieldSlots1.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i 
* 4,
-                        (fieldSlots1[i] + dataLen0));
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i 
* 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, 
tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, 
tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy bytes1
-            System.arraycopy(bytes1, offset1, array,
-                    tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + 
dataLen0, dataLen1);
+            System.arraycopy(bytes1, offset1, array, tupleDataEndOffset + 
slotsLen0 + fieldSlots1.length * 4 + dataLen0,
+                    dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
@@ -219,22 +221,21 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy bytes0
-            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + 
slotsLen0 + slotsLen1,
-                    dataLen0);
+            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + 
slotsLen0 + slotsLen1, dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, 
tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, 
dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, 
int[] fields)
             throws HyracksDataException {
         int fTargetSlotsLength = fields.length * 4;
@@ -253,18 +254,17 @@
                 int fSrcStart = tStartOffset + fSrcSlotsLength + 
accessor.getFieldStartOffset(tIndex, fields[i]);
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
-                System.arraycopy(accessor.getBuffer().array(), fSrcStart, 
array, tupleDataEndOffset
-                        + fTargetSlotsLength + fStartOffset, fLen);
+                System.arraycopy(accessor.getBuffer().array(), fSrcStart, 
array,
+                        tupleDataEndOffset + fTargetSlotsLength + 
fStartOffset, fLen);
                 fEndOffset += fLen;
                 IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, 
fEndOffset);
                 fStartOffset = fEndOffset;
             }
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
new file mode 100644
index 0000000..ca916f5
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.io;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+public class MessagingFrameTupleAppender extends FrameTupleAppender {
+    public static final String KEY_MESSAGE = "message";
+    public static final String KEY_LENGTH = "length";
+    public static final String KEY_SLOTS = "slots";
+    public static final int MAX_MESSAGE_SIZE = 100;
+    private IHyracksTaskContext ctx;
+
+    public MessagingFrameTupleAppender(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        this.ctx = ctx;
+    }
+
+    public MessagingFrameTupleAppender(IFrame frame, IHyracksTaskContext ctx) 
throws HyracksDataException {
+        super(frame);
+        this.ctx = ctx;
+    }
+
+    @Override
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws 
HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+            return true;
+        }
+        if (tupleCount == 0) {
+            
frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, 
dataLength + MAX_MESSAGE_SIZE,
+                    frame.getMinSize()));
+            reset(frame.getBuffer(), true);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws 
HyracksDataException {
+        if (getTupleCount() > 0) {
+            appendMessage();
+            getBuffer().clear();
+            outWriter.nextFrame(getBuffer());
+        }
+        if (clearFrame) {
+            frame.reset();
+            reset(getBuffer(), true);
+        }
+    }
+
+    public void appendMessage() {
+        byte[] message = (byte[]) ctx.getObject(KEY_MESSAGE);
+        int length = ((MutableInt) ctx.getObject(KEY_LENGTH)).intValue();
+        int[] slots = (int[]) ctx.getObject(KEY_SLOTS);
+        for (int i = 0; i < slots.length; ++i) {
+            IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, slots[i]);
+        }
+        System.arraycopy(message, 0, array, tupleDataEndOffset + slots.length 
* 4, length);
+        tupleDataEndOffset += slots.length * 4 + length;
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * 
(tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
index b808ac1..b01eb15 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -26,7 +26,14 @@
                 + ((bytes[offset + 3] & 0xff) << 0);
     }
 
+    /**
+     * Writes the integer {@code value} into {@code bytes} starting at {@code offset}, most significant byte first.
+     * @param bytes byte array to write into
+     * @param offset index in {@code bytes} at which the first byte of {@code value} is written
+     * @param value value to write into {@code bytes}, starting at {@code bytes[offset]}
+     */
     public static void putInt(byte[] bytes, int offset, int value) {
+
         bytes[offset++] = (byte) (value >> 24);
         bytes[offset++] = (byte) (value >> 16);
         bytes[offset++] = (byte) (value >> 8);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
index 91c29e2..81ebba3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class NonDeterministicFrameReader implements IFrameReader {
-    private final NonDeterministicChannelReader channelReader;
+    protected final NonDeterministicChannelReader channelReader;
 
     public NonDeterministicFrameReader(NonDeterministicChannelReader 
channelReader) {
         this.channelReader = channelReader;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 22a4c1c..85c26ab 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -35,7 +35,7 @@
 
 public class MToNPartitioningConnectorDescriptor extends 
AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
-    private ITuplePartitionComputerFactory tpcf;
+    protected ITuplePartitionComputerFactory tpcf;
 
     public MToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry 
spec, ITuplePartitionComputerFactory tpcf) {
         super(spec);
@@ -45,15 +45,15 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, 
RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int 
nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
-        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, 
nConsumerPartitions, edwFactory,
-                recordDesc, tpcf.createPartitioner());
+                    throws HyracksDataException {
+        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, 
nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext 
ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) 
throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext 
ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws 
HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
         NonDeterministicChannelReader channelReader = new 
NonDeterministicChannelReader(nProducerPartitions,
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
new file mode 100644
index 0000000..a1909cd
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -0,0 +1,45 @@
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import 
org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
+
+public class MToNPartitioningWithMessageConnectorDescriptor extends 
MToNPartitioningConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public 
MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry 
spec,
+            ITuplePartitionComputerFactory tpcf) {
+        super(spec, tpcf);
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, 
RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int 
nProducerPartitions, int nConsumerPartitions)
+                    throws HyracksDataException {
+        final PartitionWithMessageDataWriter hashWriter = new 
PartitionWithMessageDataWriter(ctx, nConsumerPartitions,
+                edwFactory, recordDesc, tpcf.createPartitioner());
+        return hashWriter;
+    }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext 
ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws 
HyracksDataException {
+        BitSet expectedPartitions = new BitSet(nProducerPartitions);
+        expectedPartitions.set(0, nProducerPartitions);
+        NonDeterministicChannelReader channelReader = new 
NonDeterministicChannelReader(nProducerPartitions,
+                expectedPartitions);
+        NonDeterministicFrameReader frameReader = new 
NonDeterministicFrameReader(channelReader);
+        return new PartitionCollector(ctx, getConnectorId(), index, 
expectedPartitions, frameReader, channelReader);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 646883f..64ca8e7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -33,14 +33,14 @@
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class PartitionDataWriter implements IFrameWriter {
-    private final int consumerPartitionCount;
-    private final IFrameWriter[] pWriters;
-    private final boolean[] isOpen;
-    private final FrameTupleAppender[] appenders;
-    private final FrameTupleAccessor tupleAccessor;
-    private final ITuplePartitionComputer tpc;
-    private final IHyracksTaskContext ctx;
-    private boolean allocatedFrame = false;
+    protected final int consumerPartitionCount;
+    protected final IFrameWriter[] pWriters;
+    protected final boolean[] isOpen;
+    protected final FrameTupleAppender[] appenders;
+    protected final FrameTupleAccessor tupleAccessor;
+    protected final ITuplePartitionComputer tpc;
+    protected final IHyracksTaskContext ctx;
+    protected boolean allocatedFrame = false;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int 
consumerPartitionCount, IPartitionWriterFactory pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) 
throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
new file mode 100644
index 0000000..3d58bf1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
@@ -0,0 +1,148 @@
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+
+public class PartitionWithMessageDataWriter implements IFrameWriter {
+
+    protected final int consumerPartitionCount;
+    protected final IFrameWriter[] pWriters;
+    protected final boolean[] isOpen;
+    protected final MessagingFrameTupleAppender[] appenders;
+    protected final FrameTupleAccessor tupleAccessor;
+    protected final ITuplePartitionComputer tpc;
+    protected final IHyracksTaskContext ctx;
+    protected boolean allocatedFrame = false;
+
+    public PartitionWithMessageDataWriter(IHyracksTaskContext ctx, int 
consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor 
recordDescriptor, ITuplePartitionComputer tpc)
+                    throws HyracksDataException {
+        this.consumerPartitionCount = consumerPartitionCount;
+        pWriters = new IFrameWriter[consumerPartitionCount];
+        isOpen = new boolean[consumerPartitionCount];
+        appenders = new MessagingFrameTupleAppender[consumerPartitionCount];
+        for (int i = 0; i < consumerPartitionCount; ++i) {
+            try {
+                pWriters[i] = pwFactory.createFrameWriter(i);
+                appenders[i] = new MessagingFrameTupleAppender(ctx);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.tpc = tpc;
+        this.ctx = ctx;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
+        for (int i = 0; i < pWriters.length; ++i) {
+            if (isOpen[i]) {
+                if (allocatedFrame) {
+                    try {
+                        appenders[i].write(pWriters[i], true);
+                    } catch (Throwable th) {
+                        if (closeException == null) {
+                            closeException = new HyracksDataException(th);
+                        } else {
+                            closeException.addSuppressed(th);
+                        }
+                    }
+                }
+                try {
+                    pWriters[i].close();
+                } catch (Throwable th) {
+                    if (closeException == null) {
+                        closeException = new HyracksDataException(th);
+                    } else {
+                        closeException.addSuppressed(th);
+                    }
+                }
+            }
+        }
+        if (closeException != null) {
+            throw closeException;
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        allocateFrames();
+        for (int i = 0; i < pWriters.length; ++i) {
+            isOpen[i] = true;
+            pWriters[i].open();
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tupleAccessor.reset(buffer);
+        int tupleCount = tupleAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; ++i) {
+            int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
+            FrameUtils.appendToWriter(pWriters[h], appenders[h], 
tupleAccessor, i);
+        }
+    }
+
+    private void allocateFrames() throws HyracksDataException {
+        for (int i = 0; i < appenders.length; ++i) {
+            appenders[i].reset(new VSizeFrame(ctx), true);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        HyracksDataException failException = null;
+        for (int i = 0; i < appenders.length; ++i) {
+            if (isOpen[i]) {
+                try {
+                    pWriters[i].fail();
+                } catch (Throwable th) {
+                    if (failException == null) {
+                        failException = new HyracksDataException(th);
+                    } else {
+                        failException.addSuppressed(th);
+                    }
+                }
+            }
+        }
+        if (failException != null) {
+            throw failException;
+        }
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        for (int i = 0; i < consumerPartitionCount; i++) {
+            appenders[i].flush(pWriters[i]);
+        }
+    }
+
+    public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender 
frameTupleAppender,
+            IFrameTupleAccessor tupleAccessor, int tIndex) throws 
HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.write(writer, true);
+            if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+                throw new HyracksDataException("The output cannot be fit into 
a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 6d954eb..531ba46 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.test.support;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -39,11 +40,13 @@
     private final TestJobletContext jobletContext;
     private final TaskAttemptId taskId;
     private WorkspaceFileFactory fileFactory;
+    private HashMap<String, Object> map;
 
     public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId 
taskId) {
         this.jobletContext = jobletContext;
         this.taskId = taskId;
         fileFactory = new WorkspaceFileFactory(this, (IOManager) 
getIOManager());
+        map = new HashMap<>();
     }
 
     @Override
@@ -137,4 +140,24 @@
     public ExecutorService getExecutorService() {
         return null;
     }
+
+    @Override
+    public void setObject(String name, Object object) {
+        map.put(name, object);
+    }
+
+    @Override
+    public Object getObject(String name) {
+        return map.get(name);
+    }
+
+    @Override
+    public void setSharedObject(HashMap<String, Object> sharedObject) {
+        this.map = sharedObject;
+    }
+
+    @Override
+    public HashMap<String, Object> getSharedObject() {
+        return map;
+    }
 }
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to