[ https://issues.apache.org/jira/browse/SPARK-38288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498668#comment-17498668 ]

Andrew Murphy commented on SPARK-38288:
---------------------------------------

Hi [~llozano], I believe this is because JDBC DataSource V2 has not been fully 
implemented. Even though [29695|https://github.com/apache/spark/pull/29695] has 
been merged, spark.read.format("jdbc") still defaults to the JDBC DataSource V1 
code path, where the pushDownAggregate option has no effect.
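As of 3.2, aggregate pushdown is only wired into the DataSource V2 read path, 
which you can reach by registering a JDBCTableCatalog instead of calling 
spark.read.format("jdbc"). A minimal sketch of that route (the catalog name 
"pg" and the table identifier are placeholders; adjust the namespace to your 
Postgres schema, e.g. pg.public.test):

{code:java}
// Register a DataSource V2 JDBC catalog; "pg" is an arbitrary name.
spark.conf.set("spark.sql.catalog.pg",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.pg.url", "jdbc:postgresql://host:port/")
spark.conf.set("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
spark.conf.set("spark.sql.catalog.pg.user", "postgres")
spark.conf.set("spark.sql.catalog.pg.password", "*******")
spark.conf.set("spark.sql.catalog.pg.pushDownAggregate", "true")

// Reading through the catalog goes through the V2 scan builder, so the
// aggregate can be pushed down; PushedAggregates in the plan should then
// be non-empty, e.g. PushedAggregates: [MAX(age)].
val df = spark.table("pg.test")
df.groupBy("name").max("age").explain()
{code}

I haven't verified this against Postgres specifically, but on the V1 path the 
option is silently ignored, which matches the empty PushedAggregates you are 
seeing.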

> Aggregate push down doesn't work using Spark SQL JDBC datasource with 
> PostgreSQL
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-38288
>                 URL: https://issues.apache.org/jira/browse/SPARK-38288
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Luis Lozano Coira
>            Priority: Major
>              Labels: DataSource, Spark-SQL
>
> I am establishing a connection to PostgreSQL using the Spark SQL JDBC 
> datasource. I have started the spark shell with the Postgres driver on the 
> classpath, and I can connect and execute queries without problems. I am using 
> this statement:
> {code:java}
> val df = spark.read.format("jdbc")
>   .option("url", "jdbc:postgresql://host:port/")
>   .option("driver", "org.postgresql.Driver")
>   .option("dbtable", "test")
>   .option("user", "postgres")
>   .option("password", "*******")
>   .option("pushDownAggregate", true)
>   .load()
> {code}
> I am adding the pushDownAggregate option because I would like the 
> aggregations to be delegated to the source, but for some reason this is not 
> happening.
> Reviewing this pull request, it seems this feature should already be in 3.2: 
> [https://github.com/apache/spark/pull/29695]
> I am writing the aggregations with the mentioned limitations in mind. An 
> example where I don't see the pushdown happening is this one:
> {code:java}
> df.groupBy("name").max("age").show()
> {code}
> The results of the queryExecution are shown below:
> {code:java}
> scala> df.groupBy("name").max("age").queryExecution.executedPlan
> res19: org.apache.spark.sql.execution.SparkPlan =
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[name#274], functions=[max(age#246)], output=[name#274, max(age)#544])
>    +- Exchange hashpartitioning(name#274, 200), ENSURE_REQUIREMENTS, [id=#205]
>       +- HashAggregate(keys=[name#274], functions=[partial_max(age#246)], output=[name#274, max#548])
>          +- Scan JDBCRelation(test) [numPartitions=1] [age#246,name#274] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<age:int,name:string>
> scala> dfp.groupBy("name").max("age").queryExecution.toString
> res20: String =
> "== Parsed Logical Plan ==
> Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
> +- Relation [age#246] JDBCRelation(test) [numPartitions=1]
> == Analyzed Logical Plan ==
> name: string, max(age): int
> Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
> +- Relation [age#24...
> {code}
> What could be the problem? Should pushDownAggregate work in this case?


