On Mon, Nov 1, 2021 at 6:57 PM Kapoor, Rohit <rohit.kap...@envestnet.com>
wrote:

> Hi,
>
>
>
> I am testing aggregate push down for JDBC after going through the JIRA:
> https://issues.apache.org/jira/browse/SPARK-34952
>
> I have the latest Spark 3.2 set up in local mode on my laptop.
>
>
>
> I have PostgreSQL v14 locally on my laptop. I am running a basic aggregate
> query against an “emp” table that has 1,000,002 rows and a simple schema
> with 3 columns (empid, ename and sal).
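>
> In case it helps with reproduction, the table can be created and populated
> roughly like this over plain JDBC (a minimal sketch; the exact DDL and the
> generate_series data load are assumptions based on the schema above, not
> the exact script):
>
>     import java.sql.DriverManager
>
>     // Hypothetical setup matching the schema described above.
>     val conn = DriverManager.getConnection(
>       "jdbc:postgresql://localhost:5432/postgres", "xxxx", "xxxx")
>     val st = conn.createStatement()
>     st.execute(
>       "CREATE TABLE emp (empid INT PRIMARY KEY, ename TEXT, sal INT)")
>     st.execute(
>       "INSERT INTO emp SELECT i, 'emp_' || i, (random() * 10000)::int " +
>       "FROM generate_series(1, 1000002) AS i")
>     st.close(); conn.close()
>
> The aggregate query is below: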
>
>
>
> import org.apache.spark.sql.functions.max
>
> val jdbcString = "jdbc:postgresql://localhost:5432/postgres"
>
> val jdbcDF = spark.read
>   .format("jdbc")
>   .option("url", jdbcString)
>   .option("dbtable", "emp")
>   .option("pushDownAggregate", "true")
>   .option("user", "xxxx")
>   .option("password", "xxxx")
>   .load()
>   .where("empid > 1")
>   .agg(max("SAL")).alias("max_sal")
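>
> For reference, all four plan stages below can be printed with the standard
> extended explain call (nothing pushdown-specific, just the Dataset API):
>
>     jdbcDF.explain(true)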
>
>
>
>
>
> The complete plan details are:
>
>
>
> == Parsed Logical Plan ==
> SubqueryAlias max_sal
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>    +- Filter (empid#0 > 1)
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Analyzed Logical Plan ==
> max(SAL): int
> SubqueryAlias max_sal
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>    +- Filter (empid#0 > 1)
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Optimized Logical Plan ==
> Aggregate [max(SAL#2) AS max(SAL)#10]
> +- Project [sal#2]
>    +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
>    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
>       +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
>          +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] PushedAggregates: [], PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct<sal:int>
>
>
>
>
>
> I also checked the SQL submitted to the database by querying
> pg_stat_statements, and it confirms that the aggregate was not pushed down
> (consistent with the empty PushedAggregates list in the scan above). Here
> is the query the database actually received:
>
>
>
> SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)
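>
> For reference, that statement can be pulled back without leaving the
> spark-shell by pointing the same JDBC source at pg_stat_statements (a
> minimal sketch; the "query" reader option is a standard Spark JDBC option,
> and this assumes the pg_stat_statements extension is enabled on the
> server, as it is in my setup):
>
>     val stmts = spark.read
>       .format("jdbc")
>       .option("url", jdbcString)
>       .option("query",
>         "SELECT query, calls FROM pg_stat_statements " +
>         "WHERE query ILIKE '%FROM emp%'")
>       .option("user", "xxxx")
>       .option("password", "xxxx")
>       .load()
>
>     stmts.show(false)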
>
>
>
> All the rows are read and aggregated in the Spark layer.
>
>
>
> Is there any configuration I am missing here? Why is aggregate push down
> not working for me?
>
> Any pointers would be greatly appreciated.
>
>
>
>
>
> Thanks,
>
> Rohit
