Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/589
Change subject: Pass task partition to IPushRuntime
......................................................................
Pass task partition to IPushRuntime
This change includes the following:
- Pass task partition to IPushRuntime
This is needed to get partition-specific information at runtime.
- Pass resource partition id to IModificationOperationCallback
This is needed to include the partition id in txn logs.
Change-Id: Iddd566ea97512c1dbd2217befdc2bb8822e77763
---
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
M
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
M
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
M
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
M
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
M
hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
27 files changed, 58 insertions(+), 42 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/89/589/1
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index 60fb517..ddeba74 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -25,5 +25,5 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IPushRuntimeFactory extends Serializable {
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
AlgebricksException, HyracksDataException;
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int
partition) throws AlgebricksException, HyracksDataException;
}
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index bafe8a7..fe9abf6 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -60,7 +60,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+ public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ int partition)
throws AlgebricksException {
return new AbstractOneInputOneOutputOneFramePushRuntime() {
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index fefc72e..e873864 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -54,8 +54,7 @@
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] keys, int[]
partialKeys) throws HyracksDataException {
- final AggregatorOutput outputWriter = new AggregatorOutput(subplans,
keyFieldIdx.length,
- decorFieldIdx.length);
+ final AggregatorOutput outputWriter = new AggregatorOutput(subplans,
keyFieldIdx.length, decorFieldIdx.length);
final NestedTupleSourceRuntime[] pipelines = new
NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
try {
@@ -132,8 +131,8 @@
}
@Override
- public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor,
- int tIndex, AggregateState state) throws
HyracksDataException {
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be
called");
}
@@ -152,7 +151,7 @@
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
- IPushRuntime newRuntime =
runtimeFactories[i].createPushRuntime(ctx);
+ IPushRuntime newRuntime =
runtimeFactories[i].createPushRuntime(ctx, -1);
newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, recordDescriptors[i -
1]);
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 2ca933a..cae6309 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -146,7 +146,7 @@
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
- IPushRuntime newRuntime =
runtimeFactories[i].createPushRuntime(ctx);
+ IPushRuntime newRuntime =
runtimeFactories[i].createPushRuntime(ctx, -1);
newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, recordDescriptors[i -
1]);
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 9fea189..eeef166 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -35,11 +35,11 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
AlgebricksException, HyracksDataException {
- return createOneOutputPushRuntime(ctx);
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int
partition) throws AlgebricksException, HyracksDataException {
+ return createOneOutputPushRuntime(ctx, partition);
}
- public abstract AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(IHyracksTaskContext ctx)
+ public abstract AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(IHyracksTaskContext ctx, int partition)
throws AlgebricksException, HyracksDataException;
}
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index 69876b0..ee349ff 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -39,7 +39,7 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
AlgebricksException {
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int
partition) throws AlgebricksException {
return new AbstractOneInputSinkPushRuntime() {
@Override
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index b7a73d4..1700932 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -58,7 +58,7 @@
}
@Override
- public AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+ public AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx, int partition)
throws AlgebricksException {
final IBinaryComparator[] comparators = new
IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 8322cdc..717cd8b 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -101,7 +101,7 @@
PipelineAssembler pa = new PipelineAssembler(pipeline,
inputArity, outputArity, null,
pipelineOutputRecordDescriptor);
try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
+ startOfPipeline = pa.assemblePipeline(writer, ctx,
partition);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
@@ -135,7 +135,7 @@
PipelineAssembler pa = new PipelineAssembler(pipeline,
inputArity, outputArity,
pipelineInputRecordDescriptor,
pipelineOutputRecordDescriptor);
try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
+ startOfPipeline = pa.assemblePipeline(writer, ctx,
partition);
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index cd83c94..04a733e 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -45,12 +45,12 @@
this.outputArity = outputArity;
}
- public IFrameWriter assemblePipeline(IFrameWriter writer,
IHyracksTaskContext ctx) throws AlgebricksException,
+ public IFrameWriter assemblePipeline(IFrameWriter writer,
IHyracksTaskContext ctx, int partition) throws AlgebricksException,
HyracksDataException {
// plug the operators
IFrameWriter start = writer;// this.writer;
for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
- IPushRuntime newRuntime =
pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
+ IPushRuntime newRuntime =
pipeline.getRuntimeFactories()[i].createPushRuntime(ctx, partition);
if (i == pipeline.getRuntimeFactories().length - 1) {
if (outputArity == 1) {
newRuntime.setFrameWriter(0, start,
pipelineOutputRecordDescriptor);
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 25eb229..897e8c0 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -69,7 +69,7 @@
}
@Override
- public AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+ public AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx, int partition)
throws AlgebricksException, HyracksDataException {
RecordDescriptor pipelineOutputRecordDescriptor = null;
@@ -141,7 +141,8 @@
IFrameWriter endPipe = new TupleOuterProduct();
- NestedTupleSourceRuntime startOfPipeline =
(NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
+ NestedTupleSourceRuntime startOfPipeline =
(NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx,
+ partition);
boolean first = true;
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index bca301f..0436058 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -55,7 +55,7 @@
}
@Override
- public AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+ public AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx, int partition)
throws AlgebricksException {
return new AbstractOneInputOneOutputPushRuntime() {
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 79d9b7d..662f29f 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -84,7 +84,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+ public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ int partition)
throws AlgebricksException {
final int[] projectionToOutColumns = new int[projectionList.length];
for (int j = 0; j < projectionList.length; j++) {
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index a2b9652..57b870f 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -40,7 +40,7 @@
}
@Override
- public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
+ public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx, int
partition) throws HyracksDataException {
return new AbstractOneInputSourcePushRuntime() {
private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5fcb9ef..b520626 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -39,7 +39,7 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int
partition) throws HyracksDataException {
return new NestedTupleSourceRuntime(ctx);
}
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
index b77f7b8..87649c5 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -56,7 +56,7 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int
partition) {
IAWriter w =
PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out,
printerFactories,
inputRecordDesc);
return new SinkWriterRuntime(w, ctx, System.out, inputRecordDesc);
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 8a5f38c..447e479 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -78,8 +78,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
+ public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ int partition) throws AlgebricksException {
final int[] projectionToOutColumns = new int[projectionList.length];
for (int j = 0; j < projectionList.length; j++) {
projectionToOutColumns[j] = Arrays.binarySearch(outColumns,
projectionList[j]);
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
index 3f66e23..e5c97a6 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -67,7 +67,7 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
AlgebricksException {
+ public IPushRuntime createPushRuntime(IHyracksTaskContext ctx, int
partition) throws AlgebricksException {
PrintStream filePrintStream = null;
try {
filePrintStream = new PrintStream(new BufferedOutputStream(new
FileOutputStream(outputFile)));
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index ef172c7..ed93bd8 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -60,7 +60,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+ public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ int partition) {
final IBinaryIntegerInspector bii =
binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
return new AbstractOneInputOneOutputOneFramePushRuntime() {
private final IPointable p =
VoidPointable.FACTORY.createPointable();
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 2cea90d..57bfe7c 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -47,8 +47,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
+ public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ int partition) throws AlgebricksException {
return new AbstractOneInputOneOutputOneFramePushRuntime() {
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 75c3d08..69fe889 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -77,7 +77,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+ public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ int partition) {
final IBinaryBooleanInspector bbi =
binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
return new AbstractOneInputOneOutputOneFieldFramePushRuntime() {
private IPointable p = VoidPointable.FACTORY.createPointable();
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 395d321..d40f6cf 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -54,8 +54,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
+ public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ int partition) throws AlgebricksException {
final IPrinter[] printers = new IPrinter[printerFactories.length];
for (int i = 0; i < printerFactories.length; i++) {
printers[i] = printerFactories[i].createPrinter();
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 6b21cda..ae23280 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -83,8 +83,8 @@
}
@Override
- public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
- throws AlgebricksException {
+ public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ int partition) throws AlgebricksException {
return new AbstractOneInputOneOutputOneFramePushRuntime() {
private IPointable p = VoidPointable.FACTORY.createPointable();
diff --git
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
index 4357770..c410c0b 100644
---
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
+++
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
@@ -46,4 +46,9 @@
public IHyracksTaskContext getTaskContext();
public String getResourcePath();
+
+ /**
+ * @return The resource unique storage partition
+ */
+ public int getResourcePartition();
}
diff --git
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
index 120d880..8d618c8 100644
---
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
+++
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
@@ -25,5 +25,5 @@
public interface IModificationOperationCallbackFactory extends Serializable {
public IModificationOperationCallback
createModificationOperationCallback(String resourcePath, long resourceId,
- Object resource, IHyracksTaskContext ctx) throws
HyracksDataException;
+ int resourcePartition, Object resource, IHyracksTaskContext ctx)
throws HyracksDataException;
}
diff --git
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index e465aa0..9c20d3a 100644
---
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -45,6 +45,7 @@
protected final boolean durable;
protected IIndex index;
protected final String resourcePath;
+ protected final int resourcePartition;
public IndexDataflowHelper(IIndexOperatorDescriptor opDesc, final
IHyracksTaskContext ctx, int partition,
boolean durable) {
@@ -57,6 +58,7 @@
this.file = IndexFileNameUtil.getIndexAbsoluteFileRef(opDesc,
partition, ctx.getIOManager());
this.resourcePath = file.getFile().getPath();
this.durable = durable;
+ this.resourcePartition =
opDesc.getFileSplitProvider().getFileSplits()[partition].getPartition();
}
protected abstract IIndex createIndexInstance() throws
HyracksDataException;
@@ -88,7 +90,6 @@
resourceID = resourceIdFactory.createId();
ILocalResourceFactory localResourceFactory =
opDesc.getLocalResourceFactoryProvider()
.getLocalResourceFactory();
- int resourcePartition =
opDesc.getFileSplitProvider().getFileSplits()[partition].getPartition();
String resourceName =
opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile()
.getPath();
localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID,
resourceName,
@@ -160,4 +161,9 @@
public String getResourcePath() {
return resourcePath;
}
+
+ @Override
+ public int getResourcePartition() {
+ return resourcePartition;
+ }
}
\ No newline at end of file
diff --git
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 6c7e8d0..4c280ed 100644
---
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -77,7 +77,8 @@
try {
writer.open();
modCallback =
opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
- indexHelper.getResourcePath(),
indexHelper.getResourceID(), index, ctx);
+ indexHelper.getResourcePath(),
indexHelper.getResourceID(), indexHelper.getResourcePartition(),
+ index, ctx);
indexAccessor = index.createAccessor(modCallback,
NoOpOperationCallback.INSTANCE);
ITupleFilterFactory tupleFilterFactory =
opDesc.getTupleFilterFactory();
if (tupleFilterFactory != null) {
diff --git
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
index 17f9265..b42e619 100644
---
a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
+++
b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallbackFactory.java
@@ -29,9 +29,9 @@
* Dummy NoOp callback factory used primarily for testing. Always returns the
{@link NoOpOperationCallback} instance.
* Implemented as an enum to preserve singleton model while being serializable
*/
-public enum NoOpOperationCallbackFactory implements
ISearchOperationCallbackFactory,
- IModificationOperationCallbackFactory {
- INSTANCE;
+public enum NoOpOperationCallbackFactory
+ implements
ISearchOperationCallbackFactory,IModificationOperationCallbackFactory {
+ INSTANCE;
@Override
public ISearchOperationCallback createSearchOperationCallback(long
resourceId, IHyracksTaskContext ctx) {
@@ -40,7 +40,7 @@
@Override
public IModificationOperationCallback
createModificationOperationCallback(String resourcePath, long resourceId,
- Object resource, IHyracksTaskContext ctx) throws
HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx)
throws HyracksDataException {
return NoOpOperationCallback.INSTANCE;
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/589
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iddd566ea97512c1dbd2217befdc2bb8822e77763
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>