This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c121bb557447 [SPARK-49111][SQL] Move withProjectAndFilter to the 
companion object of DataSourceV2Strategy
c121bb557447 is described below

commit c121bb557447709c19fc01b68b3a34b8838abc6d
Author: Uros Stankovic <uros.stanko...@databricks.com>
AuthorDate: Fri Aug 9 09:25:28 2024 +0200

    [SPARK-49111][SQL] Move withProjectAndFilter to the companion object of 
DataSourceV2Strategy
    
    ### What changes were proposed in this pull request?
    Move static method `withProjectAndFilter` to object in DataSourceV2Strategy
    
    ### Why are the changes needed?
    It provides better opportunities for reuse, since object of strategy is not 
needed anymore for function invocation, and a code is also cleaner.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Simple refactor, there is no new changes that can be tested.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47606 from urosstan-db/data-source-v2-strategy-refactor.
    
    Authored-by: Uros Stankovic <uros.stanko...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../datasources/v2/DataSourceV2Strategy.scala      | 51 +++++++++++++---------
 1 file changed, 31 insertions(+), 20 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index bb138c0fcd0a..b0a89173060a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -53,21 +53,6 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
   import DataSourceV2Implicits._
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
-  private def withProjectAndFilter(
-      project: Seq[NamedExpression],
-      filters: Seq[Expression],
-      scan: LeafExecNode,
-      needsUnsafeConversion: Boolean): SparkPlan = {
-    val filterCondition = filters.reduceLeftOption(And)
-    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
-
-    if (withFilter.output != project || needsUnsafeConversion) {
-      ProjectExec(project, withFilter)
-    } else {
-      withFilter
-    }
-  }
-
   private def refreshCache(r: DataSourceV2Relation)(): Unit = {
     session.sharedState.cacheManager.recacheByPlan(session, r)
   }
@@ -128,12 +113,14 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
         unsafeRowRDD,
         v1Relation,
         tableIdentifier)
-      withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = 
false) :: Nil
+      DataSourceV2Strategy.withProjectAndFilter(
+        project, filters, dsScan, needsUnsafeConversion = false) :: Nil
 
     case PhysicalOperation(project, filters,
         DataSourceV2ScanRelation(_, scan: LocalScan, output, _, _)) =>
       val localScanExec = LocalTableScanExec(output, 
scan.rows().toImmutableArraySeq)
-      withProjectAndFilter(project, filters, localScanExec, 
needsUnsafeConversion = false) :: Nil
+      DataSourceV2Strategy.withProjectAndFilter(
+        project, filters, localScanExec, needsUnsafeConversion = false) :: Nil
 
     case PhysicalOperation(project, filters, relation: 
DataSourceV2ScanRelation) =>
       // projection and filters were already pushed down in the optimizer.
@@ -146,7 +133,8 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       val batchExec = BatchScanExec(relation.output, relation.scan, 
runtimeFilters,
         relation.ordering, relation.relation.table,
         StoragePartitionJoinParams(relation.keyGroupedPartitioning))
-      withProjectAndFilter(project, postScanFilters, batchExec, 
!batchExec.supportsColumnar) :: Nil
+      DataSourceV2Strategy.withProjectAndFilter(
+        project, postScanFilters, batchExec, !batchExec.supportsColumnar) :: 
Nil
 
     case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
       if r.startOffset.isDefined && r.endOffset.isDefined =>
@@ -156,7 +144,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
         r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
 
       // Add a Project here to make sure we produce unsafe rows.
-      withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
+      DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec, 
!scanExec.supportsColumnar) :: Nil
 
     case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
       if r.startOffset.isDefined && r.endOffset.isEmpty =>
@@ -165,7 +153,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, 
r.startOffset.get)
 
       // Add a Project here to make sure we produce unsafe rows.
-      withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
+      DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec, 
!scanExec.supportsColumnar) :: Nil
 
     case WriteToDataSourceV2(relationOpt, writer, query, customMetrics) =>
       val invalidateCacheFunc: () => Unit = () => relationOpt match {
@@ -645,6 +633,29 @@ private[sql] object DataSourceV2Strategy extends Logging {
       logWarning(log"Can't translate ${MDC(EXPR, other)} to source filter, 
unsupported expression")
       None
   }
+
+  /**
+   * Creates new spark plan that should apply given filters and projections to 
given scan node
+   * @param project Projection list that should be output of returned spark 
plan
+   * @param filters Filter list that should be applied to scan node
+   * @param scan Scan node
+   * @param needsUnsafeConversion Value that indicates whether unsafe 
conversion is needed
+   * @return SparkPlan tree composed of scan node and eventually 
filter/project nodes
+   */
+  protected[sql] def withProjectAndFilter(
+      project: Seq[NamedExpression],
+      filters: Seq[Expression],
+      scan: LeafExecNode,
+      needsUnsafeConversion: Boolean): SparkPlan = {
+    val filterCondition = filters.reduceLeftOption(And)
+    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+    if (withFilter.output != project || needsUnsafeConversion) {
+      ProjectExec(project, withFilter)
+    } else {
+      withFilter
+    }
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to