Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-04 Thread Kapoor, Rohit
My basic test is here - https://github.com/rohitkapoor1/sparkPushDownAggregate
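
Not the code from the repository above, but a rough sketch of the shape such a DSv2-based check can take; the catalog name pg and the pg.public.emp identifier are hypothetical, and the catalog registration itself is sketched after Huaxin's suggestion further down the thread:

// Hypothetical illustration, not the code from the linked repository.
// Assumes a DSv2 JDBC catalog named "pg" has been registered against the
// PostgreSQL database holding the emp(empid, ename, sal) table
// (see the configuration sketch later in this thread).
import org.apache.spark.sql.functions.max

val aggDF = spark.table("pg.public.emp")
  .where("empid > 1")
  .agg(max("sal").as("max_sal"))

// With aggregate push down in effect, the JDBC scan node in the physical plan
// should show a non-empty PushedAggregates entry (it is empty, [], in the
// DSv1 plans quoted below in this thread).
aggDF.explain(true)
aggDF.show()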


From: German Schiavon 
Date: Thursday, 4 November 2021 at 2:17 AM
To: huaxin gao 
Cc: Kapoor, Rohit , user@spark.apache.org 

Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

Hi,

Rohit, can you share how it looks using DSv2?

Thanks!

On Wed, 3 Nov 2021 at 19:35, huaxin gao <huaxin.ga...@gmail.com> wrote:
Great to hear. Thanks for testing this!

On Wed, Nov 3, 2021 at 4:03 AM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:
Thanks for your guidance, Huaxin. I have been able to test the push down 
operators successfully against PostgreSQL using DS v2.


From: huaxin gao <huaxin.ga...@gmail.com>
Date: Tuesday, 2 November 2021 at 12:35 AM
To: Kapoor, Rohit <rohit.kap...@envestnet.com>
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
No need to write a customized data source reader. You may want to follow the 
example here to use DS v2: 
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
The example uses the H2 database; please modify it to use PostgreSQL.
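
A rough sketch of how that H2-based setup might be adapted for PostgreSQL; the catalog name pg, the connection URL, and the credentials below are placeholders rather than values taken from this thread:

// Sketch: register a DSv2 JDBC catalog backed by PostgreSQL instead of H2.
// The catalog name "pg", URL, and credentials are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dsv2-aggregate-pushdown")
  .config("spark.sql.catalog.pg",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.pg.url", "jdbc:postgresql://localhost:5432/postgres")
  .config("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
  .config("spark.sql.catalog.pg.pushDownAggregate", "true")
  .config("spark.sql.catalog.pg.user", "postgres")     // placeholder
  .config("spark.sql.catalog.pg.password", "postgres") // placeholder
  .getOrCreate()

// Tables are then addressed through the catalog, e.g. pg.public.emp.
spark.sql("SELECT MAX(sal) AS max_sal FROM pg.public.emp WHERE empid > 1").explain(true)

With a catalog registered this way, the same aggregate that stays in Spark under the DSv1 spark.read.format("jdbc") path should be eligible for push down to PostgreSQL.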

Huaxin


On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:
Hi Huaxin,

Thanks a lot for your response. Do I need to write a custom data source reader 
(in my case, for PostgreSQL) using the Spark DS v2 APIs, instead of the 
standard spark.read.format(“jdbc”)?


Thanks,
Rohit

From: huaxin gao <huaxin.ga...@gmail.com>
Date: Monday, 1 November 2021 at 11:32 PM
To: Kapoor, Rohit <rohit.kap...@envestnet.com>
Cc: user@spark.apache.org
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
Hi Rohit,

Thanks for testing this. Seems to me that you are using DS v1. We only support 
aggregate push down in DS v2. Could you please try again using DS v2 and let me 
know how it goes?

Thanks,
Huaxin

On Mon, Nov 1, 2021 at 10:39 AM Chao Sun <sunc...@apache.org> wrote:

---------- Forwarded message ---------
From: Kapoor, Rohit <rohit.kap...@envestnet.com>
Date: Mon, Nov 1, 2021 at 6:27 AM
Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
To: user@spark.apache.org

Hi,

I am testing the aggregate push down for JDBC after going through the JIRA - 
https://issues.apache.org/jira/browse/SPARK-34952
I have the latest Spark 3.2 setup in local mode (laptop).

I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate query 
on an “emp” table that has 102 rows and a simple schema with 3 columns (empid, 
ename and sal), as below:

val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", jdbcString)
  .option("dbtable", "emp")
  .option("pushDownAggregate", "true")
  .option("user", "")
  .option("password", "")
  .load()
  .where("empid > 1")
  .agg(max("SAL")).alias("max_sal")


The complete plan details are:

== Parsed Logical Plan ==
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Analyzed Logical Plan ==
max(SAL): int
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Optimized Logical Plan ==
Aggregate [max(SAL#2) AS max(SAL)#10]
+- Project [sal#2]
   +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
      +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])

Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-03 Thread Kapoor, Rohit
Thanks for your guidance, Huaxin. I have been able to test the push down 
operators successfully against PostgreSQL using DS v2.


From: huaxin gao 
Date: Tuesday, 2 November 2021 at 12:35 AM
To: Kapoor, Rohit 
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

No need to write a customized data source reader. You may want to follow the 
example here to use DS v2: 
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
The example uses the H2 database; please modify it to use PostgreSQL.

Huaxin


On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:
Hi Huaxin,

Thanks a lot for your response. Do I need to write a custom data source reader 
(in my case, for PostgreSQL) using the Spark DS v2 APIs, instead of the 
standard spark.read.format(“jdbc”)?


Thanks,
Rohit

From: huaxin gao <huaxin.ga...@gmail.com>
Date: Monday, 1 November 2021 at 11:32 PM
To: Kapoor, Rohit <rohit.kap...@envestnet.com>
Cc: user@spark.apache.org
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
Hi Rohit,

Thanks for testing this. Seems to me that you are using DS v1. We only support 
aggregate push down in DS v2. Could you please try again using DS v2 and let me 
know how it goes?

Thanks,
Huaxin

On Mon, Nov 1, 2021 at 10:39 AM Chao Sun <sunc...@apache.org> wrote:

---------- Forwarded message ---------
From: Kapoor, Rohit <rohit.kap...@envestnet.com>
Date: Mon, Nov 1, 2021 at 6:27 AM
Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
To: user@spark.apache.org

Hi,

I am testing the aggregate push down for JDBC after going through the JIRA - 
https://issues.apache.org/jira/browse/SPARK-34952
I have the latest Spark 3.2 setup in local mode (laptop).

I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate query 
on an “emp” table that has 102 rows and a simple schema with 3 columns (empid, 
ename and sal), as below:

val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", jdbcString)
  .option("dbtable", "emp")
  .option("pushDownAggregate", "true")
  .option("user", "")
  .option("password", "")
  .load()
  .where("empid > 1")
  .agg(max("SAL")).alias("max_sal")


The complete plan details are:

== Parsed Logical Plan ==
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Analyzed Logical Plan ==
max(SAL): int
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Optimized Logical Plan ==
Aggregate [max(SAL#2) AS max(SAL)#10]
+- Project [sal#2]
   +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
      +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
         +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] PushedAggregates: [], PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct<sal:int>


I also checked the SQL submitted to the database by querying pg_stat_statements, 
and it confirms that the aggregate was not pushed down to the database. Here is 
the query submitted to the database:

SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)

All the rows are read and aggregated in the Spark layer.
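
As an aside, one way to repeat that pg_stat_statements check from the same Spark session is to read the view back over plain JDBC. This is only a sketch: it reuses jdbcString and the (elided) credentials from the snippet above and assumes the pg_stat_statements extension is enabled on the server.

// Sketch: inspect the statements PostgreSQL actually received.
val statsDF = spark.read
  .format("jdbc")
  .option("url", jdbcString)
  .option("query",
    "SELECT query, calls, rows FROM pg_stat_statements WHERE query ILIKE '%emp%'")
  .option("user", "")
  .option("password", "")
  .load()

statsDF.show(truncate = false)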

Is there any configuration I am missing here? Why is aggregate push down not 
working for me?
Any pointers would be greatly appreciated.


Thanks,
Rohit

Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-01 Thread Kapoor, Rohit
Hi Huaxin,

Thanks a lot for your response. Do I need to write a custom data source reader 
(in my case, for PostgreSQL) using the Spark DS v2 APIs, instead of the 
standard spark.read.format(“jdbc”)?


Thanks,
Rohit

From: huaxin gao 
Date: Monday, 1 November 2021 at 11:32 PM
To: Kapoor, Rohit 
Cc: user@spark.apache.org 
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

Hi Rohit,

Thanks for testing this. Seems to me that you are using DS v1. We only support 
aggregate push down in DS v2. Could you please try again using DS v2 and let me 
know how it goes?

Thanks,
Huaxin

On Mon, Nov 1, 2021 at 10:39 AM Chao Sun <sunc...@apache.org> wrote:

---------- Forwarded message ---------
From: Kapoor, Rohit <rohit.kap...@envestnet.com>
Date: Mon, Nov 1, 2021 at 6:27 AM
Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
To: user@spark.apache.org

Hi,

I am testing the aggregate push down for JDBC after going through the JIRA - 
https://issues.apache.org/jira/browse/SPARK-34952
I have the latest Spark 3.2 setup in local mode (laptop).

I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate query 
on an “emp” table that has 102 rows and a simple schema with 3 columns (empid, 
ename and sal), as below:

val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", jdbcString)
  .option("dbtable", "emp")
  .option("pushDownAggregate", "true")
  .option("user", "")
  .option("password", "")
  .load()
  .where("empid > 1")
  .agg(max("SAL")).alias("max_sal")


The complete plan details are:

== Parsed Logical Plan ==
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Analyzed Logical Plan ==
max(SAL): int
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Optimized Logical Plan ==
Aggregate [max(SAL#2) AS max(SAL)#10]
+- Project [sal#2]
   +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
      +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
         +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] PushedAggregates: [], PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct<sal:int>


I also checked the SQL submitted to the database by querying pg_stat_statements, 
and it confirms that the aggregate was not pushed down to the database. Here is 
the query submitted to the database:

SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)

All the rows are read and aggregated in the Spark layer.

Is there any configuration I am missing here? Why is aggregate push down not 
working for me?
Any pointers would be greatly appreciated.


Thanks,
Rohit



[Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-01 Thread Kapoor, Rohit
Hi,

I am testing the aggregate push down for JDBC after going through the JIRA - 
https://issues.apache.org/jira/browse/SPARK-34952
I have the latest Spark 3.2 setup in local mode (laptop).

I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate query 
on an “emp” table that has 102 rows and a simple schema with 3 columns (empid, 
ename and sal), as below:

val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", jdbcString)
  .option("dbtable", "emp")
  .option("pushDownAggregate", "true")
  .option("user", "")
  .option("password", "")
  .load()
  .where("empid > 1")
  .agg(max("SAL")).alias("max_sal")


The complete plan details are:

== Parsed Logical Plan ==
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Analyzed Logical Plan ==
max(SAL): int
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Optimized Logical Plan ==
Aggregate [max(SAL#2) AS max(SAL)#10]
+- Project [sal#2]
   +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
      +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
      +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
         +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] PushedAggregates: [], PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct<sal:int>


I also checked the SQL submitted to the database by querying pg_stat_statements, 
and it confirms that the aggregate was not pushed down to the database. Here is 
the query submitted to the database:

SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)

All the rows are read and aggregated in the Spark layer.

Is there any configuration I am missing here? Why is aggregate push down not 
working for me?
Any pointers would be greatly appreciated.


Thanks,
Rohit


Disclaimer: The information in this email is confidential and may be legally 
privileged. Access to this Internet email by anyone else other than the 
recipient is unauthorized. Envestnet, Inc. and its affiliated companies do not 
accept time-sensitive transactional messages, including orders to buy and sell 
securities, account allocation instructions, or any other instructions 
affecting a client account, via e-mail. If you are not the intended recipient 
of this email, any disclosure, copying, or distribution of it is prohibited and 
may be unlawful. If you have received this email in error, please notify the 
sender and immediately and permanently delete it and destroy any copies of it 
that were printed out. When addressed to our clients, any opinions or advice 
contained in this email is subject to the terms and conditions expressed in any 
applicable governing terms of business or agreements.