Enhance PhysicalOperator to use OperatorIds for references rather than random 
ids.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e14a38c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e14a38c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e14a38c5

Branch: refs/heads/diagnostics2
Commit: e14a38c58157af7f379378cdd97602ceea6fa9d2
Parents: 1bc276d
Author: Jacques Nadeau <[email protected]>
Authored: Fri May 16 08:47:51 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon May 19 09:11:22 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/base/AbstractBase.java  | 21 ++++--
 .../exec/physical/base/PhysicalOperator.java    | 21 ++++--
 .../apache/drill/exec/TestOpSerialization.java  | 67 ++++++++++++++++++++
 3 files changed, 96 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e14a38c5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index a79cbc3..a028252 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -18,9 +18,9 @@
 package org.apache.drill.exec.physical.base;
 
 import org.apache.drill.common.graph.GraphVisitor;
-import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 
 public abstract class AbstractBase implements PhysicalOperator{
@@ -28,7 +28,7 @@ public abstract class AbstractBase implements 
PhysicalOperator{
 
   protected long initialAllocation = 1000000L;
   protected long maxAllocation = 10000000000L;
-
+  private int id;
 
   @Override
   public void accept(GraphVisitor<PhysicalOperator> visitor) {
@@ -36,16 +36,25 @@ public abstract class AbstractBase implements 
PhysicalOperator{
     if(this.iterator() == null) throw new IllegalArgumentException("Null 
iterator for pop." + this);
     for(PhysicalOperator o : this){
       Preconditions.checkNotNull(o, String.format("Null in iterator for pop 
%s.", this));
-      o.accept(visitor);  
+      o.accept(visitor);
     }
     visitor.leave(this);
   }
-  
+
   @Override
   public boolean isExecutable() {
     return true;
   }
-  
+
+  public final void setOperatorId(int id){
+    this.id = id;
+  }
+
+  @Override
+  public int getOperatorId() {
+    return id;
+  }
+
   @Override
   public SelectionVectorMode getSVMode() {
     return SelectionVectorMode.NONE;
@@ -60,5 +69,5 @@ public abstract class AbstractBase implements 
PhysicalOperator{
   public long getMaxAllocation() {
     return maxAllocation;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e14a38c5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index db57922..483c364 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -22,42 +22,44 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.graph.GraphValue;
 import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.fasterxml.jackson.annotation.JsonIdentityInfo;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
 @JsonInclude(Include.NON_NULL)
 @JsonPropertyOrder({ "@id" })
-@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, 
property = "@id")
+@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, 
property = "@id")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "pop")
 public interface  PhysicalOperator extends GraphValue<PhysicalOperator> {
 
   /**
    * Get the cost of execution of this particular operator.
-   * 
+   *
    * @return
    */
   @JsonIgnore
   public OperatorCost getCost();
-  
+
   /**
    * Get the estimated size of this particular operator.
    * @return
    */
   @JsonIgnore
   public Size getSize();
-  
+
   /**
    * Describes whether or not a particular physical operator can actually be 
executed. Most physical operators can be
    * executed. However, Exchange nodes cannot be executed. In order to be 
executed, they must be converted into their
    * Exec sub components.
-   * 
+   *
    * @return
    */
   @JsonIgnore
@@ -70,10 +72,10 @@ public interface  PhysicalOperator extends 
GraphValue<PhysicalOperator> {
    */
   @JsonIgnore
   public SelectionVectorMode getSVMode();
-  
+
   /**
    * Provides capability to build a set of output based on traversing a query 
graph tree.
-   * 
+   *
    * @param physicalVisitor
    * @return
    */
@@ -97,4 +99,9 @@ public interface  PhysicalOperator extends 
GraphValue<PhysicalOperator> {
    */
   public long getMaxAllocation();
 
+  @JsonProperty("@id")
+  public int getOperatorId();
+
+  @JsonProperty("@id")
+  public void setOperatorId(int id);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e14a38c5/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
new file mode 100644
index 0000000..3040de2
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
@@ -0,0 +1,67 @@
+package org.apache.drill.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.PlanProperties;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.mock.MockSubScanPOP;
+import org.junit.Test;
+
+import com.google.hive12.common.collect.Lists;
+
+public class TestOpSerialization {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestOpSerialization.class);
+
+  @Test
+  public void testSerializedDeserialize() throws Throwable {
+    DrillConfig c = DrillConfig.create();
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    MockSubScanPOP s = new MockSubScanPOP("abc", null);
+    s.setOperatorId(2);
+    Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", 
ExpressionPosition.UNKNOWN), 0.1f);
+    f.setOperatorId(1);
+    Screen screen = new Screen(f, 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    screen.setOperatorId(0);
+
+    boolean reversed = false;
+    while(true){
+
+      List<PhysicalOperator> pops = Lists.newArrayList();
+      pops.add(s);
+      pops.add(f);
+      pops.add(screen);
+
+      if(reversed) pops = Lists.reverse(pops);
+      PhysicalPlan plan1 = new PhysicalPlan(PlanProperties.builder().build(), 
pops);
+      String json = plan1.unparse(c.getMapper().writer());
+      System.out.println(json);
+
+      PhysicalPlan plan2 = reader.readPhysicalPlan(json);
+      System.out.println("++++++++");
+      System.out.println(plan2.unparse(c.getMapper().writer()));
+
+      PhysicalOperator root = 
plan2.getSortedOperators(false).iterator().next();
+      assertEquals(0, root.getOperatorId());
+      PhysicalOperator o1 = root.iterator().next();
+      assertEquals(1, o1.getOperatorId());
+      PhysicalOperator o2 = o1.iterator().next();
+      assertEquals(2, o2.getOperatorId());
+      if(reversed) break;
+      reversed = !reversed;
+    }
+
+
+
+
+  }
+}

Reply via email to