ahshahid opened a new pull request #30185:
URL: https://github.com/apache/spark/pull/30185


   ### What changes were proposed in this pull request?
   This PR proposes new logic for storing constraints and tracking aliases in
projections, which eliminates the need to pessimistically generate all the
permutations of a given constraint. It is also more effective at correctly
identifying the filters that can be pruned, and it uses less memory than the
current code. It also includes changes to push compound filters when the join
condition is on multiple attributes and the constraint comprises more than one
attribute of the join condition.
   
   Right now the code retains the old constraint-management logic alongside
the new logic. The choice is controlled by the SQL conf property
"spark.sql.optimizer.optimizedConstraintPropagation.enabled", which defaults to
true. Once the PR is approved, it would make sense to remove the old code,
merge the code of ConstraintSet into ExpressionSet, and remove certain if-else
blocks in the Optimizer as well as the functions Optimizer.getAllConstraints
and LogicalPlan.getAllValidConstraints.
   It is the code of getAllValidConstraints that generates all the
permutations of the base constraint.
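
   To give a feel for why enumerating permutations gets expensive, here is a
toy sketch in Python. It is not the Spark code: `expand_all_variants` and the
alias map are hypothetical, and a constraint is modelled only by the tuple of
attribute names it references. Each attribute is assumed to be exposed under
three names, so the number of variants is the product of the name counts.

```python
from itertools import product

def expand_all_variants(attrs, aliases):
    """Enumerate every rewrite of a constraint over `attrs`.

    Toy model only: `aliases` maps an attribute name to its alias names,
    and each variant is the tuple of names chosen for the attributes.
    """
    choices = [[attr] + aliases.get(attr, []) for attr in attrs]
    return list(product(*choices))

# Hypothetical projection: a is also exposed as a1, a2 and b as b1, b2.
aliases = {"a": ["a1", "a2"], "b": ["b1", "b2"]}
variants = expand_all_variants(["a", "b"], aliases)
print(len(variants))  # 3 names for a times 3 names for b

# The count grows multiplicatively with the number of aliased attributes.
wide_aliases = {name: [name + "1", name + "2"] for name in "abcd"}
wide_variants = expand_all_variants(list("abcd"), wide_aliases)
print(len(wide_variants))  # 3 ** 4
```

   Under this model a single constraint over four attributes already expands
to 81 stored variants, whereas tracking aliases keeps one base constraint.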
   
   The new logic is as follows:
   In the class ConstraintSet, which extends ExpressionSet, we track the
aliases along with the base constraint.
   Any constraint added to the ConstraintSet is stored in its most
canonicalized form (i.e. consisting only of attributes that are part of the
output set and NOT the Alias's attribute).
   
   For example, consider a hypothetical plan:
   Projection1 (a, a as a1, a as a2, b, b as b1, b as b2, c, a + b as z)
   |
   Filter ( a + b > 10)
   |
   base relation (a, b, c, d)
   
   At the Filter node, the constraint set will contain just the constraint
a + b > 10.
   At the Projection1 node, the constraint set will contain the
   constraint a + b > 10
   and maintain the following buffers:
   buff1 -> a, a1.attribute, a2.attribute
   buff2 -> b, b1.attribute, b2.attribute
   buff3 -> a + b, z.attribute
   
   The constraint a + b > 10 is already canonicalized in terms of output
attributes.
   
   Now suppose there are two filters on top of Projection1:
   Filter (z > 10) and Filter (a1 + b2 > 10)
   
   To prune Filter (z > 10), we canonicalize z as a + b (from the data
maintained in the ConstraintSet) and check whether the underlying set contains
a + b > 10. It does, so the filter can be pruned.
   For Filter (a1 + b2 > 10), we identify the buffers to which a1 and b2
belong and replace each attribute with the 0th element of its buffer, which
yields a + b > 10, so this filter can be pruned as well.
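
   The pruning check above can be sketched with a toy model in Python. This is
an illustration under stated assumptions, not the ConstraintSet
implementation: expressions are nested tuples such as (">", ("+", "a", "b"),
10), attributes are plain strings, and `canonicalize` and `can_prune` are
hypothetical names. Commutativity (b + a versus a + b) is deliberately not
handled here.

```python
def canonicalize(expr, buffers):
    """Rewrite `expr` so every aliased sub-expression becomes its buffer's 0th element."""
    for buf in buffers:
        if expr in buf:
            return buf[0]
    if isinstance(expr, tuple):
        op, *args = expr
        return (op, *(canonicalize(arg, buffers) for arg in args))
    return expr  # literal or attribute without aliases

# Buffers from the Projection1 example; element 0 is the canonical form.
buffers = [
    ["a", "a1", "a2"],
    ["b", "b1", "b2"],
    [("+", "a", "b"), "z"],
]
# The only stored constraint is the canonical base constraint a + b > 10.
constraints = {(">", ("+", "a", "b"), 10)}

def can_prune(condition):
    """A filter is redundant if its canonical form is already a known constraint."""
    return canonicalize(condition, buffers) in constraints

print(can_prune((">", "z", 10)))                # True: z canonicalizes to a + b
print(can_prune((">", ("+", "a1", "b2"), 10)))  # True: a1 -> a, b2 -> b
print(can_prune((">", "c", 10)))                # False: no such constraint
```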
   
   Now suppose there is another Project2 (a1, a2, b1, b2, z, c),
   i.e. attributes a and b are no longer part of the output set.
   The idea is to make a constraint survive as much as possible.
   In Project2, the attributes a and b are being eliminated, and
   we have a constraint a + b > 10 that depends on them.
   So in the constraint set of Project2, we update it such that
   a + b > 10 becomes ---> a1.attr + b1.attr > 10
   buff1 a, a1.attribute, a2.attribute ---> a1.attribute, a2.attribute
   buff2 b, b1.attribute, b2.attribute ---> b1.attribute, b2.attribute
   buff3 a + b, z.attribute ---> a1.attr + b1.attr, z.attr
   
   This way, by tracking aliases and storing only the canonicalized base
constraints, we eliminate the need to pessimistically generate all
combinations of constraints.
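
   The Project2 rewrite can be sketched in the same toy style. This is a
simplified model, not the PR's code: `project`, `substitute` and `attrs_of`
are hypothetical helpers, expressions are nested tuples, and the sketch only
covers replacing a dropped attribute with its first surviving alias. It does
not try to rescue a constraint through an expression alias such as z.

```python
def attrs_of(expr):
    """Collect the attribute names (strings) referenced by a nested-tuple expression."""
    if isinstance(expr, str):
        return {expr}
    if isinstance(expr, tuple):
        return set().union(*(attrs_of(arg) for arg in expr[1:]))
    return set()  # literal

def substitute(expr, mapping):
    """Replace attributes in `expr` according to `mapping`."""
    if isinstance(expr, str):
        return mapping.get(expr, expr)
    if isinstance(expr, tuple):
        return (expr[0], *(substitute(arg, mapping) for arg in expr[1:]))
    return expr

def project(constraints, buffers, output):
    """Re-express constraints and buffers using only attributes in `output`."""
    mapping, new_buffers = {}, []
    for buf in buffers:
        if isinstance(buf[0], str):  # attribute buffer, e.g. a, a1, a2
            survivors = [m for m in buf if m in output]
            if survivors:
                mapping[buf[0]] = survivors[0]  # first survivor becomes canonical
                new_buffers.append(survivors)
    for buf in buffers:
        if isinstance(buf[0], tuple):  # expression buffer, e.g. a + b, z
            head = substitute(buf[0], mapping)
            if attrs_of(head) <= output:
                new_buffers.append([head] + [m for m in buf[1:] if m in output])
    rewritten = {substitute(c, mapping) for c in constraints}
    return {c for c in rewritten if attrs_of(c) <= output}, new_buffers

buffers = [["a", "a1", "a2"], ["b", "b1", "b2"], [("+", "a", "b"), "z"]]
constraints = {(">", ("+", "a", "b"), 10)}
new_constraints, new_buffers = project(
    constraints, buffers, output={"a1", "a2", "b1", "b2", "z", "c"}
)
print(new_constraints == {(">", ("+", "a1", "b1"), 10)})  # True
```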
   
   For inferring new Filters from constraints,
   we use the following logic:
   New Filter = Filter.constraints -- ( Filter.child.constraints ++
Filter.constraints.convertToCanonicalizedIfRequired(Filter.conditions) )
   The idea is that new filter conditions without redundancy can be obtained
by subtracting two things from the current node's constraints: the child
node's constraints, and the filter condition itself, properly canonicalized in
terms of the base attributes that will be part of the output set of the Filter
node.
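
   The set arithmetic in that formula can be illustrated with plain Python
sets. This is only a sketch: constraints are represented as strings, the
sample values are hypothetical, and `infer_new_filters` is an illustrative
name rather than a function from the PR. The condition is assumed to be
canonicalized already.

```python
def infer_new_filters(filter_constraints, child_constraints, canonical_conditions):
    """Constraints of the Filter node not already enforced by its child or by its own condition."""
    return filter_constraints - (child_constraints | canonical_conditions)

# Hypothetical inputs: the child already guarantees isnotnull(a), the filter
# condition is a = b, and the Filter node's constraints add isnotnull(b).
child_constraints = {"isnotnull(a)"}
canonical_conditions = {"a = b"}
filter_constraints = {"isnotnull(a)", "a = b", "isnotnull(b)"}

new_filters = infer_new_filters(
    filter_constraints, child_constraints, canonical_conditions
)
print(new_filters == {"isnotnull(b)"})  # True: the only non-redundant condition
```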
   
   For inferring new filters for Join push down, we identify all the equality
conditions and then segregate the attributes into the LHS and RHS of the join.
To identify filters for push down on the RHS, we take all the equality
attributes of the LHS and ask the ConstraintSet to return all the constraints
whose attributes are a subset of the passed LHS attributes. The LHS attributes
are appropriately canonicalized and the constraints identified.
   Once the constraints are known, we can replace the attributes with the
corresponding RHS attributes. This helps in identifying compound filters for
push down and not just single-attribute filters.
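
   A minimal sketch of that selection and substitution step, again as a toy
model rather than the PR's code: the join, the attribute names and the helper
`filters_for_rhs` are hypothetical, expressions are nested tuples, and the
canonicalization of the LHS attributes is assumed to have happened already.

```python
def attrs_of(expr):
    """Collect the attribute names (strings) referenced by a nested-tuple expression."""
    if isinstance(expr, str):
        return {expr}
    if isinstance(expr, tuple):
        return set().union(*(attrs_of(arg) for arg in expr[1:]))
    return set()  # literal

def substitute(expr, mapping):
    """Replace attributes in `expr` according to `mapping`."""
    if isinstance(expr, str):
        return mapping.get(expr, expr)
    if isinstance(expr, tuple):
        return (expr[0], *(substitute(arg, mapping) for arg in expr[1:]))
    return expr

def filters_for_rhs(lhs_constraints, equalities):
    """Pick LHS constraints that only use join keys and rewrite them for the RHS."""
    lhs_to_rhs = dict(equalities)
    keys = set(lhs_to_rhs)
    return {
        substitute(c, lhs_to_rhs)
        for c in lhs_constraints
        if attrs_of(c) and attrs_of(c) <= keys
    }

# Hypothetical join: t1 JOIN t2 ON t1.x = t2.p AND t1.y = t2.q
equalities = [("x", "p"), ("y", "q")]
lhs_constraints = {
    (">", ("+", "x", "y"), 10),  # compound: uses both join keys
    (">", "x", 0),               # single attribute
    ("<", "w", 5),               # w is not a join key, so it is not pushed
}
pushed = filters_for_rhs(lhs_constraints, equalities)
print(pushed == {(">", ("+", "p", "q"), 10), (">", "p", 0)})  # True
```

   The compound constraint x + y > 10 is carried over as p + q > 10 because
both of its attributes take part in the join condition.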
   
   Below is a description of the changes proposed.
   
   ExpressionSet: Apart from adding some new functions, this fixes two bugs in
ExpressionSet where, in filter and filterNot, e.canonicalized was being used
even though e is already canonicalized. Also, for very complex expressions,
canonicalizing an already canonicalized object (especially if it contains a
join expression) does not behave correctly in the current Spark code.
   The methods added to ExpressionSet exist only to retain the existing
constraints code.
   
   ConstraintSet: This is the class that tracks the aliases, stores the
constraints in canonicalized form, and updates the constraints using the
available aliases when any attribute of a constraint is being eliminated. The
contains method of this class is used for filter pruning. It also identifies
the constraints that can generate new filters for push down in join nodes.
   
   The rest of the changes just integrate the new logic while retaining the
old constraints logic.
   
   Please note that, for TPCDS plan stability, I had to add new golden files
for q75. The change itself is trivial: only the order of the pushed filters
differs.
   Previously the pushed filter was generated as
   PushedFilters: [IsNotNull(cr_order_number), IsNotNull(cr_item_sk)]
   and with the change it is
   PushedFilters: [IsNotNull(cr_item_sk), IsNotNull(cr_order_number)]
   
   
   ### Why are the changes needed?
   1) If not fixed, this issue can cause OutOfMemory errors or unacceptable
query compilation times.
   Added a test "plan equivalence with case statements and performance
comparison with benefit of more than 10x conservatively"
   in
   org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite
   
   With this PR the compilation time is 247 ms vs 13958 ms without the change.
   2) It is more effective in filter pruning, as is evident in some of the
tests in
org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite, where
the current code is not able to identify the redundant filter in some cases.
   
   3) It is able to generate a better optimized plan for join queries as it can 
push compound predicates.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   I have been running the target /build/mvn clean install.
   As seen below, the sql and catalyst modules pass cleanly.
   The hive module reports a failure because 1 suite aborted, but there are no
test failures.
   My colleagues have told me that there is an issue in the Hive module
(unrelated to this change).
   I am looking further into what is causing the abort.
   If you have any information, please add it.
   02:25:27.539 WARN org.apache.hadoop.hive.conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
   02:25:27.539 WARN org.apache.hadoop.hive.conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
   Run completed in 2 hours, 27 minutes, 59 seconds.
   Total number of tests run: 3123
   Suites: completed 108, aborted 1
   Tests: succeeded 3123, failed 0, canceled 0, ignored 18, pending 0
   *** 1 SUITE ABORTED ***
   
   [INFO] ------------------------------------------------------------------------
   [INFO] Reactor Summary for Spark Project Parent POM 3.1.0-SNAPSHOT:
   [INFO] 
   [INFO] Spark Project Parent POM ........................... SUCCESS [ 17.115 s]
   [INFO] Spark Project Tags ................................. SUCCESS [ 21.426 s]
   [INFO] Spark Project Sketch ............................... SUCCESS [ 41.413 s]
   [INFO] Spark Project Local DB ............................. SUCCESS [ 20.063 s]
   [INFO] Spark Project Networking ........................... SUCCESS [01:12 min]
   [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 24.297 s]
   [INFO] Spark Project Unsafe ............................... SUCCESS [ 32.305 s]
   [INFO] Spark Project Launcher ............................. SUCCESS [ 17.877 s]
   [INFO] Spark Project Core ................................. SUCCESS [59:57 min]
   [INFO] Spark Project ML Local Library ..................... SUCCESS [01:12 min]
   [INFO] Spark Project GraphX ............................... SUCCESS [05:22 min]
   [INFO] Spark Project Streaming ............................ SUCCESS [08:48 min]
   [INFO] Spark Project Catalyst ............................. SUCCESS [16:55 min]
   [INFO] Spark Project SQL .................................. SUCCESS [  02:25 h]
   [INFO] Spark Project ML Library ........................... SUCCESS [42:14 min]
   [INFO] Spark Project Tools ................................ SUCCESS [ 19.337 s]
   [INFO] Spark Project Hive ................................. FAILURE [  02:33 h]
   [INFO] Spark Project REPL ................................. SKIPPED
   [INFO] Spark Project Assembly ............................. SKIPPED
   [INFO] Kafka 0.10+ Token Provider for Streaming ........... SKIPPED
   [INFO] Spark Integration for Kafka 0.10 ................... SKIPPED
   [INFO] Kafka 0.10+ Source for Structured Streaming ........ SKIPPED
   [INFO] Spark Project Examples ............................. SKIPPED
   [INFO] Spark Integration for Kafka 0.10 Assembly .......... SKIPPED
   [INFO] Spark Avro ......................................... SKIPPED
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD FAILURE
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  07:18 h
   [INFO] Finished at: 2020-10-29T02:27:10-07:00
   [INFO] ------------------------------------------------------------------------
   [ERROR] Failed to execute goal org.scalatest:scalatest-maven-plugin:2.0.0:test (test) on project spark-hive_2.12: There are test failures -> [Help 1]
   [ERROR] 
   [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
   [ERROR] Re-run Maven using the -X switch to enable full debug logging.
   [ERROR] 
   [ERROR] For more information about the errors and possible solutions, please read the following articles:
   [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
   [ERROR] 
   [ERROR] After correcting the problems, you can resume the build with the command
   

