On Mon, Nov 1, 2021 at 6:57 PM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:
Hi,

I am testing the aggregate push down for JDBC after going through the JIRA
https://issues.apache.org/jira/browse/SPARK-34952

I have the latest Spark 3.2 set up in local mode on my laptop, and PostgreSQL v14 running locally. I am trying a basic aggregate query on an "emp" table that has 1000002 rows and a simple schema with 3 columns (empid, ename and sal), as below:

    val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", jdbcString)
      .option("dbtable", "emp")
      .option("pushDownAggregate", "true")
      .option("user", "xxxx")
      .option("password", "xxxx")
      .load()
      .where("empid > 1")
      .agg(max("SAL")).alias("max_sal")

The complete plan details are:

    == Parsed Logical Plan ==
    SubqueryAlias max_sal
    +- Aggregate [max(SAL#2) AS max(SAL)#10]
       +- Filter (empid#0 > 1)
          +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

    == Analyzed Logical Plan ==
    max(SAL): int
    SubqueryAlias max_sal
    +- Aggregate [max(SAL#2) AS max(SAL)#10]
       +- Filter (empid#0 > 1)
          +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

    == Optimized Logical Plan ==
    Aggregate [max(SAL#2) AS max(SAL)#10]
    +- Project [sal#2]
       +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
          +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
       +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
          +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
             +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] PushedAggregates: [], PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct<sal:int>

Note the empty PushedAggregates: [] in the physical plan. I also checked the
SQL submitted to the database by querying pg_stat_statements, and it confirms that the aggregate was not pushed down to the database. Here is the query submitted to the database:

    SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)

All the rows are read and aggregated in the Spark layer.

Is there any configuration I am missing here? Why is aggregate push down not working for me? Any pointers would be greatly appreciated.

Thanks,
Rohit
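P.S. One variant I have not yet tried, in case it is relevant: my understanding is that SPARK-34952 implements aggregate push down for the DataSource V2 JDBC path, while spark.read.format("jdbc") plans through the V1 source. A minimal sketch of the V2 route via a registered JDBC table catalog — the catalog name "pg", the per-catalog option spellings, and the expected plan output are my assumptions, to be verified against the 3.2 docs:

```scala
// Sketch only, untested assumption: aggregate push down may require the
// DataSource V2 JDBC path (a registered JDBC table catalog) rather than
// the V1 spark.read.format("jdbc") reader used above.
spark.conf.set("spark.sql.catalog.pg",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.pg.url", jdbcString)
spark.conf.set("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
spark.conf.set("spark.sql.catalog.pg.user", "xxxx")
spark.conf.set("spark.sql.catalog.pg.password", "xxxx")
spark.conf.set("spark.sql.catalog.pg.pushDownAggregate", "true")

// With the catalog registered, the table is addressed as pg.<schema>.<table>.
// If the push down fires, EXPLAIN should show a non-empty PushedAggregates.
spark.sql("SELECT MAX(sal) AS max_sal FROM pg.public.emp WHERE empid > 1").show()
```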