working partition join.

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b34a426a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b34a426a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b34a426a

Branch: refs/heads/ecarm002/interval_join_merge
Commit: b34a426a48dbf16a2d313b2333eb0e6fb23227e4
Parents: ce59341
Author: Preston Carman <prest...@apache.org>
Authored: Fri Sep 30 13:48:03 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Fri Sep 30 13:48:03 2016 -0700

----------------------------------------------------------------------
 .../IntervalPartitionJoinPOperator.java         |   2 +-
 .../intervalindex/IntervalIndexJoiner.java      |  62 ++--
 ...IntervalPartitionJoinOperatorDescriptor.java | 319 +++++++++++++++++++
 .../IntervalPartitionJoinTaskState.java         |  33 ++
 .../IntervalPartitionJoiner.java                | 288 +++++++++++++++++
 .../dataflow/common/io/RunFileReader.java       |   4 +
 .../dataflow/std/structures/RunFilePointer.java | 102 ++++++
 7 files changed, 778 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
index 73d159e..af77a92 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.logging.Logger;
 
 import 
org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
-import 
org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionJoinOperatorDescriptor;
+import 
org.apache.asterix.runtime.operators.joins.intervalpartition2.IntervalPartitionJoinOperatorDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index bac2f45..8b7a12e 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -61,7 +61,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
     private final int[] streamIndex;
     private final RunFileStream[] runFileStream;
 
-//    private final LinkedList<TuplePointer> buffer = new LinkedList<>();
+    //    private final LinkedList<TuplePointer> buffer = new LinkedList<>();
 
     private final IIntervalMergeJoinChecker imjc;
 
@@ -342,7 +342,7 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
                 if 
(activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) {
                     
processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION],
                             inputAccessor[LEFT_PARTITION], true, writer);
-//                    buffer.add(tp);
+                    //                    buffer.add(tp);
                 } else {
                     // Spill case
                     freezeAndSpill();
@@ -356,10 +356,10 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         } while (loadLeftTuple().isLoaded() && loadRightTuple().isLoaded() && 
!checkToProcessRightTuple());
 
         // Add Results
-//        if (!buffer.isEmpty()) {
-//            
processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer,
-//                    memoryAccessor[LEFT_PARTITION], true, writer);
-//        }
+        //        if (!buffer.isEmpty()) {
+        //            
processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer,
+        //                    memoryAccessor[LEFT_PARTITION], true, writer);
+        //        }
     }
 
     private void processRightTuple(IFrameWriter writer) throws 
HyracksDataException {
@@ -374,7 +374,7 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
                 if 
(activeManager[RIGHT_PARTITION].addTuple(inputAccessor[RIGHT_PARTITION], tp)) {
                     
processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION],
                             inputAccessor[RIGHT_PARTITION], false, writer);
-//                    buffer.add(tp);
+                    //                    buffer.add(tp);
                 } else {
                     // Spill case
                     freezeAndSpill();
@@ -388,32 +388,32 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         } while (loadRightTuple().isLoaded() && checkToProcessRightTuple());
 
         // Add Results
-//        if (!buffer.isEmpty()) {
-//            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer,
-//                    memoryAccessor[RIGHT_PARTITION], false, writer);
-//        }
+        //        if (!buffer.isEmpty()) {
+        //            
processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer,
+        //                    memoryAccessor[RIGHT_PARTITION], false, writer);
+        //        }
     }
 
-//    private void processActiveJoin(List<TuplePointer> outer, 
ITuplePointerAccessor outerAccessor,
-//            List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, 
boolean reversed, IFrameWriter writer)
-//            throws HyracksDataException {
-//        for (TuplePointer outerTp : outer) {
-//            outerAccessor.reset(outerTp);
-//            for (TuplePointer innerTp : inner) {
-//                innerAccessor.reset(innerTp);
-//                if (imjc.checkToSaveInResult(outerAccessor, 
outerTp.getTupleIndex(), innerAccessor,
-//                        innerTp.getTupleIndex(), reversed)) {
-//                    addToResult(outerAccessor, outerTp.getTupleIndex(), 
innerAccessor, innerTp.getTupleIndex(),
-//                            reversed, writer);
-//                }
-//                joinComparisonCount++;
-//            }
-//        }
-//        if (LOGGER.isLoggable(Level.FINE)) {
-//            LOGGER.fine("Sweep for " + buffer.size() + " tuples");
-//        }
-//        buffer.clear();
-//    }
+    //    private void processActiveJoin(List<TuplePointer> outer, 
ITuplePointerAccessor outerAccessor,
+    //            List<TuplePointer> inner, ITuplePointerAccessor 
innerAccessor, boolean reversed, IFrameWriter writer)
+    //            throws HyracksDataException {
+    //        for (TuplePointer outerTp : outer) {
+    //            outerAccessor.reset(outerTp);
+    //            for (TuplePointer innerTp : inner) {
+    //                innerAccessor.reset(innerTp);
+    //                if (imjc.checkToSaveInResult(outerAccessor, 
outerTp.getTupleIndex(), innerAccessor,
+    //                        innerTp.getTupleIndex(), reversed)) {
+    //                    addToResult(outerAccessor, outerTp.getTupleIndex(), 
innerAccessor, innerTp.getTupleIndex(),
+    //                            reversed, writer);
+    //                }
+    //                joinComparisonCount++;
+    //            }
+    //        }
+    //        if (LOGGER.isLoggable(Level.FINE)) {
+    //            LOGGER.fine("Sweep for " + buffer.size() + " tuples");
+    //        }
+    //        buffer.clear();
+    //    }
 
     private void processTupleJoin(List<TuplePointer> outer, 
ITuplePointerAccessor outerAccessor,
             ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter 
writer) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
new file mode 100644
index 0000000..a985eee
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators.joins.intervalpartition2;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Logger;
+
+import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
+import 
org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
+import 
org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionComputerFactory;
+import 
org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
+import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
+import 
org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
+
+/**
+ * Interval partition join with two inputs. Input 0 (left) is the probe side and drives the join: its activity
+ * owns the output writer and registers the shared {@link IntervalPartitionJoinTaskState}. Input 1 (right) is the
+ * build side: its activity creates the {@link IntervalPartitionJoiner} and hands right frames to it when the
+ * joiner requests more ({@code status.continueRightLoad}) or once the left side has closed.
+ * <p>
+ * Both activities of a partition synchronize on the lock and conditions provided by {@link MergeJoinLocks}.
+ */
+public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private static final int LEFT_ACTIVITY_ID = 0;
+    private static final int RIGHT_ACTIVITY_ID = 1;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final int memoryForJoin;
+    private final IIntervalMergeJoinCheckerFactory imjcf;
+    private final RangeId rangeId;
+    private final int k;
+
+    /** Key field read by the probe partitioner, which the joiner applies to left (input 0) tuples. */
+    private final int probeKey;
+    /** Key field read by the build partitioner, which the joiner applies to right (input 1) tuples. */
+    private final int buildKey;
+
+    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
+
+    /**
+     * @param spec
+     *            operator registry
+     * @param memoryForJoin
+     *            memory budget handed to the joiner, in frames
+     * @param k
+     *            interval partitioning parameter passed to the partition computers and the joiner
+     * @param leftKeys
+     *            key fields of the left (probe) input; the first one drives partitioning
+     * @param rightKeys
+     *            key fields of the right (build) input; the first one drives partitioning
+     * @param recordDescriptor
+     *            descriptor of the joined output records
+     * @param imjcf
+     *            factory for the interval predicate checker
+     * @param rangeId
+     *            id of the range map used to compute each partition's start and end
+     */
+    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, int k,
+            int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
+            RangeId rangeId) {
+        super(spec, 2, 1);
+        recordDescriptors[0] = recordDescriptor;
+        // The joiner probes with left tuples and builds from right tuples, so each partitioner must read the key
+        // field of the input it is applied to.
+        this.probeKey = leftKeys[0];
+        this.buildKey = rightKeys[0];
+        this.k = k;
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.memoryForJoin = memoryForJoin;
+        this.imjcf = imjcf;
+        this.rangeId = rangeId;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        MergeJoinLocks locks = new MergeJoinLocks();
+
+        ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID);
+        ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID);
+
+        IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks);
+        IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks);
+
+        builder.addActivity(this, rightAN);
+        builder.addSourceEdge(1, rightAN, 0);
+
+        builder.addActivity(this, leftAN);
+        builder.addSourceEdge(0, leftAN, 0);
+        builder.addTargetEdge(0, leftAN, 0);
+    }
+
+    /** Left (probe) activity: feeds left frames to the joiner and writes the join output. */
+    private class LeftJoinerActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final MergeJoinLocks locks;
+
+        public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
+            super(id);
+            this.locks = locks;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            locks.setPartitions(nPartitions);
+            final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            return new LeftJoinerOperator(ctx, partition, inRecordDesc);
+        }
+
+        private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+            private final IHyracksTaskContext ctx;
+            private final int partition;
+            private final RecordDescriptor leftRd;
+            private IntervalPartitionJoinTaskState state;
+            private boolean first = true;
+
+            public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
+                this.ctx = ctx;
+                this.partition = partition;
+                this.leftRd = inRecordDesc;
+            }
+
+            /** Opens the output, publishes the shared state and waits until the right side creates the joiner. */
+            @Override
+            public void open() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    writer.open();
+                    state = new IntervalPartitionJoinTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
+                    state.leftRd = leftRd;
+                    ctx.setStateObject(state);
+                    // Wake the right branch, which may be waiting for this state object.
+                    locks.getRight(partition).signal();
+
+                    // Continue only after the joiner has been created in the right branch.
+                    while (state.partitionJoiner == null) {
+                        locks.getLeft(partition).await();
+                    }
+                    state.status.branch[LEFT_ACTIVITY_ID].setStageOpen();
+                    locks.getRight(partition).signal();
+                } catch (InterruptedException e) {
+                    // Without a joiner this task cannot proceed; restore the flag and fail instead of continuing.
+                    Thread.currentThread().interrupt();
+                    throw new HyracksDataException(e);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    if (first) {
+                        state.status.branch[LEFT_ACTIVITY_ID].setStageData();
+                        first = false;
+                    }
+                    state.partitionJoiner.setFrame(LEFT_ACTIVITY_ID, buffer);
+                    state.partitionJoiner.processLeftFrame(writer);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    state.failed = true;
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            /**
+             * Finishes the join (or propagates the failure) and closes the output. The left stage is marked
+             * CLOSED and the right branch is signalled on every path so it never waits forever.
+             */
+            @Override
+            public void close() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    try {
+                        state.status.branch[LEFT_ACTIVITY_ID].noMore();
+                        finishOutput();
+                    } finally {
+                        state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
+                        locks.getRight(partition).signal();
+                    }
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            /** Flushes remaining results (or fails the writer) and closes the writer opened in open(). */
+            private void finishOutput() throws HyracksDataException {
+                try {
+                    if (state.failed) {
+                        writer.fail();
+                    } else {
+                        state.partitionJoiner.processLeftClose(writer);
+                    }
+                } finally {
+                    writer.close();
+                }
+            }
+        }
+    }
+
+    /** Right (build) activity: creates the joiner and supplies right frames when the joiner requests them. */
+    private class RightDataActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final ActivityId joinAid;
+        private final MergeJoinLocks locks;
+
+        public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
+            super(id);
+            this.joinAid = joinAid;
+            this.locks = locks;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            locks.setPartitions(nPartitions);
+            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            return new RightDataOperator(ctx, partition, inRecordDesc);
+        }
+
+        private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable {
+
+            private final int partition;
+            private final IHyracksTaskContext ctx;
+            private final RecordDescriptor rightRd;
+            private IntervalPartitionJoinTaskState state;
+            private boolean first = true;
+
+            public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
+                this.ctx = ctx;
+                this.partition = partition;
+                this.rightRd = inRecordDesc;
+            }
+
+            /** Waits for the left state, then builds the partitioners, checker and joiner for this partition. */
+            @Override
+            public void open() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    do {
+                        // Wait for the state to be set in the context from Left.
+                        state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition));
+                        if (state == null) {
+                            locks.getRight(partition).await();
+                        }
+                    } while (state == null);
+                    state.k = k;
+
+                    RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
+                    long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
+                            partition);
+                    long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
+                    ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
+                            partitionStart, partitionEnd).createPartitioner();
+                    ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
+                            partitionStart, partitionEnd).createPartitioner();
+                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx);
+
+                    state.rightRd = rightRd;
+                    state.partitionJoiner = new IntervalPartitionJoiner(ctx, memoryForJoin, partition, state.k,
+                            state.status, locks, imjc, state.leftRd, state.rightRd, buildHpc, probeHpc);
+                    state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen();
+                    locks.getLeft(partition).signal();
+                } catch (InterruptedException e) {
+                    // The joiner was not created; restore the flag and fail rather than continue half-initialized.
+                    Thread.currentThread().interrupt();
+                    throw new HyracksDataException(e);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    if (first) {
+                        state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
+                        first = false;
+                    }
+                    while (!state.status.continueRightLoad
+                            && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
+                        // Wait for the state to request right frame unless left has finished.
+                        locks.getRight(partition).await();
+                    }
+                    state.partitionJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
+                    state.status.continueRightLoad = false;
+                    locks.getLeft(partition).signal();
+                } catch (InterruptedException e) {
+                    // Do not drop this frame silently: restore the flag and fail the task.
+                    Thread.currentThread().interrupt();
+                    throw new HyracksDataException(e);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    state.failed = true;
+                    locks.getLeft(partition).signal();
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    state.status.branch[RIGHT_ACTIVITY_ID].setStageClose();
+                    locks.getLeft(partition).signal();
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
new file mode 100644
index 0000000..e8563c2
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.intervalpartition2;
+
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.std.join.MergeJoinTaskState;
+
+/**
+ * Per-partition state shared by the left and right activities of the interval partition join.
+ */
+public class IntervalPartitionJoinTaskState extends MergeJoinTaskState {
+    /** Interval partitioning parameter; assigned by the right activity before it creates the joiner. */
+    public int k;
+
+    /** Joiner shared by both activities; stays {@code null} until the right activity has created it. */
+    protected IntervalPartitionJoiner partitionJoiner;
+
+    /**
+     * @param jobId
+     *            job that owns this state object
+     * @param taskId
+     *            task under which this state object is registered
+     */
+    public IntervalPartitionJoinTaskState(JobId jobId, TaskId taskId) {
+        super(jobId, taskId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
new file mode 100644
index 0000000..fb2edd7
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.intervalpartition2;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
+import 
org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
+import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
+import 
org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
+import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner;
+import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
+import org.apache.hyracks.dataflow.std.join.MergeStatus;
+import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public class IntervalPartitionJoiner extends AbstractMergeJoiner {
+
+    private static final Logger LOGGER = 
Logger.getLogger(IntervalPartitionJoiner.class.getName());
+
+    private final RunFileWriter probeRunFileWriter;
+    private int probeRunFilePid = -1;
+
+    private final ITuplePartitionComputer buildHpc;
+    private final ITuplePartitionComputer probeHpc;
+
+    private final int buildMemory;
+    private final int k;
+    private final int numOfPartitions;
+    private long buildSize = 0;
+    private long probeSize = 0;
+    private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;
+
+    private final VPartitionTupleBufferManager buildBufferManager;
+    private final TuplePointer tempPtr = new TuplePointer();
+    private final List<Integer> buildInMemoryPartitions;
+    private final FrameTupleAccessor accessorBuild;
+    private BufferInfo bufferInfo;
+
+    private long spillWriteCount = 0;
+    private long spillReadCount = 0;
+    private long joinComparisonCount = 0;
+    private long joinResultCount = 0;
+    private final IIntervalMergeJoinChecker imjc;
+    private final FrameTupleAccessor accessorProbe;
+    private final IFrame reloadBuffer;
+    private boolean moreBuildProcessing = true;
+    private final List<IFrameBufferManager> fbms = new ArrayList<>();
+
+    /**
+     * Creates the joiner for one partition. Left tuples are the probe side and right tuples the build side.
+     *
+     * @param ctx
+     *            task context used for frames, buffer management and the probe spill file
+     * @param memorySize
+     *            build memory budget, in frames
+     * @param partition
+     *            index of the partition this joiner serves
+     * @param k
+     *            interval partitioning parameter; determines the number of interval partitions
+     * @param status
+     *            merge status shared with the operator activities
+     * @param locks
+     *            locks shared with the operator activities
+     * @param imjc
+     *            interval predicate checker
+     * @param leftRd
+     *            record descriptor of the left (probe) input
+     * @param rightRd
+     *            record descriptor of the right (build) input
+     * @param buildHpc
+     *            partition computer for build tuples
+     * @param probeHpc
+     *            partition computer for probe tuples
+     * @throws HyracksDataException
+     *             if the probe spill file cannot be opened
+     */
+    public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status,
+            MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd,
+            ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException {
+        super(ctx, partition, status, locks, leftRd, rightRd);
+
+        bufferInfo = new BufferInfo(null, -1, -1);
+
+        this.accessorProbe = new FrameTupleAccessor(leftRd);
+        reloadBuffer = new VSizeFrame(ctx);
+
+        this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);
+        this.imjc = imjc;
+
+        // TODO fix available memory size
+        this.buildMemory = memorySize;
+        buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN,
+                numOfPartitions, buildMemory * ctx.getInitialFrameSize());
+
+        this.k = k;
+        this.buildHpc = buildHpc;
+        this.probeHpc = probeHpc;
+
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner");
+        probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager());
+        probeRunFileWriter.open();
+
+        probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC);
+        buildInMemoryPartitions = new LinkedList<>();
+
+        this.accessorBuild = new FrameTupleAccessor(rightRd);
+
+        // Respect the configured log level; do not force FINE or print debug output to stdout.
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("IntervalPartitionJoiner has started partition " + partition + " with " + memorySize
+                    + " frames of memory.");
+        }
+    }
+
+    @Override
+    public void processLeftFrame(IFrameWriter writer) throws 
HyracksDataException {
+        while (inputAccessor[LEFT_PARTITION].exists()) {
+            int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(), k);
+
+            if (probeRunFilePid != pid) {
+                // Log new partition locations.
+                RunFilePointer rfp = new 
RunFilePointer(probeRunFileWriter.getFileSize(),
+                        inputAccessor[LEFT_PARTITION].getTupleId());
+                probeRunFilePointers.put(rfp, pid);
+                probeRunFilePid = pid;
+            }
+            inputAccessor[LEFT_PARTITION].next();
+            probeSize++;
+        }
+        inputBuffer[LEFT_PARTITION].rewind();
+        probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
+        spillWriteCount++;
+    }
+
+    @Override
+    public void processLeftClose(IFrameWriter writer) throws 
HyracksDataException {
+        joinLoopOnMemory(writer);
+
+        // Flush result.
+        resultAppender.write(writer, true);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " 
+ joinComparisonCount + " comparisons, "
+                    + joinResultCount + " results, " + spillWriteCount + " 
written, " + spillReadCount + " read.");
+        }
+    }
+
+    private void joinLoopOnMemory(IFrameWriter writer) throws 
HyracksDataException {
+        RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader();
+        pReader.open();
+        // Load first frame.
+        loadReaderNextFrame(pReader);
+
+        while (moreBuildProcessing) {
+            fillMemory();
+            joinMemoryBlockWithRunFile(writer, pReader);
+
+            // Clean up
+            for (int pid : buildInMemoryPartitions) {
+                buildBufferManager.clearPartition(pid);
+            }
+            buildInMemoryPartitions.clear();
+        }
+        pReader.close();
+    }
+
+    private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader 
pReader) throws HyracksDataException {
+        // Join Disk partitions with Memory partitions
+        for (RunFilePointer probeId : probeRunFilePointers.navigableKeySet()) {
+            Pair<Integer, Integer> probe = 
IntervalPartitionUtil.getIntervalPartition(probeRunFilePointers.get(probeId),
+                    k);
+            for (int buildId : buildInMemoryPartitions) {
+                Pair<Integer, Integer> build = 
IntervalPartitionUtil.getIntervalPartition(buildId, k);
+                if (imjc.compareIntervalPartition(probe.first, probe.second, 
build.first, build.second)) {
+                    
fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
+                }
+            }
+            if (!fbms.isEmpty()) {
+                join(pReader, probeId, fbms, writer);
+            }
+            fbms.clear();
+        }
+    }
+
+    private void join(RunFileReader pReader, RunFilePointer rfpStart, 
List<IFrameBufferManager> buildFbms,
+            IFrameWriter writer) throws HyracksDataException {
+        long fileOffsetStart = rfpStart.getFileOffset();
+        int tupleStart = rfpStart.getTupleIndex();
+
+        RunFilePointer rfpEnd = probeRunFilePointers.higherKey(rfpStart);
+        long fileOffsetEnd = rfpEnd == null ? pReader.getFileSize() : 
rfpEnd.getFileOffset();
+        int tupleEnd = rfpEnd == null ? Integer.MAX_VALUE : 
rfpEnd.getTupleIndex();
+
+        if (pReader.getReadPointer() != fileOffsetStart) {
+            pReader.reset(fileOffsetStart);
+            loadReaderNextFrame(pReader);
+        }
+        do {
+            int start = pReader.getReadPointer() == fileOffsetStart ? 
tupleStart : 0;
+            int end = pReader.getReadPointer() == fileOffsetEnd ? tupleEnd : 
accessorProbe.getTupleCount();
+
+            for (int i = start; i < end; ++i) {
+                // Tuple has potential match from build phase
+                for (IFrameBufferManager fbm : buildFbms) {
+                    joinTupleWithMemoryPartition(accessorProbe, i, fbm, 
writer);
+                }
+            }
+        } while (pReader.getReadPointer() < fileOffsetEnd && 
loadReaderNextFrame(pReader));
+    }
+
+    private boolean loadReaderNextFrame(RunFileReader pReader) throws 
HyracksDataException {
+        if (pReader.nextFrame(reloadBuffer)) {
+            accessorProbe.reset(reloadBuffer.getBuffer());
+            spillReadCount++;
+            return true;
+        }
+        return false;
+    }
+
+    public void joinTupleWithMemoryPartition(IFrameTupleAccessor 
accessorProbe, int probeTupleIndex,
+            IFrameBufferManager fbm, IFrameWriter writer) throws 
HyracksDataException {
+        if (fbm.getNumFrames() == 0) {
+            return;
+        }
+        fbm.resetIterator();
+        int frameIndex = fbm.next();
+        while (fbm.exists()) {
+            fbm.getFrame(frameIndex, bufferInfo);
+            accessorBuild.reset(bufferInfo.getBuffer());
+            for (int buildTupleIndex = 0; buildTupleIndex < 
accessorBuild.getTupleCount(); ++buildTupleIndex) {
+                if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, 
accessorProbe, probeTupleIndex, false)) {
+                    appendToResult(accessorBuild, buildTupleIndex, 
accessorProbe, probeTupleIndex, writer);
+                }
+                joinComparisonCount++;
+            }
+            frameIndex = fbm.next();
+        }
+    }
+
+    private void appendToResult(IFrameTupleAccessor accessorBuild, int 
buildSidetIx, IFrameTupleAccessor accessorProbe,
+            int probeSidetIx, IFrameWriter writer) throws HyracksDataException 
{
+        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, 
buildSidetIx, accessorProbe,
+                probeSidetIx);
+        joinResultCount++;
+    }
+
+    private void fillMemory() throws HyracksDataException {
+        int buildPid = -1;
+        TupleStatus ts;
+        for (ts = loadRightTuple(); ts.isLoaded(); ts = loadRightTuple()) {
+            int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], 
inputAccessor[RIGHT_PARTITION].getTupleId(),
+                    k);
+            if (!buildBufferManager.insertTuple(pid, 
inputAccessor[RIGHT_PARTITION],
+                    inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
+                return;
+            }
+
+            if (buildPid != pid) {
+                // Track new partitions in memory.
+                buildInMemoryPartitions.add(pid);
+                buildPid = pid;
+            }
+            inputAccessor[RIGHT_PARTITION].next();
+            buildSize++;
+        }
+        if (ts.isEmpty()) {
+            moreBuildProcessing = false;
+        }
+    }
+
+    private TupleStatus loadRightTuple() throws HyracksDataException {
+        TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION);
+        if (loaded == TupleStatus.UNKNOWN) {
+            loaded = pauseAndLoadRightTuple();
+        }
+        return loaded;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index ae2f0b4..06de054 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -105,4 +105,8 @@ public class RunFileReader implements IFrameReader {
     public long getReadPointer() {
         return readPreviousPtr;
     }
+
+    public long getReadPointerCurrent() {
+        return readPtr;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/RunFilePointer.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/RunFilePointer.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/RunFilePointer.java
new file mode 100644
index 0000000..0c4e6c1
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/RunFilePointer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import java.util.Comparator;
+
+public final class RunFilePointer implements IResetable<RunFilePointer> {
+    public static final int INVALID_ID = -1;
+    private long fileOffset;
+    private int tupleIndex;
+
+    public static final Comparator<RunFilePointer> ASC = new 
Comparator<RunFilePointer>() {
+        @Override
+        public int compare(RunFilePointer tp1, RunFilePointer tp2) {
+            int c = (int) (tp1.getFileOffset() - tp2.getFileOffset());
+            if (c == 0) {
+                c = tp1.getTupleIndex() - tp2.getTupleIndex();
+            }
+            return c;
+        }
+
+    };
+
+    public static final Comparator<RunFilePointer> DESC = new 
Comparator<RunFilePointer>() {
+        @Override
+        public int compare(RunFilePointer tp1, RunFilePointer tp2) {
+            int c = (int) (tp2.getFileOffset() - tp1.getFileOffset());
+            if (c == 0) {
+                c = tp2.getTupleIndex() - tp1.getTupleIndex();
+            }
+            return c;
+        }
+
+    };
+
+    public RunFilePointer() {
+        this(INVALID_ID, INVALID_ID);
+    }
+
+    public RunFilePointer(long fileOffset, int tupleId) {
+        reset(fileOffset, tupleId);
+    }
+
+    public long getFileOffset() {
+        return fileOffset;
+    }
+
+    public int getTupleIndex() {
+        return tupleIndex;
+    }
+
+    @Override
+    public void reset(RunFilePointer other) {
+        reset(other.fileOffset, other.tupleIndex);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        } else if (o == null || getClass() != o.getClass()) {
+            return false;
+        } else {
+            final RunFilePointer that = (RunFilePointer) o;
+            return fileOffset == that.fileOffset && tupleIndex == 
that.tupleIndex;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) fileOffset;
+        result = 31 * result + tupleIndex;
+        return result;
+    }
+
+    public void reset(long fileOffset, int tupleId) {
+        this.fileOffset = fileOffset;
+        this.tupleIndex = tupleId;
+    }
+
+    @Override
+    public String toString() {
+        return "RunFilePointer(" + fileOffset + ", " + tupleIndex + ")";
+    }
+
+}

Reply via email to