viirya opened a new pull request #25106: [SPARK-28345][SQL][Python] PythonUDF predicate should be able to pushdown to join
URL: https://github.com/apache/spark/pull/25106
 
 
   ## What changes were proposed in this pull request?
   
   Currently, a `Filter` predicate that uses a `PythonUDF` can't be pushed down
   into a join condition. Such a predicate should be eligible for pushdown. For
   `PythonUDF`s that can't be evaluated in a join condition,
   `PullOutPythonUDFInJoinCondition` will pull them out later.
   
   For example:
   
   ```scala
   val pythonTestUDF = TestPythonUDF(name = "udf")
   
   val left = Seq((1, 2), (2, 3)).toDF("a", "b")
   val right = Seq((1, 2), (3, 4)).toDF("c", "d")
   val df = left.crossJoin(right).where(pythonTestUDF($"a") === pythonTestUDF($"c"))
   ```
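   
   To illustrate why the predicate in this example should qualify as a join
   condition, here is a minimal standalone sketch of the classification step. It
   is a toy model, not Spark's code: `Expr`, `Attr`, `PythonUdf`, `Rand`,
   `EqualTo`, `JoinPushdownSketch` and the `allowPythonUdf` flag are hypothetical
   stand-ins for Catalyst classes and for the behavior before and after this PR.
   
   ```scala
   // Hypothetical, simplified stand-ins for Catalyst expressions (not Spark classes).
   sealed trait Expr {
     def children: Seq[Expr]
     // An expression is deterministic when all of its children are.
     def deterministic: Boolean = children.forall(_.deterministic)
   }
   case class Attr(name: String) extends Expr {
     val children: Seq[Expr] = Nil
   }
   case class PythonUdf(name: String, arg: Expr) extends Expr {
     val children: Seq[Expr] = Seq(arg)
   }
   case class Rand() extends Expr {
     val children: Seq[Expr] = Nil
     override def deterministic: Boolean = false
   }
   case class EqualTo(left: Expr, right: Expr) extends Expr {
     val children: Seq[Expr] = Seq(left, right)
   }
   
   object JoinPushdownSketch {
     // Decides whether a predicate may be moved into the join condition.
     // allowPythonUdf = false models the old behavior, true the proposed one.
     def canEvaluateWithinJoin(e: Expr, allowPythonUdf: Boolean): Boolean = e match {
       case x if !x.deterministic => false
       case _: Attr               => true
       case _: PythonUdf          => allowPythonUdf
       case other                 => other.children.forall(canEvaluateWithinJoin(_, allowPythonUdf))
     }
   
     // Splits filter predicates into (join conditions, predicates left in the Filter).
     def split(preds: Seq[Expr], allowPythonUdf: Boolean): (Seq[Expr], Seq[Expr]) =
       preds.partition(canEvaluateWithinJoin(_, allowPythonUdf))
   
     def main(args: Array[String]): Unit = {
       // Models pythonTestUDF($"a") === pythonTestUDF($"c") from the example above.
       val pred = EqualTo(PythonUdf("udf", Attr("a")), PythonUdf("udf", Attr("c")))
       val (joinBefore, filterBefore) = split(Seq(pred), allowPythonUdf = false)
       val (joinAfter, filterAfter) = split(Seq(pred), allowPythonUdf = true)
       assert(joinBefore.isEmpty && filterBefore == Seq(pred))
       assert(joinAfter == Seq(pred) && filterAfter.isEmpty)
     }
   }
   ```
   
   In this model, a predicate that ends up in the join condition is what lets the
   planner pick an equi-join; the real rule has more cases than shown here.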
   
   Query plan before the PR:
   ```
   == Physical Plan ==
   *(3) Project [a#2121, b#2122, c#2132, d#2133]
   +- *(3) Filter (pythonUDF0#2142 = pythonUDF1#2143)
      +- BatchEvalPython [udf(a#2121), udf(c#2132)], [pythonUDF0#2142, pythonUDF1#2143]
         +- BroadcastNestedLoopJoin BuildRight, Cross
            :- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
            :  +- LocalTableScan [_1#2116, _2#2117]
            +- BroadcastExchange IdentityBroadcastMode
               +- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
                  +- LocalTableScan [_1#2127, _2#2128]
   ```
   
   Query plan after the PR:
   ```
   == Physical Plan ==
   *(3) Project [a#2121, b#2122, c#2132, d#2133]
   +- *(3) BroadcastHashJoin [pythonUDF0#2142], [pythonUDF0#2143], Cross, BuildRight
      :- BatchEvalPython [udf(a#2121)], [pythonUDF0#2142]
      :  +- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
      :     +- LocalTableScan [_1#2116, _2#2117]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[2, string, true]))
         +- BatchEvalPython [udf(c#2132)], [pythonUDF0#2143]
            +- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
               +- LocalTableScan [_1#2127, _2#2128]
   ```
   
   After this PR, the join can use `BroadcastHashJoin` instead of
   `BroadcastNestedLoopJoin`, because the UDF results on each side become the
   join keys.
   
   ## How was this patch tested?
   
   Added tests.
   
