This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 11f9ca5 [BEAM-7844] Custom MetadataHandler for NodeStats (RowRateWindow) new e5366eb Merge pull request #9185 from riazela/RowRateWindow 11f9ca5 is described below commit 11f9ca5adc656e06bcab168b23a3b6bc2ea09242 Author: Alireza Samadian <alireza4...@gmail.com> AuthorDate: Mon Jul 29 12:54:16 2019 -0700 [BEAM-7844] Custom MetadataHandler for NodeStats (RowRateWindow) --- .../extensions/sql/impl/CalciteQueryPlanner.java | 5 +- .../sdk/extensions/sql/impl/planner/NodeStats.java | 86 ++++++++++++++++++++++ .../sql/impl/planner/NodeStatsMetadata.java | 55 ++++++++++++++ .../sql/impl/planner/RelMdNodeStats.java | 84 +++++++++++++++++++++ .../sql/impl/rel/BeamAggregationRel.java | 7 ++ .../sdk/extensions/sql/impl/rel/BeamCalcRel.java | 6 ++ .../sdk/extensions/sql/impl/rel/BeamIOSinkRel.java | 7 ++ .../extensions/sql/impl/rel/BeamIOSourceRel.java | 6 ++ .../extensions/sql/impl/rel/BeamIntersectRel.java | 7 ++ .../sdk/extensions/sql/impl/rel/BeamJoinRel.java | 6 ++ .../sdk/extensions/sql/impl/rel/BeamMinusRel.java | 7 ++ .../sdk/extensions/sql/impl/rel/BeamRelNode.java | 4 + .../sdk/extensions/sql/impl/rel/BeamSortRel.java | 7 ++ .../extensions/sql/impl/rel/BeamUncollectRel.java | 7 ++ .../sdk/extensions/sql/impl/rel/BeamUnionRel.java | 7 ++ .../sdk/extensions/sql/impl/rel/BeamUnnestRel.java | 7 ++ .../sdk/extensions/sql/impl/rel/BeamValuesRel.java | 7 ++ .../extensions/sql/impl/planner/NodeStatsTest.java | 79 ++++++++++++++++++++ 18 files changed, 393 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index 3d4e6ca..a4ec34f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.impl; +import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -152,7 +153,9 @@ class CalciteQueryPlanner implements QueryPlanner { .setMetadataProvider( ChainedRelMetadataProvider.of( ImmutableList.of( - NonCumulativeCostImpl.SOURCE, root.rel.getCluster().getMetadataProvider()))); + NonCumulativeCostImpl.SOURCE, + RelMdNodeStats.SOURCE, + root.rel.getCluster().getMetadataProvider()))); RelMetadataQuery.THREAD_PROVIDERS.set( JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider())); root.rel.getCluster().invalidateMetadataQuery(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStats.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStats.java new file mode 100644 index 0000000..88d7ad2 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStats.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.planner; + +import com.google.auto.value.AutoValue; + +/** This is a utility class to represent rowCount, rate and window. */ +@AutoValue +public abstract class NodeStats { + + /** + * Returns an instance with all values set to INFINITY. This will be only used when the node is + * not a BeamRelNode and we don't have an estimation implementation for it in the metadata + * handler. In this case we return INFINITE and it will be propagated up in the estimates. + */ + public static final NodeStats UNKNOWN = + create(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY); + + public abstract double getRowCount(); + + public abstract double getRate(); + + /** + * This method returns the number of tuples in each window. It is different than the windowing + * notion of Beam. + */ + public abstract double getWindow(); + + public static NodeStats create(double rowCount, double rate, double window) { + if (window < 0 || rate < 0 || rowCount < 0) { + throw new IllegalArgumentException("All the estimates in NodeStats should be positive"); + } + return new AutoValue_NodeStats(rowCount, rate, window); + } + + /** It creates an instance with rate=0 and window=rowCount for bounded sources. */ + public static NodeStats create(double rowCount) { + return create(rowCount, 0d, rowCount); + } + + /** If any of the values for rowCount, rate or window is infinite, it returns true. 
*/ + public boolean isUnknown() { + return Double.isInfinite(getRowCount()) + || Double.isInfinite(getRate()) + || Double.isInfinite(getWindow()); + } + + public NodeStats multiply(double factor) { + return create(getRowCount() * factor, getRate() * factor, getWindow() * factor); + } + + public NodeStats plus(NodeStats that) { + if (this.isUnknown() || that.isUnknown()) { + return UNKNOWN; + } + return create( + this.getRowCount() + that.getRowCount(), + this.getRate() + that.getRate(), + this.getWindow() + that.getWindow()); + } + + public NodeStats minus(NodeStats that) { + if (this.isUnknown() || that.isUnknown()) { + return UNKNOWN; + } + return create( + Math.max(this.getRowCount() - that.getRowCount(), 0), + Math.max(this.getRate() - that.getRate(), 0), + Math.max(this.getWindow() - that.getWindow(), 0)); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java new file mode 100644 index 0000000..4a9e79f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsMetadata.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.planner; + +import java.lang.reflect.Method; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.Metadata; +import org.apache.calcite.rel.metadata.MetadataDef; +import org.apache.calcite.rel.metadata.MetadataHandler; +import org.apache.calcite.rel.metadata.RelMetadataQuery; + +/** + * This is a metadata used for row count and rate estimation. It extends Calcite's Metadata + * interface so that we can use MetadataQuery to get our own estimates. + */ +public interface NodeStatsMetadata extends Metadata { + Method METHOD = Types.lookupMethod(NodeStatsMetadata.class, "getNodeStats"); + + MetadataDef<NodeStatsMetadata> DEF = + MetadataDef.of(NodeStatsMetadata.class, NodeStatsMetadata.Handler.class, METHOD); + + // In order to use this we need to call it by relNode.metadata(NodeStatsMetadata.class, + // mq).getNodeStats() where mq is the MetadataQuery (can be obtained by + // relNode.getCluster().getMetadataQuery()). After this, Calcite looks for the implementation of + // this metadata that we have registered in MetadataProvider (it is RelMdNodeStats.class in + // this case and we have registered it in CalciteQueryPlanner). Then Calcite's generated code + // decides the type of the rel node and calls the appropriate method in RelMdNodeStats. + // For instance: Join is a subclass of RelNode and if we have both getNodeStats(RelNode rel, + // RelMetadataQuery mq) and getNodeStats(Join rel, RelMetadataQuery mq) then if the rel is an + // instance of Join it will call getNodeStats((Join) rel, mq). + // Currently we only register it in the SQLTransform path. JDBC does not register this and it does not + // use it (because it does not register our NonCumulativeMetadata implementation either). + NodeStats getNodeStats(); + + /** Handler API. 
*/ + interface Handler extends MetadataHandler<NodeStatsMetadata> { + NodeStats getNodeStats(RelNode r, RelMetadataQuery mq); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/RelMdNodeStats.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/RelMdNodeStats.java new file mode 100644 index 0000000..c01fbb5 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/RelMdNodeStats.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.planner; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.MetadataDef; +import org.apache.calcite.rel.metadata.MetadataHandler; +import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; + +/** + * This is the implementation of NodeStatsMetadata. 
Methods to estimate rate and row count for + * Calcite's logical nodes should be implemented here. + */ +public class RelMdNodeStats implements MetadataHandler<NodeStatsMetadata> { + + public static final RelMetadataProvider SOURCE = + ReflectiveRelMetadataProvider.reflectiveSource( + NodeStatsMetadata.METHOD, new RelMdNodeStats()); + + @Override + public MetadataDef<NodeStatsMetadata> getDef() { + return NodeStatsMetadata.DEF; + } + + @SuppressWarnings("UnusedDeclaration") + public NodeStats getNodeStats(RelNode rel, RelMetadataQuery mq) { + + if (rel instanceof BeamRelNode) { + return this.getBeamNodeStats((BeamRelNode) rel, mq); + } + + // We can later define custom methods for all different RelNodes to prevent hitting this point. + // Similar to RelMdRowCount in Calcite. + + return NodeStats.UNKNOWN; + } + + private NodeStats getBeamNodeStats(BeamRelNode rel, RelMetadataQuery mq) { + + // Removing the unknown results. + // Calcite caches previous results in mq.map. This is done to prevent cyclic calls of this + // method and also to improve performance. However, we might have returned an unknown result + // because one of the inputs of the node was unknown (it is a logical node that we have not + // implemented getNodeStats for). Later we should not get the Unknown, therefore we need to + // remove unknown results every time this method is called. + // Results are also cached in CachingRelMetadataProvider because Calcite's PlannerImpl#Transform + // wraps the metadata provider with CachingRelMetadataProvider. However, + // CachingRelMetadataProvider checks the timestamp before returning previous results. Therefore, + // there wouldn't be a problem in that case. 
+ List<List> keys = + mq.map.entrySet().stream() + .filter(entry -> entry.getValue() instanceof NodeStats) + .filter(entry -> ((NodeStats) entry.getValue()).isUnknown()) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + for (List key : keys) { + mq.map.remove(key); + } + + return rel.estimateNodeStats(mq); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index 15038f7..14e7475 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -24,6 +24,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import java.io.Serializable; import java.util.List; import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.schemas.Schema; @@ -55,6 +56,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.ImmutableBitSet; import org.joda.time.Duration; @@ -81,6 +83,11 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { } @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } + + @Override public RelWriter explainTerms(RelWriter pw) { super.explainTerms(pw); if (this.windowFn != null) { diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index f8f69a5..31ae94a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.DateType; @@ -213,6 +214,11 @@ public class BeamCalcRel extends Calc implements BeamRelNode { throw new RuntimeException("Could not get the limit count from a non BeamSortRel input."); } + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } + public boolean isInputSortRelAndLimitOnly() { return (input instanceof BeamSortRel) && ((BeamSortRel) input).isLimitOnly(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java index 2c77f56..5aaa635 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSinkRel.java @@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; 
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; @@ -34,6 +35,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.prepare.Prepare; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql2rel.RelStructuredTypeFlattener; @@ -71,6 +73,11 @@ public class BeamIOSinkRel extends TableModify } @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { boolean flattened = isFlattened() || isFlattening; BeamIOSinkRel newRel = diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java index e22b64b..d87f152 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable; import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -64,6 +65,11 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { } @Override + public NodeStats 
estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } + + @Override public PCollection.IsBounded isBounded() { return beamTable.isBounded(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java index cc2590e..a90972a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -27,6 +28,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Intersect; import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.metadata.RelMetadataQuery; /** * {@code BeamRelNode} to replace a {@code Intersect} node. 
@@ -49,4 +51,9 @@ public class BeamIntersectRel extends Intersect implements BeamRelNode { public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.INTERSECT, all); } + + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 94c5a6b..167b8a5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.schemas.Schema; @@ -141,6 +142,11 @@ public class BeamJoinRel extends Join implements BeamRelNode { return super.computeSelfCost(planner, mq); } + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } + /** * This method checks if a join is legal and can be converted into Beam SQL. 
It is used during * planning and applying {@link diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java index 482b8be..0a8103d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -27,6 +28,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Minus; import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.metadata.RelMetadataQuery; /** * {@code BeamRelNode} to replace a {@code Minus} node. 
@@ -49,4 +51,9 @@ public class BeamMinusRel extends Minus implements BeamRelNode { public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.MINUS, all); } + + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java index 29e53d1..8b1d577 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java @@ -19,11 +19,13 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.Row; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; /** A {@link RelNode} that can also give a {@link PTransform} that implements the expression. 
*/ public interface BeamRelNode extends RelNode { @@ -61,4 +63,6 @@ public interface BeamRelNode extends RelNode { } return options; } + + NodeStats estimateNodeStats(RelMetadataQuery mq); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java index 57270c3..f252f11 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.state.StateSpec; @@ -56,6 +57,7 @@ import org.apache.calcite.rel.RelCollationImpl; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; @@ -134,6 +136,11 @@ public class BeamSortRel extends Sort implements BeamRelNode { } } + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } + public boolean isLimitOnly() { return fieldIndices.isEmpty(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java index 6424d9a..28d2d69 100644 --- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUncollectRel.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; @@ -31,6 +32,7 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Uncollect; +import org.apache.calcite.rel.metadata.RelMetadataQuery; /** {@link BeamRelNode} to implement an uncorrelated {@link Uncollect}, aka UNNEST. */ public class BeamUncollectRel extends Uncollect implements BeamRelNode { @@ -72,6 +74,11 @@ public class BeamUncollectRel extends Uncollect implements BeamRelNode { } } + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } + private static class UncollectDoFn extends DoFn<Row, Row> { private final Schema schema; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java index 175b139..95a1826 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import 
org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; @@ -28,6 +29,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.SetOp; import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.metadata.RelMetadataQuery; /** * {@link BeamRelNode} to replace a {@link Union}. @@ -76,4 +78,9 @@ public class BeamUnionRel extends Union implements BeamRelNode { public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { return new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.UNION, all); } + + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java index da675d1..b51df06 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnnestRel.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.rel; import java.util.List; import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; @@ -35,6 +36,7 @@ import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Correlate; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.Uncollect; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.validate.SqlValidatorUtil; @@ 
-75,6 +77,11 @@ public class BeamUnnestRel extends Uncollect implements BeamRelNode { } @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } + + @Override public RelWriter explainTerms(RelWriter pw) { return super.explainTerms(pw).item("unnestIndex", Integer.toString(unnestIndex)); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java index 3b285c4..c4405ba 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Map; import java.util.stream.IntStream; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; @@ -37,6 +38,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexLiteral; @@ -91,4 +93,9 @@ public class BeamValuesRel extends Values implements BeamRelNode { .mapToObj(i -> autoCastField(schema.getField(i), tuple.get(i).getValue())) .collect(toRow(schema)); } + + @Override + public NodeStats estimateNodeStats(RelMetadataQuery mq) { + return NodeStats.create(mq.getRowCount(this)); + } } diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java new file mode 100644 index 0000000..820f4fc --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStatsTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.planner; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest; +import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** This tests the NodeStats Metadata handler and the estimations. 
*/ +public class NodeStatsTest extends BaseRelTest { + static class UnknownRel extends SingleRel { + protected UnknownRel(RelOptCluster cluster, RelTraitSet traits, RelNode input) { + super(cluster, traits, input); + } + } + + public static final TestBoundedTable ORDER_DETAILS1 = + TestBoundedTable.of( + Schema.FieldType.INT32, "order_id", + Schema.FieldType.INT32, "site_id", + Schema.FieldType.INT32, "price") + .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5); + + public static final TestBoundedTable ORDER_DETAILS2 = + TestBoundedTable.of( + Schema.FieldType.INT32, "order_id", + Schema.FieldType.INT32, "site_id", + Schema.FieldType.INT32, "price") + .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5); + + @BeforeClass + public static void prepare() { + registerTable("ORDER_DETAILS1", ORDER_DETAILS1); + registerTable("ORDER_DETAILS2", ORDER_DETAILS2); + } + + @Test + public void testUnknownRel() { + String sql = " select * from ORDER_DETAILS1 "; + RelNode root = env.parseQuery(sql); + RelNode unknown = new UnknownRel(root.getCluster(), null, null); + NodeStats nodeStats = + unknown + .metadata(NodeStatsMetadata.class, unknown.getCluster().getMetadataQuery()) + .getNodeStats(); + Assert.assertTrue(nodeStats.isUnknown()); + } + + @Test + public void testKnownRel() { + String sql = " select * from ORDER_DETAILS1 "; + RelNode root = env.parseQuery(sql); + NodeStats nodeStats = + root.metadata(NodeStatsMetadata.class, root.getCluster().getMetadataQuery()).getNodeStats(); + Assert.assertFalse(nodeStats.isUnknown()); + } +}