Hi,

Rohit, can you share how it looks using DS v2?

Thanks!

On Wed, 3 Nov 2021 at 19:35, huaxin gao <huaxin.ga...@gmail.com> wrote:

> Great to hear. Thanks for testing this!
>
> On Wed, Nov 3, 2021 at 4:03 AM Kapoor, Rohit <rohit.kap...@envestnet.com>
> wrote:
>
>> Thanks for your guidance, Huaxin. I have been able to test the push down
>> operators successfully against PostgreSQL using DS v2.
>>
>>
>>
>>
>>
>> *From: *huaxin gao <huaxin.ga...@gmail.com>
>> *Date: *Tuesday, 2 November 2021 at 12:35 AM
>> *To: *Kapoor, Rohit <rohit.kap...@envestnet.com>
>> *Subject: *Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>>
>>
>>
>> *EXTERNAL MAIL: USE CAUTION BEFORE CLICKING LINKS OR OPENING ATTACHMENTS.
>> ALWAYS VERIFY THE SOURCE OF MESSAGES. *
>>
>>
>>
>>
>> No need to write a customized data source reader. You may want to follow
>> the example here
>> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
>> to use DS v2. The example uses the H2 database. Please modify it to use
>> PostgreSQL.
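>> For reference, here is a minimal sketch of what that catalog-based DS v2
>> setup might look like for PostgreSQL. The catalog name "postgres", the
>> connection details, and the credentials are illustrative assumptions for
>> this sketch, not tested configuration from this thread:

```scala
// Sketch (untested): register a DS v2 JDBC table catalog for PostgreSQL.
// Reading through a JDBCTableCatalog makes Spark use DS v2, which is a
// prerequisite for aggregate push down in Spark 3.2.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("agg-pushdown-demo")
  // Register a catalog named "postgres" backed by the DS v2 JDBC source
  .config("spark.sql.catalog.postgres",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.postgres.url",
    "jdbc:postgresql://localhost:5432/postgres")
  .config("spark.sql.catalog.postgres.driver", "org.postgresql.Driver")
  .config("spark.sql.catalog.postgres.user", "xxxx")     // placeholder
  .config("spark.sql.catalog.postgres.password", "xxxx") // placeholder
  // Enable aggregate push down for tables read through this catalog
  .config("spark.sql.catalog.postgres.pushDownAggregate", "true")
  .getOrCreate()
```

>> Tables in the database can then be addressed through the catalog, e.g.
>> spark.table("postgres.emp"), instead of spark.read.format("jdbc").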
>>
>>
>>
>> Huaxin
>>
>>
>>
>>
>>
>> On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit <rohit.kap...@envestnet.com>
>> wrote:
>>
>> Hi Huaxin,
>>
>>
>>
>> Thanks a lot for your response. Do I need to write a custom data source
>> reader (in my case, for PostgreSQL) using the Spark DS v2 APIs, instead of
>> the standard spark.read.format("jdbc")?
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Rohit
>>
>>
>>
>> *From: *huaxin gao <huaxin.ga...@gmail.com>
>> *Date: *Monday, 1 November 2021 at 11:32 PM
>> *To: *Kapoor, Rohit <rohit.kap...@envestnet.com>
>> *Cc: *user@spark.apache.org <user@spark.apache.org>
>> *Subject: *Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>>
>> *EXTERNAL MAIL: USE CAUTION BEFORE CLICKING LINKS OR OPENING ATTACHMENTS.
>> ALWAYS VERIFY THE SOURCE OF MESSAGES.*
>>
>>
>> Hi Rohit,
>>
>>
>>
>> Thanks for testing this. Seems to me that you are using DS v1. We only
>> support aggregate push down in DS v2. Could you please try again using DS
>> v2 and let me know how it goes?
>>
>>
>>
>> Thanks,
>>
>> Huaxin
>>
>>
>>
>> On Mon, Nov 1, 2021 at 10:39 AM Chao Sun <sunc...@apache.org> wrote:
>>
>>
>>
>> ---------- Forwarded message ---------
>> From: *Kapoor, Rohit* <rohit.kap...@envestnet.com>
>> Date: Mon, Nov 1, 2021 at 6:27 AM
>> Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
>> To: user@spark.apache.org <user@spark.apache.org>
>>
>>
>>
>> Hi,
>>
>>
>>
>> I am testing the aggregate push down for JDBC after going through the
>> JIRA - https://issues.apache.org/jira/browse/SPARK-34952
>>
>> I have the latest Spark 3.2 set up in local mode on my laptop.
>>
>>
>>
>> I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate
>> query on an “emp” table that has 1,000,002 rows and a simple schema with 3
>> columns (empid, ename and sal), as below:
>>
>>
>>
>> val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"
>>
>> val jdbcDF = spark.read
>>     .format("jdbc")
>>     .option("url", jdbcString)
>>     .option("dbtable", "emp")
>>     .option("pushDownAggregate","true")
>>     .option("user", "xxxx")
>>     .option("password", "xxxx")
>>     .load()
>>     .where("empid > 1")
>>     .agg(max("SAL")).alias("max_sal")
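>> (Editorial aside, for contrast: the same query issued through a DS v2 JDBC
>> catalog, assuming a hypothetical catalog named "postgres" registered via
>> JDBCTableCatalog with pushDownAggregate enabled, which is not part of the
>> setup above, might look like this:)

```scala
import org.apache.spark.sql.functions.max

// Hypothetical: "postgres" is a DS v2 JDBCTableCatalog registered in the
// Spark session config, so the table is read through DS v2.
val df = spark.table("postgres.emp")
  .where("empid > 1")
  .agg(max("sal").alias("max_sal"))

// With DS v2 in effect, the scan node shown by df.explain() would be
// expected to report the pushed aggregate, e.g. PushedAggregates: [MAX(sal)].
df.explain()
```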
>>
>>
>>
>>
>>
>> The complete plan details are:
>>
>>
>>
>> == Parsed Logical Plan ==
>> SubqueryAlias max_sal
>> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>>    +- Filter (empid#0 > 1)
>>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>>
>> == Analyzed Logical Plan ==
>> max(SAL): int
>> SubqueryAlias max_sal
>> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>>    +- Filter (empid#0 > 1)
>>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>>
>> == Optimized Logical Plan ==
>> Aggregate [max(SAL#2) AS max(SAL)#10]
>> +- Project [sal#2]
>>    +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
>>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>>
>> == Physical Plan ==
>> AdaptiveSparkPlan isFinalPlan=false
>> +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
>>    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
>>       +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
>>          +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] PushedAggregates: [], PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct<sal:int>
>>
>>
>>
>>
>>
>> I also checked the SQL submitted to the database, by querying
>> pg_stat_statements, and it confirms that the aggregate was not pushed
>> down to the database. Here is the query submitted to the database:
>>
>>
>>
>> SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)
>>
>>
>>
>> All the rows are read and aggregated in the Spark layer.
>>
>>
>>
>> Is there any configuration I am missing here? Why is aggregate push down
>> not working for me?
>>
>> Any pointers would be greatly appreciated.
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Rohit
>> ------------------------------
>>
>> Disclaimer: The information in this email is confidential and may be
>> legally privileged. Access to this Internet email by anyone else other than
>> the recipient is unauthorized. Envestnet, Inc. and its affiliated companies
>> do not accept time-sensitive transactional messages, including orders to
>> buy and sell securities, account allocation instructions, or any other
>> instructions affecting a client account, via e-mail. If you are not the
>> intended recipient of this email, any disclosure, copying, or distribution
>> of it is prohibited and may be unlawful. If you have received this email in
>> error, please notify the sender immediately and permanently delete it
>> and destroy any copies of it that were printed out. When addressed to our
>> clients, any opinions or advice contained in this email is subject to the
>> terms and conditions expressed in any applicable governing terms of
>> business or agreements.
>> ------------------------------
>>
>>
>
