Enhance PhysicalOperator to use OperatorIds for references rather than random ids.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e14a38c5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e14a38c5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e14a38c5 Branch: refs/heads/diagnostics2 Commit: e14a38c58157af7f379378cdd97602ceea6fa9d2 Parents: 1bc276d Author: Jacques Nadeau <[email protected]> Authored: Fri May 16 08:47:51 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon May 19 09:11:22 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/base/AbstractBase.java | 21 ++++-- .../exec/physical/base/PhysicalOperator.java | 21 ++++-- .../apache/drill/exec/TestOpSerialization.java | 67 ++++++++++++++++++++ 3 files changed, 96 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e14a38c5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index a79cbc3..a028252 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -18,9 +18,9 @@ package org.apache.drill.exec.physical.base; import org.apache.drill.common.graph.GraphVisitor; -import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; public abstract class AbstractBase implements PhysicalOperator{ @@ -28,7 +28,7 @@ public abstract class AbstractBase implements PhysicalOperator{ protected long initialAllocation = 1000000L; protected long maxAllocation = 10000000000L; - + private int id; @Override public void accept(GraphVisitor<PhysicalOperator> visitor) { @@ -36,16 +36,25 @@ public abstract class AbstractBase implements PhysicalOperator{ if(this.iterator() == null) throw new IllegalArgumentException("Null iterator for pop." + this); for(PhysicalOperator o : this){ Preconditions.checkNotNull(o, String.format("Null in iterator for pop %s.", this)); - o.accept(visitor); + o.accept(visitor); } visitor.leave(this); } - + @Override public boolean isExecutable() { return true; } - + + public final void setOperatorId(int id){ + this.id = id; + } + + @Override + public int getOperatorId() { + return id; + } + @Override public SelectionVectorMode getSVMode() { return SelectionVectorMode.NONE; @@ -60,5 +69,5 @@ public abstract class AbstractBase implements PhysicalOperator{ public long getMaxAllocation() { return maxAllocation; } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e14a38c5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index db57922..483c364 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -22,42 +22,44 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.graph.GraphValue; import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import com.fasterxml.jackson.annotation.JsonIdentityInfo; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonInclude(Include.NON_NULL) @JsonPropertyOrder({ "@id" }) -@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id") +@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "@id") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop") public interface PhysicalOperator extends GraphValue<PhysicalOperator> { /** * Get the cost of execution of this particular operator. - * + * * @return */ @JsonIgnore public OperatorCost getCost(); - + /** * Get the estimated size of this particular operator. * @return */ @JsonIgnore public Size getSize(); - + /** * Describes whether or not a particular physical operator can actually be executed. Most physical operators can be * executed. However, Exchange nodes cannot be executed. In order to be executed, they must be converted into their * Exec sub components. - * + * * @return */ @JsonIgnore @@ -70,10 +72,10 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { */ @JsonIgnore public SelectionVectorMode getSVMode(); - + /** * Provides capability to build a set of output based on traversing a query graph tree. - * + * * @param physicalVisitor * @return */ @@ -97,4 +99,9 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { */ public long getMaxAllocation(); + @JsonProperty("@id") + public int getOperatorId(); + + @JsonProperty("@id") + public void setOperatorId(int id); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e14a38c5/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java new file mode 100644 index 0000000..3040de2 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java @@ -0,0 +1,67 @@ +package org.apache.drill.exec; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.logical.PlanProperties; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.physical.config.Screen; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.store.mock.MockSubScanPOP; +import org.junit.Test; + +import com.google.hive12.common.collect.Lists; + +public class TestOpSerialization { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOpSerialization.class); + + @Test + public void testSerializedDeserialize() throws Throwable { + DrillConfig c = DrillConfig.create(); + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + MockSubScanPOP s = new MockSubScanPOP("abc", null); + s.setOperatorId(2); + Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", ExpressionPosition.UNKNOWN), 0.1f); + f.setOperatorId(1); + Screen screen = new Screen(f, CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + screen.setOperatorId(0); + + boolean reversed = false; + while(true){ + + List<PhysicalOperator> pops = Lists.newArrayList(); + pops.add(s); + pops.add(f); + pops.add(screen); + + if(reversed) pops = Lists.reverse(pops); + PhysicalPlan plan1 = new PhysicalPlan(PlanProperties.builder().build(), pops); + String json = plan1.unparse(c.getMapper().writer()); + System.out.println(json); + + PhysicalPlan plan2 = reader.readPhysicalPlan(json); + System.out.println("++++++++"); + System.out.println(plan2.unparse(c.getMapper().writer())); + + PhysicalOperator root = plan2.getSortedOperators(false).iterator().next(); + assertEquals(0, root.getOperatorId()); + PhysicalOperator o1 = root.iterator().next(); + assertEquals(1, o1.getOperatorId()); + PhysicalOperator o2 = o1.iterator().next(); + assertEquals(2, o2.getOperatorId()); + if(reversed) break; + reversed = !reversed; + } + + + + + } +}
