http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 8701851..6052c69 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -51,6 +51,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator; @@ -70,6 +71,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclu import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator; @@ -234,6 +236,11 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule op.setPhysicalOperator(new RunningAggregatePOperator()); break; } + case RANGE_FORWARD: { + RangeForwardOperator rfo = (RangeForwardOperator) op; + op.setPhysicalOperator(new RangeForwardPOperator(rfo.getRangeId(), rfo.getRangeMap())); + break; + } case REPLICATE: { op.setPhysicalOperator(new ReplicatePOperator()); break;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java index 774dd2a..e1b7b12 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java @@ -58,7 +58,7 @@ public final class RangeId implements Serializable { @Override public String toString() { - return "RangeId(#" + id + (partition >= 0 ? "," + partition : "") + ")"; + return "RangeId(" + id + (partition >= 0 ? "," + partition : "") + ")"; } @Override @@ -74,4 +74,5 @@ public final class RangeId implements Serializable { public int hashCode() { return id; } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java index c08035a..c235afb 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java @@ -50,7 +50,7 @@ public class PartitionRangeDataWriter extends AbstractPartitionDataWriter { @Override public void open() throws HyracksDataException { super.open(); - RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx)); + RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx); tpc = trpcf.createPartitioner(rangeState.getRangeMap()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java index 04cfca3..0e4bc4d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java @@ -73,6 +73,16 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor { public IRangeMap getRangeMap() { return rangeMap; } + + public static RangeForwardTaskState getRangeState(int rangeId, IHyracksTaskContext ctx) + throws HyracksDataException { + RangeId rangeIdObject = new RangeId(rangeId, ctx); + RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeIdObject); + if (rangeState == null) { + throw new HyracksDataException("Range state is missing for " + rangeIdObject + "."); + } + return rangeState; + } } private final class ForwardActivityNode extends AbstractActivityNode {