LennonChin commented on code in PR #7204:
URL: https://github.com/apache/kyuubi/pull/7204#discussion_r2348147365


##########
extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/plan/ChildOutputHolder.scala:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.rule.plan
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+
+import org.apache.kyuubi.plugin.spark.authz.util.WithInternalChild
+
+case class ChildOutputHolder(child: LogicalPlan, childOutput: Seq[Attribute])
+  extends UnaryNode with WithInternalChild {
+
+  val output: Seq[Attribute] = childOutput

Review Comment:
   > Why not just return `child.output`?
   
   Suppose we have a table like:
   
   ```sql
   CREATE TABLE IF NOT EXISTS default.test
   (id int, scope int, part string)
   PARTITIONED BY(part)
   ```
   
   When we run a query like `select count(*) from (select id from default.test where part = 'part-1') t`, the original plan is as follows:
   
   ```
   GlobalLimit 21
   +- LocalLimit 21
      +- Project [cast(count(1)#10L as string) AS count(1)#13]
         +- Aggregate [count(1) AS count(1)#10L]
            +- Project [id#7]
               +- Filter (part#9 = part-1)
               +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
   ```
   
   Because the `references` of `Aggregate [count(1) AS count(1)#10L]` are empty, `ColumnPruning` adds a `Project` with an empty projection list on top of `Project [id#7]`, and `CollapseProject` then collapses these empty `Project` nodes:
   
   ```
   === Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===

   Before:
   GlobalLimit 21
   +- LocalLimit 21
      +- Project [cast(count(1)#10L as string) AS count(1)#13]
         +- Aggregate [count(1) AS count(1)#10L]
            +- Project [id#7]
               +- Filter (part#9 = part-1)
                  +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]

   After:
   GlobalLimit 21
   +- LocalLimit 21
      +- Project [cast(count(1)#10L as string) AS count(1)#13]
         +- Aggregate [count(1) AS count(1)#10L]
            +- Project
               +- Project
                  +- Filter (part#9 = part-1)
                     +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]

   === Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===

   Before:
   GlobalLimit 21
   +- LocalLimit 21
      +- Project [cast(count(1)#10L as string) AS count(1)#13]
         +- Aggregate [count(1) AS count(1)#10L]
            +- Project
               +- Project
                  +- Filter (part#9 = part-1)
                     +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]

   After:
   GlobalLimit 21
   +- LocalLimit 21
      +- Aggregate [cast(count(1) as string) AS count(1)#13]
         +- Project
            +- Filter (part#9 = part-1)
               +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
   ```
   
   After these two rules, the plan tree ends up as:
   
   ```
   GlobalLimit 21
   +- LocalLimit 21
      +- Aggregate [cast(count(1) as string) AS count(1)#13]
         +- Project
            +- Filter (part#9 = part-1)
               +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
   ```
   
   In `PrivilegeBuilder.buildQuery`, the following code skips `Aggregate [cast(count(1) as string) AS count(1)#13]` because `columnPrune(p.references.toSeq ++ p.output, p.inputSet)` is empty for it, so the privileges of all child nodes in the tree are ignored:
   
   ```scala
   if (columnPrune(p.references.toSeq ++ p.output, p.inputSet).isEmpty) {
     // If plan is project and output don't have relation to input, can ignore.
     if (!p.isInstanceOf[Project]) {
      ...
     }
   }
   ```
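   For context, here is a minimal hedged sketch of what the `columnPrune` helper presumably checks; the name and signature come from the snippet above, while the body below is a simplification for illustration, not the actual Kyuubi implementation:

   ```scala
   import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}

   // Hedged sketch only: keep just the expressions that actually resolve against the
   // node's input. For `Aggregate [cast(count(1) as string) AS count(1)#13]`, neither
   // the aggregate expression nor the output attribute references an input column,
   // so the pruned list is empty and the node would be skipped.
   def columnPruneSketch(projectionList: Seq[Expression], input: AttributeSet): Seq[Expression] =
     projectionList.filter(e => e.references.nonEmpty && e.references.subsetOf(input))
   ```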
   
   If we add a `ChildOutputHolder`, the plan tree checked by `PrivilegeBuilder.buildQuery` becomes:
   
   ```
   Aggregate [cast(count(1) as string) AS count(1)#13]
   +- Project
      +- ChildOutputHolder [id#7]
         +- Project [id#7]
            +- Filter (part#9 = part-1)
               +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
   ```
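   For reference, a minimal sketch of how a marker rule could insert this node; the rule name `RuleChildOutputMarker` and the trigger condition are taken from this discussion, while the body is an assumed simplification, not the PR's actual implementation:

   ```scala
   import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
   import org.apache.spark.sql.catalyst.rules.Rule

   import org.apache.kyuubi.plugin.spark.authz.rule.plan.ChildOutputHolder

   // Hedged sketch: wrap the child of an Aggregate that references no input columns
   // (e.g. count(*)) so the child's original output survives later
   // ColumnPruning/CollapseProject rewrites.
   object RuleChildOutputMarkerSketch extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
       case agg: Aggregate
           if agg.references.isEmpty && !agg.child.isInstanceOf[ChildOutputHolder] =>
         agg.copy(child = ChildOutputHolder(agg.child, agg.child.output))
     }
   }
   ```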
   
   `ChildOutputHolder [id#7]` pins the output of the child node `Project [id#7]`, and because the plan tree now contains a `ChildOutputHolder` node, the following code continues checking the child nodes' privileges:
   
   ```scala
   case p =>
     val existsChildOutputHolder = p.exists(_.isInstanceOf[ChildOutputHolder])
     for (child <- p.children) {
       if (columnPrune(p.references.toSeq ++ p.output, p.inputSet).isEmpty) {
         // 1. If plan is project and output don't have relation to input, can ignore.
         // 2. If sub logic plan tree exists ChildOutputHolder node, it means that the output of
         //    some nodes in the tree is fixed by RuleChildOutputMarker in some special
         //    scenarios, such as the Aggregate(count(*)) child node. To avoid missing child node
         //    permissions, we need to continue checking down.
         if (!p.isInstanceOf[Project] || existsChildOutputHolder) {
           buildQuery(
             child,
             privilegeObjects,
             p.inputSet.map(_.toAttribute).toSeq,
             Nil,
             spark)
         }
       } else {
         ...
       }
     }
   ```
   
   Why does `ChildOutputHolder` need to hold the output of the `Aggregate` node's original child? If we just return `child.output` in `ChildOutputHolder`, like this:
   
   ```scala
   case class ChildOutputHolder(child: LogicalPlan)
     extends UnaryNode with WithInternalChild {
   
     val output: Seq[Attribute] = child.output
   
     override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(child = newChild)
   }
   ```
   
   then after the `ColumnPruning` and `CollapseProject` rewrites, `ChildOutputHolder`'s child becomes a `Project` with an empty projection list, so `ChildOutputHolder`'s input and output are both empty. We lose the original output of `Aggregate`'s child node `Project [id#7]`, and with it the privilege check on the `id` column:
   
   ```
   === Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===

   Before:
   GlobalLimit 21
   +- LocalLimit 21
      +- Project [cast(count(1)#10L as string) AS count(1)#13]
         +- Aggregate [count(1) AS count(1)#10L]
            +- ChildOutputHolder
               +- Project [id#7]
                  +- Filter (part#9 = part-1)
                     +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]

   After:
   GlobalLimit 21
   +- LocalLimit 21
      +- Project [cast(count(1)#10L as string) AS count(1)#13]
         +- Aggregate [count(1) AS count(1)#10L]
            +- Project
               +- ChildOutputHolder
                  +- Project
                     +- Project
                        +- Filter (part#9 = part-1)
                           +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]

   === Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===

   Before:
   GlobalLimit 21
   +- LocalLimit 21
      +- Project [cast(count(1)#10L as string) AS count(1)#13]
         +- Aggregate [count(1) AS count(1)#10L]
            +- Project
               +- ChildOutputHolder
                  +- Project
                     +- Project
                        +- Filter (part#9 = part-1)
                           +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]

   After:
   GlobalLimit 21
   +- LocalLimit 21
      +- Aggregate [cast(count(1) as string) AS count(1)#13]
         +- Project
            +- ChildOutputHolder
               +- Project
                  +- Filter (part#9 = part-1)
                     +- HiveTableRelation [`default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#7, scope#8], Partition Cols: [part#9]]
   ```
   
   At the same time, a `ChildOutputHolder` node that pins the original child output prevents `ColumnPruning` from repeatedly adding empty `Project` nodes; those empty `Project` nodes would otherwise eliminate original nodes that may carry useful outputs.
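   To make the contrast concrete, here is a hedged, illustrative snippet with simplified stand-in plans (the attribute and relation construction is an assumption for demonstration, not taken from the PR's tests):

   ```scala
   import org.apache.spark.sql.catalyst.expressions.AttributeReference
   import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
   import org.apache.spark.sql.types.IntegerType

   import org.apache.kyuubi.plugin.spark.authz.rule.plan.ChildOutputHolder

   val id = AttributeReference("id", IntegerType)()
   val relation = LocalRelation(id)
   // What ColumnPruning leaves behind: a Project with an empty projection list.
   val prunedChild = Project(Nil, relation)

   // With the rejected definition (output = child.output), the holder's output would
   // now be Nil and `id` would be lost. With the PR's definition, the output captured
   // before pruning still reaches buildQuery:
   ChildOutputHolder(prunedChild, Seq(id)).output // == Seq(id)
   ```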
   


