http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 8701851..6052c69 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -51,6 +51,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
@@ -70,6 +71,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclu
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
@@ -234,6 +236,11 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                     op.setPhysicalOperator(new RunningAggregatePOperator());
                     break;
                 }
+                case RANGE_FORWARD: {
+                    RangeForwardOperator rfo = (RangeForwardOperator) op;
+                    op.setPhysicalOperator(new RangeForwardPOperator(rfo.getRangeId(), rfo.getRangeMap()));
+                    break;
+                }
                 case REPLICATE: {
                     op.setPhysicalOperator(new ReplicatePOperator());
                     break;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
index 774dd2a..e1b7b12 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
@@ -58,7 +58,7 @@ public final class RangeId implements Serializable {
 
     @Override
     public String toString() {
-        return "RangeId(#" + id + (partition >= 0 ? "," + partition : "") + ")";
+        return "RangeId(" + id + (partition >= 0 ? "," + partition : "") + ")";
     }
 
     @Override
@@ -74,4 +74,5 @@ public final class RangeId implements Serializable {
     public int hashCode() {
         return id;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
index c08035a..c235afb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
@@ -50,7 +50,7 @@ public class PartitionRangeDataWriter extends AbstractPartitionDataWriter {
     @Override
     public void open() throws HyracksDataException {
         super.open();
-        RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx));
+        RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
         tpc = trpcf.createPartitioner(rangeState.getRangeMap());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
index 04cfca3..0e4bc4d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
@@ -73,6 +73,16 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
         public IRangeMap getRangeMap() {
             return rangeMap;
         }
+
+        public static RangeForwardTaskState getRangeState(int rangeId, IHyracksTaskContext ctx)
+                throws HyracksDataException {
+            RangeId rangeIdObject = new RangeId(rangeId, ctx);
+            RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeIdObject);
+            if (rangeState == null) {
+                throw new HyracksDataException("Range state is missing for " + rangeIdObject + ".");
+            }
+            return rangeState;
+        }
     }
 
     private final class ForwardActivityNode extends AbstractActivityNode {

Reply via email to