Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/589

Change subject: Pass task partition to IPushRuntime
......................................................................

Pass task partition to IPushRuntime

This change includes the following:
- Pass task partition to IPushRuntime
This is needed to get partition-specific info during runtime.
- Pass resource partition id to IModificationOperationCallback
This is needed to include the partition id in txn logs.

Change-Id: Iddd566ea97512c1dbd2217befdc2bb8822e77763
---
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M 
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
M 
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
M 
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
M 
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
M 
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
27 files changed, 58 insertions(+), 42 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/89/589/1

diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index 60fb517..ddeba74 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -25,5 +25,5 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IPushRuntimeFactory extends Serializable {
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
AlgebricksException, HyracksDataException;
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int 
partition) throws AlgebricksException, HyracksDataException;
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index bafe8a7..fe9abf6 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -60,7 +60,8 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            int partition)
             throws AlgebricksException {
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
 
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index fefc72e..e873864 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -54,8 +54,7 @@
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] keys, int[] 
partialKeys) throws HyracksDataException {
 
-        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, 
keyFieldIdx.length,
-                decorFieldIdx.length);
+        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, 
keyFieldIdx.length, decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new 
NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
             try {
@@ -132,8 +131,8 @@
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state) throws 
HyracksDataException {
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, 
IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be 
called");
             }
 
@@ -152,7 +151,7 @@
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx);
+            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx, -1);
             newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 
1]);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 2ca933a..cae6309 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -146,7 +146,7 @@
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx);
+            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx, -1);
             newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 
1]);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 9fea189..eeef166 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -35,11 +35,11 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
AlgebricksException, HyracksDataException {
-        return createOneOutputPushRuntime(ctx);
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int 
partition) throws AlgebricksException, HyracksDataException {
+        return createOneOutputPushRuntime(ctx, partition);
     }
 
-    public abstract AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx)
+    public abstract AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx, int partition)
             throws AlgebricksException, HyracksDataException;
 
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index 69876b0..ee349ff 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -39,7 +39,7 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
AlgebricksException {
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int 
partition) throws AlgebricksException {
         return new AbstractOneInputSinkPushRuntime() {
 
             @Override
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index b7a73d4..1700932 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -58,7 +58,7 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+    public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx, int partition)
             throws AlgebricksException {
         final IBinaryComparator[] comparators = new 
IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 8322cdc..717cd8b 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -101,7 +101,7 @@
                 PipelineAssembler pa = new PipelineAssembler(pipeline, 
inputArity, outputArity, null,
                         pipelineOutputRecordDescriptor);
                 try {
-                    startOfPipeline = pa.assemblePipeline(writer, ctx);
+                    startOfPipeline = pa.assemblePipeline(writer, ctx, 
partition);
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);
                 }
@@ -135,7 +135,7 @@
                     PipelineAssembler pa = new PipelineAssembler(pipeline, 
inputArity, outputArity,
                             pipelineInputRecordDescriptor, 
pipelineOutputRecordDescriptor);
                     try {
-                        startOfPipeline = pa.assemblePipeline(writer, ctx);
+                        startOfPipeline = pa.assemblePipeline(writer, ctx, 
partition);
                     } catch (AlgebricksException ae) {
                         throw new HyracksDataException(ae);
                     }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index cd83c94..04a733e 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -45,12 +45,12 @@
         this.outputArity = outputArity;
     }
 
-    public IFrameWriter assemblePipeline(IFrameWriter writer, 
IHyracksTaskContext ctx) throws AlgebricksException,
+    public IFrameWriter assemblePipeline(IFrameWriter writer, 
IHyracksTaskContext ctx, int partition) throws AlgebricksException,
             HyracksDataException {
         // plug the operators
         IFrameWriter start = writer;// this.writer;
         for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
+            IPushRuntime newRuntime = 
pipeline.getRuntimeFactories()[i].createPushRuntime(ctx, partition);
             if (i == pipeline.getRuntimeFactories().length - 1) {
                 if (outputArity == 1) {
                     newRuntime.setFrameWriter(0, start, 
pipelineOutputRecordDescriptor);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 25eb229..897e8c0 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -69,7 +69,7 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+    public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx, int partition)
             throws AlgebricksException, HyracksDataException {
 
         RecordDescriptor pipelineOutputRecordDescriptor = null;
@@ -141,7 +141,8 @@
 
             IFrameWriter endPipe = new TupleOuterProduct();
 
-            NestedTupleSourceRuntime startOfPipeline = 
(NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
+            NestedTupleSourceRuntime startOfPipeline = 
(NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx,
+                    partition);
 
             boolean first = true;
 
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index bca301f..0436058 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -55,7 +55,7 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+    public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx, int partition)
             throws AlgebricksException {
 
         return new AbstractOneInputOneOutputPushRuntime() {
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 79d9b7d..662f29f 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -84,7 +84,8 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            int partition)
             throws AlgebricksException {
         final int[] projectionToOutColumns = new int[projectionList.length];
         for (int j = 0; j < projectionList.length; j++) {
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index a2b9652..57b870f 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -40,7 +40,7 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) 
throws HyracksDataException {
+    public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx, int 
partition) throws HyracksDataException {
         return new AbstractOneInputSourcePushRuntime() {
 
             private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5fcb9ef..b520626 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -39,7 +39,7 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int 
partition) throws HyracksDataException {
         return new NestedTupleSourceRuntime(ctx);
     }
 
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
index b77f7b8..87649c5 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -56,7 +56,7 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int 
partition) {
         IAWriter w = 
PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, 
printerFactories,
                 inputRecordDesc);
         return new SinkWriterRuntime(w, ctx, System.out, inputRecordDesc);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 8a5f38c..447e479 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -78,8 +78,8 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
-            throws AlgebricksException {
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            int partition) throws AlgebricksException {
         final int[] projectionToOutColumns = new int[projectionList.length];
         for (int j = 0; j < projectionList.length; j++) {
             projectionToOutColumns[j] = Arrays.binarySearch(outColumns, 
projectionList[j]);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
index 3f66e23..e5c97a6 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -67,7 +67,7 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
AlgebricksException {
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int 
partition) throws AlgebricksException {
         PrintStream filePrintStream = null;
         try {
             filePrintStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outputFile)));
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index ef172c7..ed93bd8 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -60,7 +60,8 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            int partition) {
         final IBinaryIntegerInspector bii = 
binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
             private final IPointable p = 
VoidPointable.FACTORY.createPointable();
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 2cea90d..57bfe7c 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -47,8 +47,8 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
-            throws AlgebricksException {
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            int partition) throws AlgebricksException {
 
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
 
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 75c3d08..69fe889 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -77,7 +77,8 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            int partition) {
         final IBinaryBooleanInspector bbi = 
binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
         return new AbstractOneInputOneOutputOneFieldFramePushRuntime() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 395d321..d40f6cf 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -54,8 +54,8 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
-            throws AlgebricksException {
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            int partition) throws AlgebricksException {
         final IPrinter[] printers = new IPrinter[printerFactories.length];
         for (int i = 0; i < printerFactories.length; i++) {
             printers[i] = printerFactories[i].createPrinter();
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 6b21cda..ae23280 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -83,8 +83,8 @@
     }
 
     @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
-            throws AlgebricksException {
+    public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            int partition) throws AlgebricksException {
 
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
diff --git 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
index 4357770..c410c0b 100644
--- 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
+++ 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
@@ -46,4 +46,9 @@
     public IHyracksTaskContext getTaskContext();
 
     public String getResourcePath();
+    
+    /**
+     * @return The resource unique storage partition
+     */
+    public int getResourcePartition();
 }
diff --git 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
index 120d880..8d618c8 100644
--- 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
+++ 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
@@ -25,5 +25,5 @@
 
 public interface IModificationOperationCallbackFactory extends Serializable {
     public IModificationOperationCallback 
createModificationOperationCallback(String resourcePath, long resourceId,
-            Object resource, IHyracksTaskContext ctx) throws 
HyracksDataException;
+            int resourcePartition, Object resource, IHyracksTaskContext ctx) 
throws HyracksDataException;
 }
diff --git 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index e465aa0..9c20d3a 100644
--- 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -45,6 +45,7 @@
     protected final boolean durable;
     protected IIndex index;
     protected final String resourcePath;
+    protected final int resourcePartition;
 
     public IndexDataflowHelper(IIndexOperatorDescriptor opDesc, final 
IHyracksTaskContext ctx, int partition,
             boolean durable) {
@@ -57,6 +58,7 @@
         this.file = IndexFileNameUtil.getIndexAbsoluteFileRef(opDesc, 
partition, ctx.getIOManager());
         this.resourcePath = file.getFile().getPath();
         this.durable = durable;
+        this.resourcePartition = 
opDesc.getFileSplitProvider().getFileSplits()[partition].getPartition();
     }
 
     protected abstract IIndex createIndexInstance() throws 
HyracksDataException;
@@ -88,7 +90,6 @@
                 resourceID = resourceIdFactory.createId();
                 ILocalResourceFactory localResourceFactory = 
opDesc.getLocalResourceFactoryProvider()
                         .getLocalResourceFactory();
-                int resourcePartition = 
opDesc.getFileSplitProvider().getFileSplits()[partition].getPartition();
                 String resourceName = 
opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile()
                         .getPath();
                 
localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID,
 resourceName,
@@ -160,4 +161,9 @@
     public String getResourcePath() {
         return resourcePath;
     }
+
+    @Override
+    public int getResourcePartition() {
+        return resourcePartition;
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 6c7e8d0..4c280ed 100644
--- 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -77,7 +77,8 @@
         try {
             writer.open();
             modCallback = 
opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResourcePath(), 
indexHelper.getResourceID(), index, ctx);
+                    indexHelper.getResourcePath(), 
indexHelper.getResourceID(), indexHelper.getResourcePartition(),
+                    index, ctx);
             indexAccessor = index.createAccessor(modCallback, 
NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = 
opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {
diff --git 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
index 17f9265..b42e619 100644
--- 
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
+++ 
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
@@ -29,9 +29,9 @@
  * Dummy NoOp callback factory used primarily for testing. Always returns the 
{@link NoOpOperationCallback} instance.
  * Implemented as an enum to preserve singleton model while being serializable
  */
-public enum NoOpOperationCallbackFactory implements 
ISearchOperationCallbackFactory,
-        IModificationOperationCallbackFactory {
-    INSTANCE;
+public enum NoOpOperationCallbackFactory
+    implements 
ISearchOperationCallbackFactory,IModificationOperationCallbackFactory {
+        INSTANCE;
 
     @Override
     public ISearchOperationCallback createSearchOperationCallback(long 
resourceId, IHyracksTaskContext ctx) {
@@ -40,7 +40,7 @@
 
     @Override
     public IModificationOperationCallback 
createModificationOperationCallback(String resourcePath, long resourceId,
-            Object resource, IHyracksTaskContext ctx) throws 
HyracksDataException {
+            int resourcePartition, Object resource, IHyracksTaskContext ctx) 
throws HyracksDataException {
         return NoOpOperationCallback.INSTANCE;
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/589
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iddd566ea97512c1dbd2217befdc2bb8822e77763
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to