LennonChin commented on code in PR #7204: URL: https://github.com/apache/kyuubi/pull/7204#discussion_r2348147365
##########
extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/plan/ChildOutputHolder.scala:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.rule.plan
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+
+import org.apache.kyuubi.plugin.spark.authz.util.WithInternalChild
+
+case class ChildOutputHolder(child: LogicalPlan, childOutput: Seq[Attribute])
+  extends UnaryNode with WithInternalChild {
+
+  val output: Seq[Attribute] = childOutput

Review Comment:
> Why not just return `child.output`?

Suppose we have a table like:

```sql
CREATE TABLE IF NOT EXISTS default.test (id int, scope int, part string) PARTITIONED BY(part)
```

When we run a query like `select count(*) from (select id from default.test where part = 'part-1') t`, the original plan is as follows:

```
GlobalLimit 21
+- LocalLimit 21
   +- Project [cast(count(1)#10L as string) AS count(1)#13]
      +- Aggregate [count(1) AS count(1)#10L]
         +- Project [id#7]
            +- Filter (part#9 = part-1)
               +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
```

Because the `references` of `Aggregate [count(1) AS count(1)#10L]` are empty, `ColumnPruning` adds a `Project` with an empty projection list on top of `Project [id#7]`, and `CollapseProject` then collapses the empty `Project` nodes:

```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 GlobalLimit 21                                                                                                                                                     GlobalLimit 21
 +- LocalLimit 21                                                                                                                                                   +- LocalLimit 21
    +- Project [cast(count(1)#10L as string) AS count(1)#13]                                                                                                           +- Project [cast(count(1)#10L as string) AS count(1)#13]
       +- Aggregate [count(1) AS count(1)#10L]                                                                                                                            +- Aggregate [count(1) AS count(1)#10L]
!         +- Project [id#7]                                                                                                                                                  +- Project
!            +- Filter (part#9 = part-1)                                                                                                                                        +- Project
!               +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]                  +- Filter (part#9 = part-1)
!                                                                                                                                                                                     +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
 GlobalLimit 21                                                                                                                                                        GlobalLimit 21
 +- LocalLimit 21                                                                                                                                                      +- LocalLimit 21
!   +- Project [cast(count(1)#10L as string) AS count(1)#13]                                                                                                              +- Aggregate [cast(count(1) as string) AS count(1)#13]
!      +- Aggregate [count(1) AS count(1)#10L]                                                                                                                               +- Project
!         +- Project                                                                                                                                                            +- Filter (part#9 = part-1)
!            +- Project                                                                                                                                                            +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
!               +- Filter (part#9 = part-1)
!                  +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
```

The resulting plan tree is:

```
GlobalLimit 21
+- LocalLimit 21
   +- Aggregate [cast(count(1) as string) AS count(1)#13]
      +- Project
         +- Filter (part#9 = part-1)
            +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
```

In `PrivilegeBuilder.buildQuery`, the following code skips `Aggregate [count(1) AS count(1)#10L]` because pruning its `p.references.toSeq ++ p.output` against `p.inputSet` yields nothing, so the privileges of all child nodes in the tree are ignored:

```scala
if (columnPrune(p.references.toSeq ++ p.output, p.inputSet).isEmpty) {
  // If plan is project and output don't have relation to input, can ignore.
  if (!p.isInstanceOf[Project]) {
    ...
  }
}
```

If we add a `ChildOutputHolder`, the plan tree checked by `PrivilegeBuilder.buildQuery` becomes:

```
Aggregate [cast(count(1) as string) AS count(1)#13]
+- Project
   +- ChildOutputHolder [id#7]
      +- Project [id#7]
         +- Filter (part#9 = part-1)
            +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
```

`ChildOutputHolder [id#7]` pins the output of its child node `Project [id#7]`, and because the plan tree now contains a `ChildOutputHolder` node, the following code keeps checking the privileges of the child nodes:

```scala
case p =>
  val existsChildOutputHolder = p.exists(_.isInstanceOf[ChildOutputHolder])
  for (child <- p.children) {
    if (columnPrune(p.references.toSeq ++ p.output, p.inputSet).isEmpty) {
      // 1. If plan is project and output don't have relation to input, can ignore.
      // 2. If sub logic plan tree exists ChildOutputHolder node, it means that the output of
      //    some nodes in the tree is fixed by RuleChildOutputMarker in some special
      //    scenarios, such as the Aggregate(count(*)) child node. To avoid missing child node
      //    permissions, we need to continue checking down.
      if (!p.isInstanceOf[Project] || existsChildOutputHolder) {
        buildQuery(
          child,
          privilegeObjects,
          p.inputSet.map(_.toAttribute).toSeq,
          Nil,
          spark)
      }
    } else {
      ...
    }
  }
```

Why does `ChildOutputHolder` need to hold the output of the `Aggregate` node's original child node? Suppose we just return `child.output` in `ChildOutputHolder`, like this:

```scala
case class ChildOutputHolder(child: LogicalPlan) extends UnaryNode with WithInternalChild {

  val output: Seq[Attribute] = child.output

  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(child = newChild)
}
```

After `ColumnPruning` and `CollapseProject` run, the child of `ChildOutputHolder` is a `Project` with an empty projection list, so both the input and the output of `ChildOutputHolder` are empty. The original output of the `Aggregate`'s child node `Project [id#7]` is lost, and with it the privilege check on the `id` column:

```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 GlobalLimit 21                                                                                                                                                        GlobalLimit 21
 +- LocalLimit 21                                                                                                                                                      +- LocalLimit 21
    +- Project [cast(count(1)#10L as string) AS count(1)#13]                                                                                                              +- Project [cast(count(1)#10L as string) AS count(1)#13]
       +- Aggregate [count(1) AS count(1)#10L]                                                                                                                               +- Aggregate [count(1) AS count(1)#10L]
!         +- ChildOutputHolder                                                                                                                                                  +- Project
!            +- Project [id#7]                                                                                                                                                     +- ChildOutputHolder
!               +- Filter (part#9 = part-1)                                                                                                                                           +- Project
!                  +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]                     +- Project
!                                                                                                                                                                                           +- Filter (part#9 = part-1)
!                                                                                                                                                                                              +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
 GlobalLimit 21                                                                                                                                                              GlobalLimit 21
 +- LocalLimit 21                                                                                                                                                            +- LocalLimit 21
!   +- Project [cast(count(1)#10L as string) AS count(1)#13]                                                                                                                    +- Aggregate [cast(count(1) as string) AS count(1)#13]
!      +- Aggregate [count(1) AS count(1)#10L]                                                                                                                                     +- Project
!         +- Project                                                                                                                                                                  +- ChildOutputHolder
!            +- ChildOutputHolder                                                                                                                                                        +- Project
!               +- Project                                                                                                                                                                  +- Filter (part#9 = part-1)
!                  +- Project                                                                                                                                                                  +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
!                     +- Filter (part#9 = part-1)
!                        +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
```

In addition, a `ChildOutputHolder` node that pins the original child output prevents `ColumnPruning` from repeatedly adding empty `Project` nodes, which would otherwise eliminate original nodes that may carry useful outputs.
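
To make the difference between the two `ChildOutputHolder` variants concrete, here is a minimal Spark-free sketch. Everything in it (`Plan`, `Relation`, `Project`, `DelegatingHolder`, `FixedHolder`, `pruneToEmpty`) is a toy stand-in invented for illustration, not a Catalyst or Kyuubi class, and `pruneToEmpty` only imitates the net effect that `ColumnPruning` plus `CollapseProject` have on the child in the logs above.

```scala
// Toy stand-ins for plan nodes; these are NOT the Spark or Kyuubi classes.
sealed trait Plan { def output: Seq[String] }

final case class Relation(output: Seq[String]) extends Plan

final case class Project(projectList: Seq[String], child: Plan) extends Plan {
  def output: Seq[String] = projectList
}

// Variant that returns child.output: the output follows whatever the child is now.
final case class DelegatingHolder(child: Plan) extends Plan {
  def output: Seq[String] = child.output
}

// Variant that stores the output: it is captured once and survives child rewrites.
final case class FixedHolder(child: Plan, childOutput: Seq[String]) extends Plan {
  def output: Seq[String] = childOutput
}

object HolderDemo {
  // Hypothetical helper: replaces a Project's list with an empty one, imitating
  // what the optimizer does when the parent references no columns.
  def pruneToEmpty(p: Plan): Plan = p match {
    case Project(_, child) => Project(Nil, child)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val original = Project(Seq("id"), Relation(Seq("id", "scope", "part")))

    val delegating = DelegatingHolder(original).copy(child = pruneToEmpty(original))
    val fixed = FixedHolder(original, original.output).copy(child = pruneToEmpty(original))

    // The delegating holder no longer knows that `id` was selected.
    assert(delegating.output.isEmpty)
    // The fixed holder still reports the original child output.
    assert(fixed.output == Seq("id"))
    println(s"delegating=${delegating.output}, fixed=${fixed.output}")
  }
}
```

In this toy model only the holder that stores `childOutput` still exposes `id` after the child has been rewritten, which is the property the privilege check relies on.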
