Great to hear. Thanks for testing this!

On Wed, Nov 3, 2021 at 4:03 AM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:
> Thanks for your guidance, Huaxin. I have been able to test the push-down
> operators successfully against PostgreSQL using DS v2.
>
> From: huaxin gao <huaxin.ga...@gmail.com>
> Date: Tuesday, 2 November 2021 at 12:35 AM
> To: Kapoor, Rohit <rohit.kap...@envestnet.com>
> Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>
> No need to write a customized data source reader. You may want to follow
> the example here
> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
> to use DS v2. The example uses the H2 database; please modify it to use
> PostgreSQL.
>
> Huaxin
>
> On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:
>
> Hi Huaxin,
>
> Thanks a lot for your response. Do I need to write a custom data source
> reader (in my case, for PostgreSQL) using the Spark DS v2 APIs, instead of
> the standard spark.read.format("jdbc")?
>
> Thanks,
> Rohit
>
> From: huaxin gao <huaxin.ga...@gmail.com>
> Date: Monday, 1 November 2021 at 11:32 PM
> To: Kapoor, Rohit <rohit.kap...@envestnet.com>
> Cc: user@spark.apache.org <user@spark.apache.org>
> Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>
> Hi Rohit,
>
> Thanks for testing this. It seems to me that you are using DS v1. We only
> support aggregate push down in DS v2. Could you please try again using DS
> v2 and let me know how it goes?
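[Editor's note: to make the suggested DS v2 path concrete for readers of the archive, here is a minimal sketch. It assumes Spark 3.2+, the PostgreSQL JDBC driver on the classpath, and placeholder values for the catalog name (`pg`), schema, table, and credentials. Passing `pushDownAggregate` as a catalog option is an assumption here; verify it against your Spark version.]

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

// Register a JDBC catalog so the table is read through DS v2.
// "pg", the URL, and the credentials below are placeholders.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.pg",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.pg.url", "jdbc:postgresql://localhost:5432/postgres")
  .config("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
  .config("spark.sql.catalog.pg.user", "xxxx")
  .config("spark.sql.catalog.pg.password", "xxxx")
  .config("spark.sql.catalog.pg.pushDownAggregate", "true")  // assumption: passed through to the JDBC source
  .getOrCreate()

// Reading via the registered catalog (pg.<schema>.<table>) goes through
// DS v2, which is what allows MAX(sal) to be pushed to PostgreSQL.
val df = spark.table("pg.public.emp")
  .where("empid > 1")
  .agg(max("sal"))

df.explain()  // look for a non-empty PushedAggregates entry in the scan node
```

The key difference from the original query is the entry point: `spark.table("pg.public.emp")` routes the scan through the DS v2 JDBC implementation, whereas `spark.read.format("jdbc")` in Spark 3.2 still uses DS v1, where aggregate push down is not supported.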
>
> Thanks,
> Huaxin
>
> On Mon, Nov 1, 2021 at 10:39 AM Chao Sun <sunc...@apache.org> wrote:
>
> ---------- Forwarded message ---------
> From: Kapoor, Rohit <rohit.kap...@envestnet.com>
> Date: Mon, Nov 1, 2021 at 6:27 AM
> Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
> To: user@spark.apache.org <user@spark.apache.org>
>
> Hi,
>
> I am testing the aggregate push down for JDBC after going through the JIRA
> https://issues.apache.org/jira/browse/SPARK-34952.
>
> I have the latest Spark 3.2 set up in local mode (laptop), and PostgreSQL
> v14 running locally. I am trying a basic aggregate query on an "emp" table
> that has 1000002 rows and a simple schema with 3 columns (empid, ename
> and sal), as below:
>
> val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"
>
> val jdbcDF = spark.read
>   .format("jdbc")
>   .option("url", jdbcString)
>   .option("dbtable", "emp")
>   .option("pushDownAggregate", "true")
>   .option("user", "xxxx")
>   .option("password", "xxxx")
>   .load()
>   .where("empid > 1")
>   .agg(max("SAL")).alias("max_sal")
>
> The complete plan details are:
>
> == Parsed Logical Plan ==
> SubqueryAlias max_sal
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>    +- Filter (empid#0 > 1)
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Analyzed Logical Plan ==
> max(SAL): int
> SubqueryAlias max_sal
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>    +- Filter (empid#0 > 1)
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Optimized Logical Plan ==
> Aggregate [max(SAL#2) AS max(SAL)#10]
> +- Project [sal#2]
>    +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
>    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
>       +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
>          +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2]
>             PushedAggregates: [], PushedFilters: [*IsNotNull(empid),
>             *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct<sal:int>
>
> I also checked the SQL submitted to the database, querying
> pg_stat_statements, and it confirms that the aggregate was not pushed
> down to the database. Here is the query submitted to the database:
>
> SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)
>
> All the rows are read and aggregated in the Spark layer.
>
> Is there any configuration I am missing here? Why is aggregate push down
> not working for me? Any pointers would be greatly appreciated.
>
> Thanks,
> Rohit
> ------------------------------
> Disclaimer: The information in this email is confidential and may be
> legally privileged. Access to this Internet email by anyone else other than
> the recipient is unauthorized. Envestnet, Inc. and its affiliated companies
> do not accept time-sensitive transactional messages, including orders to
> buy and sell securities, account allocation instructions, or any other
> instructions affecting a client account, via e-mail. If you are not the
> intended recipient of this email, any disclosure, copying, or distribution
> of it is prohibited and may be unlawful. If you have received this email in
> error, please notify the sender and immediately and permanently delete it
> and destroy any copies of it that were printed out. When addressed to our
> clients, any opinions or advice contained in this email is subject to the
> terms and conditions expressed in any applicable governing terms of
> business or agreements.
> ------------------------------
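[Editor's note: the symptom to look for in the plan above is the empty `PushedAggregates: []` in the scan node. For future readers, a small sketch of checking this programmatically; `jdbcDF` is the DataFrame built in the question, and the marker string matches Spark 3.2's plan text (verify against your version):]

```scala
// Sketch: inspect the physical plan text for pushed aggregates.
// An empty "PushedAggregates: []" (as in the plan above) means max()
// ran in Spark; a non-empty list means it was pushed to the database.
val planText = jdbcDF.queryExecution.executedPlan.toString
if (planText.contains("PushedAggregates: []"))
  println("Aggregate NOT pushed down - check that the read goes through DS v2")
else
  println("Aggregate pushed down to the database")
```

This is the same conclusion Rohit reached by querying pg_stat_statements: the filter was pushed (`PushedFilters` is non-empty) but the aggregate was not, because `spark.read.format("jdbc")` uses DS v1.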