
On Mon, Nov 1, 2021 at 6:57 PM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:

> Hi,
> I am testing the aggregate push down for JDBC after going through the JIRA
> - https://issues.apache.org/jira/browse/SPARK-34952
> I have the latest Spark 3.2 set up in local mode (laptop).
> I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate
> query on the “emp” table, which has 1000002 rows and a simple schema with 3
> columns (empid, ename and sal), as below:
> val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"
> val jdbcDF = spark.read
>     .format("jdbc")
>     .option("url", jdbcString)
>     .option("dbtable", "emp")
>     .option("pushDownAggregate","true")
>     .option("user", "xxxx")
>     .option("password", "xxxx")
>     .load()
>     .where("empid > 1")
>     .agg(max("SAL")).alias("max_sal")
> The complete plan details are:
> == Parsed Logical Plan ==
> SubqueryAlias max_sal
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>    +- Filter (empid#0 > 1)
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
> == Analyzed Logical Plan ==
> max(SAL): int
> SubqueryAlias max_sal
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>    +- Filter (empid#0 > 1)
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
> == Optimized Logical Plan ==
> Aggregate [max(SAL#2) AS max(SAL)#10]
> +- Project [sal#2]
>    +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
>    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
>       +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
>          +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] PushedAggregates: [], PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct<sal:int>
> I also checked the SQL submitted to the database by querying
> pg_stat_statements, which confirms that the aggregate was not pushed
> down to the database. Here is the query submitted to the database:
> SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)
> All the rows are read and aggregated in the Spark layer.
> Is there any configuration I am missing here? Why is aggregate push down not
> working for me?
> Any pointers would be greatly appreciated.
> Thanks,
> Rohit
