Great to hear. Thanks for testing this!

On Wed, Nov 3, 2021 at 4:03 AM Kapoor, Rohit <rohit.kap...@envestnet.com>
wrote:

> Thanks for your guidance Huaxin. I have been able to test the push down
> operators successfully against Postgresql using DS v2.
>
>
>
>
>
> *From: *huaxin gao <huaxin.ga...@gmail.com>
> *Date: *Tuesday, 2 November 2021 at 12:35 AM
> *To: *Kapoor, Rohit <rohit.kap...@envestnet.com>
> *Subject: *Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>
>
>
> *EXTERNAL MAIL: USE CAUTION BEFORE CLICKING LINKS OR OPENING ATTACHMENTS.
> ALWAYS VERIFY THE SOURCE OF MESSAGES. *
>
>
>
> *EXTERNAL MAIL: USE CAUTION BEFORE CLICKING LINKS OR OPENING ATTACHMENTS.
> ALWAYS VERIFY THE SOURCE OF MESSAGES. *
>
> No need to write a customized data source reader. You may want to follow
> the example here
> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
> to use DS v2. The example uses h2 database. Please modify it to use
> postgresql.
>
>
>
> Huaxin
>
>
>
>
>
> On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit <rohit.kap...@envestnet.com>
> wrote:
>
> Hi Huaxin,
>
>
>
> Thanks a lot for your response. Do I need to write a custom data source
> reader (in my case, for PostgreSql) using the Spark DS v2 APIs, instead of
> the standard spark.read.format(“jdbc”) ?
>
>
>
>
>
> Thanks,
>
> Rohit
>
>
>
> *From: *huaxin gao <huaxin.ga...@gmail.com>
> *Date: *Monday, 1 November 2021 at 11:32 PM
> *To: *Kapoor, Rohit <rohit.kap...@envestnet.com>
> *Cc: *user@spark.apache.org <user@spark.apache.org>
> *Subject: *Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>
> *EXTERNAL MAIL: USE CAUTION BEFORE CLICKING LINKS OR OPENING ATTACHMENTS.
> ALWAYS VERIFY THE SOURCE OF MESSAGES.*
>
> *EXTERNAL MAIL: USE CAUTION BEFORE CLICKING LINKS OR OPENING ATTACHMENTS.
> ALWAYS VERIFY THE SOURCE OF MESSAGES.*
>
> Hi Rohit,
>
>
>
> Thanks for testing this. Seems to me that you are using DS v1. We only
> support aggregate push down in DS v2. Could you please try again using DS
> v2 and let me know how it goes?
>
>
>
> Thanks,
>
> Huaxin
>
>
>
> On Mon, Nov 1, 2021 at 10:39 AM Chao Sun <sunc...@apache.org> wrote:
>
>
>
> ---------- Forwarded message ---------
> From: *Kapoor, Rohit* <rohit.kap...@envestnet.com>
> Date: Mon, Nov 1, 2021 at 6:27 AM
> Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
> To: user@spark.apache.org <user@spark.apache.org>
>
>
>
> Hi,
>
>
>
> I am testing the aggregate push down for JDBC after going through the JIRA
> - https://issues.apache.org/jira/browse/SPARK-34952
>
> I have the latest Spark 3.2 setup in local mode (laptop).
>
>
>
> I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate
> query on “emp” table that has 1000002 rows and a simple schema with 3
> columns (empid, ename and sal) as below:
>
>
>
> val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"
>
>
>
> val jdbcDF = spark.read
>
>     .format("jdbc")
>
>     .option("url", jdbcString)
>
>     .option("dbtable", "emp")
>
>     .option("pushDownAggregate","true")
>
>     .option("user", "xxxx")
>
>     .option("password", "xxxx")
>
>     .load()
>
>     .where("empid > 1")
>
>     .agg(max("SAL")).alias("max_sal")
>
>
>
>
>
> The complete plan details are:
>
>
>
> == Parsed Logical Plan ==
>
> SubqueryAlias max_sal
>
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>
>    +- Filter (empid#0 > 1)
>
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
> [numPartitions=1]
>
>
>
> == Analyzed Logical Plan ==
>
> max(SAL): int
>
> SubqueryAlias max_sal
>
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>
>    +- Filter (empid#0 > 1)
>
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
> [numPartitions=1]
>
>
>
> == Optimized Logical Plan ==
>
> Aggregate [max(SAL#2) AS max(SAL)#10]
>
> +- Project [sal#2]
>
>    +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
>
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
> [numPartitions=1]
>
>
>
> == Physical Plan ==
>
> AdaptiveSparkPlan isFinalPlan=false
>
> +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
>
>    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
>
>       +- HashAggregate(keys=[], functions=[partial_max(SAL#2)],
> output=[max#13])
>
>          +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] 
> *PushedAggregates:
> []*, PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)],
> PushedGroupby: [], ReadSchema: struct<sal:int>
>
>
>
>
>
> I also checked the sql submitted to the database, querying
> pg_stat_statements, and it confirms that the aggregate was not pushed
> down to the database. Here is the query submitted to the database:
>
>
>
> SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)
>
>
>
> All the rows are read and aggregated in the Spark layer.
>
>
>
> Is there any configuration I missing here? Why is aggregate push down not
> working for me?
>
> Any pointers would be greatly appreciated.
>
>
>
>
>
> Thanks,
>
> Rohit
> ------------------------------
>
> Disclaimer: The information in this email is confidential and may be
> legally privileged. Access to this Internet email by anyone else other than
> the recipient is unauthorized. Envestnet, Inc. and its affiliated companies
> do not accept time-sensitive transactional messages, including orders to
> buy and sell securities, account allocation instructions, or any other
> instructions affecting a client account, via e-mail. If you are not the
> intended recipient of this email, any disclosure, copying, or distribution
> of it is prohibited and may be unlawful. If you have received this email in
> error, please notify the sender and immediately and permanently delete it
> and destroy any copies of it that were printed out. When addressed to our
> clients, any opinions or advice contained in this email is subject to the
> terms and conditions expressed in any applicable governing terms of
> business or agreements.
> ------------------------------
>
> ------------------------------
>
> Disclaimer: The information in this email is confidential and may be
> legally privileged. Access to this Internet email by anyone else other than
> the recipient is unauthorized. Envestnet, Inc. and its affiliated companies
> do not accept time-sensitive transactional messages, including orders to
> buy and sell securities, account allocation instructions, or any other
> instructions affecting a client account, via e-mail. If you are not the
> intended recipient of this email, any disclosure, copying, or distribution
> of it is prohibited and may be unlawful. If you have received this email in
> error, please notify the sender and immediately and permanently delete it
> and destroy any copies of it that were printed out. When addressed to our
> clients, any opinions or advice contained in this email is subject to the
> terms and conditions expressed in any applicable governing terms of
> business or agreements.
> ------------------------------
>
> ------------------------------
>
> Disclaimer: The information in this email is confidential and may be
> legally privileged. Access to this Internet email by anyone else other than
> the recipient is unauthorized. Envestnet, Inc. and its affiliated companies
> do not accept time-sensitive transactional messages, including orders to
> buy and sell securities, account allocation instructions, or any other
> instructions affecting a client account, via e-mail. If you are not the
> intended recipient of this email, any disclosure, copying, or distribution
> of it is prohibited and may be unlawful. If you have received this email in
> error, please notify the sender and immediately and permanently delete it
> and destroy any copies of it that were printed out. When addressed to our
> clients, any opinions or advice contained in this email is subject to the
> terms and conditions expressed in any applicable governing terms of
> business or agreements.
> ------------------------------
>

Reply via email to