working partition join.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b34a426a Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b34a426a Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b34a426a Branch: refs/heads/ecarm002/interval_join_merge Commit: b34a426a48dbf16a2d313b2333eb0e6fb23227e4 Parents: ce59341 Author: Preston Carman <prest...@apache.org> Authored: Fri Sep 30 13:48:03 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Fri Sep 30 13:48:03 2016 -0700 ---------------------------------------------------------------------- .../IntervalPartitionJoinPOperator.java | 2 +- .../intervalindex/IntervalIndexJoiner.java | 62 ++-- ...IntervalPartitionJoinOperatorDescriptor.java | 319 +++++++++++++++++++ .../IntervalPartitionJoinTaskState.java | 33 ++ .../IntervalPartitionJoiner.java | 288 +++++++++++++++++ .../dataflow/common/io/RunFileReader.java | 4 + .../dataflow/std/structures/RunFilePointer.java | 102 ++++++ 7 files changed, 778 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java index 73d159e..af77a92 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java @@ -23,7 +23,7 @@ import java.util.List; import 
java.util.logging.Logger; import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory; -import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionJoinOperatorDescriptor; +import org.apache.asterix.runtime.operators.joins.intervalpartition2.IntervalPartitionJoinOperatorDescriptor; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index bac2f45..8b7a12e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -61,7 +61,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { private final int[] streamIndex; private final RunFileStream[] runFileStream; -// private final LinkedList<TuplePointer> buffer = new LinkedList<>(); + // private final LinkedList<TuplePointer> buffer = new LinkedList<>(); private final IIntervalMergeJoinChecker imjc; @@ -342,7 +342,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { if (activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) { processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], inputAccessor[LEFT_PARTITION], true, writer); -// buffer.add(tp); + // buffer.add(tp); } else { // Spill case freezeAndSpill(); @@ -356,10 +356,10 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } while (loadLeftTuple().isLoaded() && loadRightTuple().isLoaded() && !checkToProcessRightTuple()); // Add Results -// if (!buffer.isEmpty()) { -// processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, -// memoryAccessor[LEFT_PARTITION], true, writer); -// } + // if (!buffer.isEmpty()) { + // processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, + // memoryAccessor[LEFT_PARTITION], true, writer); + // } } private void processRightTuple(IFrameWriter writer) throws HyracksDataException { @@ -374,7 +374,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { if (activeManager[RIGHT_PARTITION].addTuple(inputAccessor[RIGHT_PARTITION], tp)) { processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION], false, writer); -// buffer.add(tp); + // buffer.add(tp); } else { // Spill case freezeAndSpill(); @@ -388,32 +388,32 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } while (loadRightTuple().isLoaded() && checkToProcessRightTuple()); // Add Results -// if (!buffer.isEmpty()) { -// processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, -// memoryAccessor[RIGHT_PARTITION], false, writer); -// } + // if (!buffer.isEmpty()) { + // processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, + // memoryAccessor[RIGHT_PARTITION], false, writer); + // } } -// private void processActiveJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor, -// List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter 
writer) -// throws HyracksDataException { -// for (TuplePointer outerTp : outer) { -// outerAccessor.reset(outerTp); -// for (TuplePointer innerTp : inner) { -// innerAccessor.reset(innerTp); -// if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, -// innerTp.getTupleIndex(), reversed)) { -// addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), -// reversed, writer); -// } -// joinComparisonCount++; -// } -// } -// if (LOGGER.isLoggable(Level.FINE)) { -// LOGGER.fine("Sweep for " + buffer.size() + " tuples"); -// } -// buffer.clear(); -// } + // private void processActiveJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor, + // List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter writer) + // throws HyracksDataException { + // for (TuplePointer outerTp : outer) { + // outerAccessor.reset(outerTp); + // for (TuplePointer innerTp : inner) { + // innerAccessor.reset(innerTp); + // if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, + // innerTp.getTupleIndex(), reversed)) { + // addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), + // reversed, writer); + // } + // joinComparisonCount++; + // } + // } + // if (LOGGER.isLoggable(Level.FINE)) { + // LOGGER.fine("Sweep for " + buffer.size() + " tuples"); + // } + // buffer.clear(); + // } private void processTupleJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor, ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java new file mode 100644 index 0000000..a985eee --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.runtime.operators.joins.intervalpartition2; + +import java.nio.ByteBuffer; +import java.util.logging.Logger; + +import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; +import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory; +import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionComputerFactory; +import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.IActivity; +import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.TaskId; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.RangeId; +import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage; +import org.apache.hyracks.dataflow.std.join.MergeJoinLocks; +import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState; + +public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor { + private static final long serialVersionUID = 1L; + + private static final int LEFT_ACTIVITY_ID = 0; + private static 
final int RIGHT_ACTIVITY_ID = 1; + private final int[] leftKeys; + private final int[] rightKeys; + private final int memoryForJoin; + private final IIntervalMergeJoinCheckerFactory imjcf; + private final RangeId rangeId; + private final int k; + + private final int probeKey; + private final int buildKey; + + private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName()); + + public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, int k, + int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf, + RangeId rangeId) { + super(spec, 2, 1); + recordDescriptors[0] = recordDescriptor; + this.buildKey = leftKeys[0]; + this.probeKey = rightKeys[0]; + this.k = k; + this.leftKeys = leftKeys; + this.rightKeys = rightKeys; + this.memoryForJoin = memoryForJoin; + this.imjcf = imjcf; + this.rangeId = rangeId; + } + + @Override + public void contributeActivities(IActivityGraphBuilder builder) { + MergeJoinLocks locks = new MergeJoinLocks(); + + ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID); + ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID); + + IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks); + IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks); + + builder.addActivity(this, rightAN); + builder.addSourceEdge(1, rightAN, 0); + + builder.addActivity(this, leftAN); + builder.addSourceEdge(0, leftAN, 0); + builder.addTargetEdge(0, leftAN, 0); + } + + private class LeftJoinerActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + private final MergeJoinLocks locks; + + public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) { + super(id); + this.locks = locks; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, 
int partition, int nPartitions) + throws HyracksDataException { + locks.setPartitions(nPartitions); + final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + return new LeftJoinerOperator(ctx, partition, inRecordDesc); + } + + private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable { + + private final IHyracksTaskContext ctx; + private final int partition; + private final RecordDescriptor leftRd; + private IntervalPartitionJoinTaskState state; + private boolean first = true; + + public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) { + this.ctx = ctx; + this.partition = partition; + this.leftRd = inRecordDesc; + } + + @Override + public void open() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + writer.open(); + state = new IntervalPartitionJoinTaskState(ctx.getJobletContext().getJobId(), + new TaskId(getActivityId(), partition));; + state.leftRd = leftRd; + ctx.setStateObject(state); + locks.getRight(partition).signal(); + + do { + // Continue after joiner created in right branch. 
+ if (state.partitionJoiner == null) { + locks.getLeft(partition).await(); + } + } while (state.partitionJoiner == null); + state.status.branch[LEFT_ACTIVITY_ID].setStageOpen(); + locks.getRight(partition).signal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + locks.getLock(partition).lock(); + if (first) { + state.status.branch[LEFT_ACTIVITY_ID].setStageData(); + first = false; + } + try { + state.partitionJoiner.setFrame(LEFT_ACTIVITY_ID, buffer); + state.partitionJoiner.processLeftFrame(writer); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void fail() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.failed = true; + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void close() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.status.branch[LEFT_ACTIVITY_ID].noMore(); + if (state.failed) { + writer.fail(); + } else { + state.partitionJoiner.processLeftClose(writer); + writer.close(); + } + state.status.branch[LEFT_ACTIVITY_ID].setStageClose(); + locks.getRight(partition).signal(); + } finally { + locks.getLock(partition).unlock(); + } + } + } + } + + private class RightDataActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + private final ActivityId joinAid; + private final MergeJoinLocks locks; + + public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) { + super(id); + this.joinAid = joinAid; + this.locks = locks; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + locks.setPartitions(nPartitions); + RecordDescriptor inRecordDesc = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + return new RightDataOperator(ctx, partition, inRecordDesc); + } + + private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable { + + private int partition; + private IHyracksTaskContext ctx; + private final RecordDescriptor rightRd; + private IntervalPartitionJoinTaskState state; + private boolean first = true; + + public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) { + this.ctx = ctx; + this.partition = partition; + this.rightRd = inRecordDesc; + } + + @Override + public void open() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + do { + // Wait for the state to be set in the context form Left. + state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition)); + if (state == null) { + locks.getRight(partition).await(); + } + } while (state == null); + state.k = k; + + RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx); + long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), + partition); + long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition); + ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k, + partitionStart, partitionEnd).createPartitioner(); + ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k, + partitionStart, partitionEnd).createPartitioner(); + IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx); + + state.rightRd = rightRd; + state.partitionJoiner = new IntervalPartitionJoiner(ctx, memoryForJoin, partition, state.k, + state.status, locks, imjc, state.leftRd, state.rightRd, buildHpc, probeHpc); + state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen(); + locks.getLeft(partition).signal(); + } catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + locks.getLock(partition).lock(); + if (first) { + state.status.branch[RIGHT_ACTIVITY_ID].setStageData(); + first = false; + } + try { + while (!state.status.continueRightLoad + && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) { + // Wait for the state to request right frame unless left has finished. + locks.getRight(partition).await(); + } + state.partitionJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer); + state.status.continueRightLoad = false; + locks.getLeft(partition).signal(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void fail() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.failed = true; + locks.getLeft(partition).signal(); + } finally { + locks.getLock(partition).unlock(); + } + } + + @Override + public void close() throws HyracksDataException { + locks.getLock(partition).lock(); + try { + state.status.branch[RIGHT_ACTIVITY_ID].setStageClose(); + locks.getLeft(partition).signal(); + } finally { + locks.getLock(partition).unlock(); + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java new file mode 100644 index 0000000..e8563c2 --- /dev/null +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.operators.joins.intervalpartition2; + +import org.apache.hyracks.api.dataflow.TaskId; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.dataflow.std.join.MergeJoinTaskState; + +public class IntervalPartitionJoinTaskState extends MergeJoinTaskState { + protected IntervalPartitionJoiner partitionJoiner; + public int k; + + public IntervalPartitionJoinTaskState(JobId jobId, TaskId taskId) { + super(jobId, taskId); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java 
new file mode 100644 index 0000000..fb2edd7 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.operators.joins.intervalpartition2; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; +import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.io.RunFileReader; +import org.apache.hyracks.dataflow.common.io.RunFileWriter; +import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo; +import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; +import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager; +import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner; +import org.apache.hyracks.dataflow.std.join.MergeJoinLocks; +import org.apache.hyracks.dataflow.std.join.MergeStatus; +import org.apache.hyracks.dataflow.std.structures.RunFilePointer; +import org.apache.hyracks.dataflow.std.structures.TuplePointer; + +public class IntervalPartitionJoiner extends AbstractMergeJoiner { + + private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName()); + + private final RunFileWriter 
probeRunFileWriter; + private int probeRunFilePid = -1; + + private final ITuplePartitionComputer buildHpc; + private final ITuplePartitionComputer probeHpc; + + private final int buildMemory; + private final int k; + private final int numOfPartitions; + private long buildSize = 0; + private long probeSize = 0; + private final TreeMap<RunFilePointer, Integer> probeRunFilePointers; + + private final VPartitionTupleBufferManager buildBufferManager; + private final TuplePointer tempPtr = new TuplePointer(); + private final List<Integer> buildInMemoryPartitions; + private final FrameTupleAccessor accessorBuild; + private BufferInfo bufferInfo; + + private long spillWriteCount = 0; + private long spillReadCount = 0; + private long joinComparisonCount = 0; + private long joinResultCount = 0; + private final IIntervalMergeJoinChecker imjc; + private final FrameTupleAccessor accessorProbe; + private final IFrame reloadBuffer; + private boolean moreBuildProcessing = true; + private final List<IFrameBufferManager> fbms = new ArrayList<>(); + + public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status, + MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd, + ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException { + super(ctx, partition, status, locks, leftRd, rightRd); + + bufferInfo = new BufferInfo(null, -1, -1); + + this.accessorProbe = new FrameTupleAccessor(leftRd); + reloadBuffer = new VSizeFrame(ctx); + + this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);; + this.imjc = imjc; + + // TODO fix available memory size + this.buildMemory = memorySize; + buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN, + numOfPartitions, buildMemory * ctx.getInitialFrameSize()); + + this.k = k; + this.buildHpc = buildHpc; + this.probeHpc = probeHpc; + + FileReference file = 
ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner"); + probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager()); + probeRunFileWriter.open(); + + probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC); + buildInMemoryPartitions = new LinkedList<>(); + + this.accessorBuild = new FrameTupleAccessor(rightRd); + + LOGGER.setLevel(Level.FINE); + System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel()); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize + + " frames of memory."); + } + } + + @Override + public void processLeftFrame(IFrameWriter writer) throws HyracksDataException { + while (inputAccessor[LEFT_PARTITION].exists()) { + int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), k); + + if (probeRunFilePid != pid) { + // Log new partition locations. + RunFilePointer rfp = new RunFilePointer(probeRunFileWriter.getFileSize(), + inputAccessor[LEFT_PARTITION].getTupleId()); + probeRunFilePointers.put(rfp, pid); + probeRunFilePid = pid; + } + inputAccessor[LEFT_PARTITION].next(); + probeSize++; + } + inputBuffer[LEFT_PARTITION].rewind(); + probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]); + spillWriteCount++; + } + + @Override + public void processLeftClose(IFrameWriter writer) throws HyracksDataException { + joinLoopOnMemory(writer); + + // Flush result. + resultAppender.write(writer, true); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, " + + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read."); + } + } + + private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException { + RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader(); + pReader.open(); + // Load first frame. 
+        loadReaderNextFrame(pReader);
+
+        while (moreBuildProcessing) {
+            fillMemory();
+            joinMemoryBlockWithRunFile(writer, pReader);
+
+            // Clean up
+            for (int pid : buildInMemoryPartitions) {
+                buildBufferManager.clearPartition(pid);
+            }
+            buildInMemoryPartitions.clear();
+        }
+        pReader.close();
+    }
+
+    /**
+     * Walks every entry of {@code probeRunFilePointers} and joins the corresponding section of the probe run
+     * file with the build partitions listed in {@code buildInMemoryPartitions}.
+     *
+     * Only build partitions for which {@code imjc.compareIntervalPartition} returns true are handed to
+     * {@link #join}; a probe entry with no such partition is not read at all.
+     *
+     * @param writer  receives the joined tuples
+     * @param pReader reader over the probe run file
+     * @throws HyracksDataException propagated from the reader, the buffer manager or the writer
+     */
+    private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader pReader) throws HyracksDataException {
+        for (RunFilePointer probePointer : probeRunFilePointers.navigableKeySet()) {
+            Pair<Integer, Integer> probeRange =
+                    IntervalPartitionUtil.getIntervalPartition(probeRunFilePointers.get(probePointer), k);
+
+            // Gather the in-memory build partitions that qualify for this probe partition.
+            for (int memoryPid : buildInMemoryPartitions) {
+                Pair<Integer, Integer> buildRange = IntervalPartitionUtil.getIntervalPartition(memoryPid, k);
+                boolean qualifies = imjc.compareIntervalPartition(probeRange.first, probeRange.second,
+                        buildRange.first, buildRange.second);
+                if (qualifies) {
+                    fbms.add(buildBufferManager.getPartitionFrameBufferManager(memoryPid));
+                }
+            }
+
+            if (!fbms.isEmpty()) {
+                join(pReader, probePointer, fbms, writer);
+            }
+            // fbms is reused across iterations, so it must be emptied before the next probe entry.
+            fbms.clear();
+        }
+    }
+
+    /**
+     * Joins the probe tuples stored between {@code rfpStart} and the next higher key of
+     * {@code probeRunFilePointers} (or the end of the file when there is no higher key) with every build
+     * buffer in {@code buildFbms}.
+     *
+     * @param pReader   reader over the probe run file; repositioned when it is not already at the start offset
+     * @param rfpStart  file offset and tuple index of the first probe tuple to process
+     * @param buildFbms in-memory build buffers to join against
+     * @param writer    receives the joined tuples
+     * @throws HyracksDataException propagated from the reader or the writer
+     */
+    private void join(RunFileReader pReader, RunFilePointer rfpStart, List<IFrameBufferManager> buildFbms,
+            IFrameWriter writer) throws HyracksDataException {
+        final long startOffset = rfpStart.getFileOffset();
+        final int startTuple = rfpStart.getTupleIndex();
+
+        // The section ends where the next recorded pointer begins, or at the end of the file.
+        final RunFilePointer rfpNext = probeRunFilePointers.higherKey(rfpStart);
+        final long endOffset;
+        final int endTuple;
+        if (rfpNext == null) {
+            endOffset = pReader.getFileSize();
+            endTuple = Integer.MAX_VALUE;
+        } else {
+            endOffset = rfpNext.getFileOffset();
+            endTuple = rfpNext.getTupleIndex();
+        }
+
+        // Seek only when the frame currently loaded is not the one the section starts in.
+        if (pReader.getReadPointer() != startOffset) {
+            pReader.reset(startOffset);
+            loadReaderNextFrame(pReader);
+        }
+
+        do {
+            final long frameOffset = pReader.getReadPointer();
+            // The first and last frames of the section may be shared with neighbouring sections.
+            final int firstTuple = (frameOffset == startOffset) ? startTuple : 0;
+            final int tupleLimit = (frameOffset == endOffset) ? endTuple : accessorProbe.getTupleCount();
+
+            for (int probeIdx = firstTuple; probeIdx < tupleLimit; ++probeIdx) {
+                // Tuple has potential match from build phase
+                for (IFrameBufferManager fbm : buildFbms) {
+                    joinTupleWithMemoryPartition(accessorProbe, probeIdx, fbm, writer);
+                }
+            }
+        } while (pReader.getReadPointer() < endOffset && loadReaderNextFrame(pReader));
+    }
+
+    /**
+     * Reads the next frame of {@code pReader} into {@code reloadBuffer} and points {@code accessorProbe} at it.
+     *
+     * @param pReader reader to advance
+     * @return true when a frame was read (and {@code spillReadCount} was incremented), false otherwise
+     * @throws HyracksDataException propagated from the reader
+     */
+    private boolean loadReaderNextFrame(RunFileReader pReader) throws HyracksDataException {
+        if (!pReader.nextFrame(reloadBuffer)) {
+            return false;
+        }
+        accessorProbe.reset(reloadBuffer.getBuffer());
+        spillReadCount++;
+        return true;
+    }
+
+    /**
+     * Compares one probe tuple with every build tuple held by {@code fbm} and emits each pair accepted by
+     * {@code imjc.checkToSaveInResult}. {@code joinComparisonCount} is incremented once per compared pair.
+     *
+     * @param accessorProbe   accessor positioned on the probe frame
+     * @param probeTupleIndex index of the probe tuple inside that frame
+     * @param fbm             build-side frame buffer to scan; nothing happens when it holds no frames
+     * @param writer          receives the joined tuples
+     * @throws HyracksDataException propagated from the writer
+     */
+    public void joinTupleWithMemoryPartition(IFrameTupleAccessor accessorProbe, int probeTupleIndex,
+            IFrameBufferManager fbm, IFrameWriter writer) throws HyracksDataException {
+        if (fbm.getNumFrames() == 0) {
+            return;
+        }
+        fbm.resetIterator();
+        for (int frameIndex = fbm.next(); fbm.exists(); frameIndex = fbm.next()) {
+            fbm.getFrame(frameIndex, bufferInfo);
+            accessorBuild.reset(bufferInfo.getBuffer());
+            for (int buildIdx = 0; buildIdx < accessorBuild.getTupleCount(); ++buildIdx) {
+                boolean matches =
+                        imjc.checkToSaveInResult(accessorBuild, buildIdx, accessorProbe, probeTupleIndex, false);
+                if (matches) {
+                    appendToResult(accessorBuild, buildIdx, accessorProbe, probeTupleIndex, writer);
+                }
+                joinComparisonCount++;
+            }
+        }
+    }
+
+    /**
+     * Appends the concatenation of one build tuple and one probe tuple to the writer and counts the result.
+     *
+     * @param accessorBuild accessor holding the build tuple
+     * @param buildSidetIx  index of the build tuple
+     * @param accessorProbe accessor holding the probe tuple
+     * @param probeSidetIx  index of the probe tuple
+     * @param writer        receives the joined tuple
+     * @throws HyracksDataException propagated from the writer
+     */
+    private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
+            int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
+        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, buildSidetIx, accessorProbe,
+                probeSidetIx);
+        // Counted only after the append returned without throwing.
+        joinResultCount++;
+    }
+
+    /**
+     * Moves right-side tuples into {@code buildBufferManager} until an insert is rejected or no further tuple
+     * is loaded, recording each newly seen partition id in {@code buildInMemoryPartitions}.
+     *
+     * {@code moreBuildProcessing} is cleared only when the loop ends with an empty tuple status; a rejected
+     * insert returns early and leaves the flag untouched.
+     *
+     * @throws HyracksDataException propagated from tuple loading or the buffer manager
+     */
+    private void fillMemory() throws HyracksDataException {
+        int lastPid = -1;
+        TupleStatus status = loadRightTuple();
+        while (status.isLoaded()) {
+            int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], inputAccessor[RIGHT_PARTITION].getTupleId(),
+                    k);
+            boolean inserted = buildBufferManager.insertTuple(pid, inputAccessor[RIGHT_PARTITION],
+                    inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr);
+            if (!inserted) {
+                // Insert was rejected (presumably the buffer is full); the tuple is not consumed.
+                return;
+            }
+
+            if (pid != lastPid) {
+                // Track new partitions in memory.
+                buildInMemoryPartitions.add(pid);
+                lastPid = pid;
+            }
+            inputAccessor[RIGHT_PARTITION].next();
+            buildSize++;
+            status = loadRightTuple();
+        }
+        if (status.isEmpty()) {
+            moreBuildProcessing = false;
+        }
+    }
+
+    /**
+     * Returns the status of the next right-side tuple, falling back to {@code pauseAndLoadRightTuple()} when
+     * {@code loadMemoryTuple} reports {@code TupleStatus.UNKNOWN}.
+     *
+     * @return the tuple status after the load attempt(s)
+     * @throws HyracksDataException propagated from the load calls
+     */
+    private TupleStatus loadRightTuple() throws HyracksDataException {
+        TupleStatus status = loadMemoryTuple(RIGHT_PARTITION);
+        return status == TupleStatus.UNKNOWN ? pauseAndLoadRightTuple() : status;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index ae2f0b4..06de054 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -105,4 +105,8 @@ public class RunFileReader implements IFrameReader {
     public long getReadPointer() {
         return readPreviousPtr;
     }
+
+    public long getReadPointerCurrent() {
+        return readPtr;
+    }
 }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b34a426a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/RunFilePointer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/RunFilePointer.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/RunFilePointer.java new file mode 100644 index 0000000..0c4e6c1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/RunFilePointer.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import java.util.Comparator;
+
+/**
+ * A mutable (file offset, tuple index) pair identifying a tuple position.
+ *
+ * Equality and hashing use both components. {@link #ASC} and {@link #DESC} order pointers by file offset
+ * first and by tuple index second, consistently with {@link #equals(Object)}.
+ *
+ * NOTE(review): instances are mutable through {@code reset}; do not reset an instance while it is used as a
+ * key of a sorted or hashed collection.
+ */
+public final class RunFilePointer implements IResetable<RunFilePointer> {
+    /** Value of both components of a pointer created by the no-argument constructor. */
+    public static final int INVALID_ID = -1;
+    private long fileOffset;
+    private int tupleIndex;
+
+    /**
+     * Ascending order: smaller file offset first, ties broken by smaller tuple index.
+     *
+     * Components are compared with {@code Long.compare} / {@code Integer.compare}. Subtracting the long
+     * offsets and casting the difference to int truncates it, which can flip the sign (or yield 0) for
+     * offsets that are 2 GiB or more apart, and int subtraction of the indexes can overflow.
+     */
+    public static final Comparator<RunFilePointer> ASC = new Comparator<RunFilePointer>() {
+        @Override
+        public int compare(RunFilePointer tp1, RunFilePointer tp2) {
+            int c = Long.compare(tp1.getFileOffset(), tp2.getFileOffset());
+            if (c == 0) {
+                c = Integer.compare(tp1.getTupleIndex(), tp2.getTupleIndex());
+            }
+            return c;
+        }
+
+    };
+
+    /** Descending order: the exact reverse of {@link #ASC}. */
+    public static final Comparator<RunFilePointer> DESC = new Comparator<RunFilePointer>() {
+        @Override
+        public int compare(RunFilePointer tp1, RunFilePointer tp2) {
+            // Swapping the arguments reverses the order without duplicating the comparison logic.
+            return ASC.compare(tp2, tp1);
+        }
+
+    };
+
+    /** Creates a pointer whose file offset and tuple index are both {@link #INVALID_ID}. */
+    public RunFilePointer() {
+        this(INVALID_ID, INVALID_ID);
+    }
+
+    /**
+     * Creates a pointer to the given position.
+     *
+     * @param fileOffset offset component
+     * @param tupleId    tuple index component
+     */
+    public RunFilePointer(long fileOffset, int tupleId) {
+        reset(fileOffset, tupleId);
+    }
+
+    /** @return the file offset component */
+    public long getFileOffset() {
+        return fileOffset;
+    }
+
+    /** @return the tuple index component */
+    public int getTupleIndex() {
+        return tupleIndex;
+    }
+
+    /**
+     * Copies both components from another pointer.
+     *
+     * @param other pointer to copy from; must not be null
+     */
+    @Override
+    public void reset(RunFilePointer other) {
+        reset(other.fileOffset, other.tupleIndex);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        } else if (o == null || getClass() != o.getClass()) {
+            return false;
+        } else {
+            final RunFilePointer that = (RunFilePointer) o;
+            return fileOffset == that.fileOffset && tupleIndex == that.tupleIndex;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        // Truncating the offset is acceptable here: a hash only has to be equal for equal pointers.
+        int result = (int) fileOffset;
+        result = 31 * result + tupleIndex;
+        return result;
+    }
+
+    /**
+     * Overwrites both components of this pointer.
+     *
+     * @param fileOffset new offset component
+     * @param tupleId    new tuple index component
+     */
+    public void reset(long fileOffset, int tupleId) {
+        this.fileOffset = fileOffset;
+        this.tupleIndex = tupleId;
+    }
+
+    @Override
+    public String toString() {
+        return "RunFilePointer(" + fileOffset + ", " + tupleIndex + ")";
+    }
+
+}