Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Gourav Sengupta
Hi,

I am copying Dr. Zaharia in this email as I am quoting from his book (once
again I may be wrong):
Chapter 5: Basic Structured Operations >> Creating Rows

You can create rows by manually instantiating a Row object with the values
that belong in each column. It’s important to note that only DataFrames
have schemas. Rows themselves do not have schemas. This means that if you
create a Row manually, you must specify the values in the same order as the
schema of the DataFrame to which they might be appended (we will see this
when we discuss creating DataFrames):

Chapter 6: Working with different types of data
Starting this Python process is expensive, but the real cost is in
serializing the data to Python. This is costly for two reasons: it is an
expensive computation, but also, after the data enters Python, Spark cannot
manage the memory of the worker. This means that you could potentially
cause a worker to fail if it becomes resource constrained (because both the
JVM and Python are competing for memory on the same machine).
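
As an aside not drawn from the book: on Spark 3.x, Arrow-based transfer is the usual
way to reduce this JVM-to-Python serialization cost for toPandas(). A minimal sketch,
assuming an existing SparkSession named spark and a placeholder table name:

    # Enable Arrow-based columnar transfer for toPandas() (Spark 3.x config name).
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # The result still lands in the driver's Python memory, so it must fit there.
    pdf = spark.table("training_data").limit(1000).toPandas()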

Chapter 18: Monitoring and Debugging (as Sean was mentioning, this is about the
driver OOM error)
Issues with JVMs running out of memory can happen if you are using another
language binding, such as Python, due to data conversion between the two
requiring too much memory in the JVM. Try to see whether your issue is
specific to your chosen language and bring back less data to the driver
node, or write it to a file instead of bringing it back as in-memory
objects.

Regards,
Gourav Sengupta


On Wed, Nov 3, 2021 at 10:09 PM Sergey Ivanychev 
wrote:

> I want to further clarify the use case I have: an ML engineer collects
> data so as to use it for training an ML model. The driver is created within
> a Jupyter notebook and has 64 GB of RAM for fetching the training set and
> feeding it to the model. Naturally, in this case executors shouldn’t be as
> big as the driver.
>
> Currently, the best solution I found is to write the dataframe to S3, and
> then read it via pd.read_parquet.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
>
> 
> Thanks for clarification on the koalas case.
>
> The thread owner states, and I quote: "IIUC, in the `toPandas` case all
> the data gets shuffled to a single executor that fails with OOM."
>
> I still believe that this may be related to the way k8s handles shuffling.
> In a balanced k8s cluster this could be avoided which does not seem to be
> the case here, as the so-called driver node has 8 times more RAM than the
> other nodes.
>
> HTH
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>
>> I think you're talking about koalas, which is in Spark 3.2, but that is
>> unrelated to toPandas() and to the question of how it differs from
>> collect().
>> Shuffle is also unrelated.
>>
>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> As I understood it, in previous versions of Spark data could not be
>>> processed and stored in Pandas data frames in a distributed mode, as these
>>> data frames store their data in RAM, which in this case means on the driver.
>>> However, I was under the impression that this limitation no longer
>>> exists in 3.2. So if you have a k8s cluster with 64GB of RAM for one node
>>> and 8GB of RAM for the others, and PySpark running in cluster mode, how do you
>>> expect the process to confine itself to the master node? What will happen
>>> if you increase the executor nodes' RAM to 64GB temporarily (a balanced k8s
>>> cluster) and run the job again?
>>>
>>> Worth noting that the current Spark on k8s does not support an external
>>> shuffle service. For now we have two parameters for Dynamic Resource Allocation.
>>> These are
>>>
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>
>>>
>>> The idea is to use dynamic resource allocation, where the driver tracks
>>> the shuffle files and evicts only executors that are not storing active shuffle
>>> files. So in a nutshell, these shuffle files are stored on the executors
>>> themselves in the absence of an external shuffle service. The model works on the
>>> basis of the "one-container-per-Pod" model, meaning that one node of the
>>> cluster runs the driver and each remaining node runs one executor.
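
For reference, a minimal PySpark sketch of setting these same two properties when
building the session rather than on spark-submit (the app name is a placeholder):

    from pyspark.sql import SparkSession

    # Shuffle tracking lets the driver keep executors that still hold active
    # shuffle files, since there is no external shuffle service on k8s.
    spark = (
        SparkSession.builder
        .appName("dynamic-allocation-on-k8s")  # placeholder name
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
        .getOrCreate()
    )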
>>>
>>>
>>>
>>> HTH,
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* 

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I want to further clarify the use case I have: an ML engineer collects data so 
as to use it for training an ML model. The driver is created within a Jupyter
notebook and has 64 GB of RAM for fetching the training set and feeding it to the
model. Naturally, in this case executors shouldn’t be as big as the driver.

Currently, the best solution I found is to write the dataframe to S3, and then 
read it via pd.read_parquet.
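
For context, a minimal sketch of that workaround, assuming `df` is the Spark
DataFrame holding the training set, the bucket path is a placeholder, and pandas
has pyarrow plus an S3 filesystem layer (e.g. s3fs) available:

    import pandas as pd

    # Write the training set in parallel from the executors instead of
    # collecting it through the driver.
    df.write.mode("overwrite").parquet("s3a://my-bucket/training_set/")

    # Read the resulting Parquet files straight into pandas in the notebook.
    pdf = pd.read_parquet("s3://my-bucket/training_set/")

This keeps the heavy lifting on the executors, and only the Parquet read happens
in the notebook's Python process.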

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
> 
> 
> Thanks for clarification on the koalas case.
> 
> The thread owner states, and I quote: "IIUC, in the `toPandas` case all the
> data gets shuffled to a single executor that fails with OOM."
> 
> I still believe that this may be related to the way k8s handles shuffling. In 
> a balanced k8s cluster this could be avoided which does not seem to be the 
> case here, as the so-called driver node has 8 times more RAM than the other
> nodes. 
> 
> HTH
> 
>view my Linkedin profile
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>> I think you're talking about koalas, which is in Spark 3.2, but that is 
>> unrelated to toPandas() and to the question of how it differs from
>> collect().
>> Shuffle is also unrelated.
>> 
>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh  
>>> wrote:
>>> Hi,
>>> 
>>> As I understood it, in previous versions of Spark data could not be
>>> processed and stored in Pandas data frames in a distributed mode, as these
>>> data frames store their data in RAM, which in this case means on the driver.
>>> However, I was under the impression that this limitation no longer exists
>>> in 3.2. So if you have a k8s cluster with 64GB of RAM for one node and 8GB
>>> of RAM for the others, and PySpark running in cluster mode, how do you expect
>>> the process to confine itself to the master node? What will happen if you
>>> increase the executor nodes' RAM to 64GB temporarily (a balanced k8s cluster)
>>> and run the job again?
>>> 
>>> Worth noting that the current Spark on k8s does not support an external
>>> shuffle service. For now we have two parameters for Dynamic Resource Allocation.
>>> These are
>>> 
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>  
>>> The idea is to use dynamic resource allocation, where the driver tracks the
>>> shuffle files and evicts only executors that are not storing active shuffle files.
>>> So in a nutshell, these shuffle files are stored on the executors themselves
>>> in the absence of an external shuffle service. The model works on the basis of the
>>> "one-container-per-Pod" model, meaning that one node of the cluster runs the
>>> driver and each remaining node runs one executor.
>>> 
>>> 
>>> HTH,
>>>view my Linkedin profile
>>> 
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Mich Talebzadeh
Thanks for clarification on the koalas case.

The thread owner states, and I quote: "IIUC, in the `toPandas` case all
the data gets shuffled to a single executor that fails with OOM."

I still believe that this may be related to the way k8s handles shuffling.
In a balanced k8s cluster this could be avoided which does not seem to be
the case here, as the so-called driver node has 8 times more RAM than the
other nodes.

HTH

   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:

> I think you're talking about koalas, which is in Spark 3.2, but that is
> unrelated to toPandas() and to the question of how it differs from
> collect().
> Shuffle is also unrelated.
>
> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> As I understood it, in previous versions of Spark data could not be
>> processed and stored in Pandas data frames in a distributed mode, as these
>> data frames store their data in RAM, which in this case means on the driver.
>> However, I was under the impression that this limitation no longer exists
>> in 3.2. So if you have a k8s cluster with 64GB of RAM for one node and 8GB
>> of RAM for the others, and PySpark running in cluster mode, how do you expect
>> the process to confine itself to the master node? What will happen if you
>> increase the executor nodes' RAM to 64GB temporarily (a balanced k8s cluster)
>> and run the job again?
>>
>> Worth noting that the current Spark on k8s does not support an external
>> shuffle service. For now we have two parameters for Dynamic Resource Allocation.
>> These are
>>
>>  --conf spark.dynamicAllocation.enabled=true \
>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>
>>
>> The idea is to use dynamic resource allocation, where the driver tracks
>> the shuffle files and evicts only executors that are not storing active shuffle
>> files. So in a nutshell, these shuffle files are stored on the executors
>> themselves in the absence of an external shuffle service. The model works on the
>> basis of the "one-container-per-Pod" model, meaning that one node of the
>> cluster runs the driver and each remaining node runs one executor.
>>
>>
>>
>> HTH,
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sean Owen
I think you're talking about koalas, which is in Spark 3.2, but that is
unrelated to toPandas() and to the question of how it differs from
collect().
Shuffle is also unrelated.

On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
wrote:

> Hi,
>
> As I understood it, in previous versions of Spark data could not be
> processed and stored in Pandas data frames in a distributed mode, as these
> data frames store their data in RAM, which in this case means on the driver.
> However, I was under the impression that this limitation no longer exists
> in 3.2. So if you have a k8s cluster with 64GB of RAM for one node and 8GB
> of RAM for the others, and PySpark running in cluster mode, how do you expect
> the process to confine itself to the master node? What will happen if you
> increase the executor nodes' RAM to 64GB temporarily (a balanced k8s cluster)
> and run the job again?
>
> Worth noting that the current Spark on k8s does not support an external
> shuffle service. For now we have two parameters for Dynamic Resource Allocation.
> These are
>
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>
>
> The idea is to use dynamic resource allocation, where the driver tracks
> the shuffle files and evicts only executors that are not storing active shuffle
> files. So in a nutshell, these shuffle files are stored on the executors
> themselves in the absence of an external shuffle service. The model works on the
> basis of the "one-container-per-Pod" model, meaning that one node of the
> cluster runs the driver and each remaining node runs one executor.
>
>
>
> HTH,
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-03 Thread German Schiavon
Hi,

Rohit, can you share how it looks using DSv2?

Thanks!

On Wed, 3 Nov 2021 at 19:35, huaxin gao  wrote:

> Great to hear. Thanks for testing this!
>
> On Wed, Nov 3, 2021 at 4:03 AM Kapoor, Rohit 
> wrote:
>
>> Thanks for your guidance Huaxin. I have been able to test the push down
>> operators successfully against Postgresql using DS v2.
>>
>>
>>
>>
>>
>> *From: *huaxin gao 
>> *Date: *Tuesday, 2 November 2021 at 12:35 AM
>> *To: *Kapoor, Rohit 
>> *Subject: *Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>>
>>
>>
>>
>> No need to write a customized data source reader. You may want to follow
>> the example here
>> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
>> to use DS v2. The example uses h2 database. Please modify it to use
>> postgresql.
>>
>>
>>
>> Huaxin
>>
>>
>>
>>
>>
>> On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit 
>> wrote:
>>
>> Hi Huaxin,
>>
>>
>>
>> Thanks a lot for your response. Do I need to write a custom data source
>> reader (in my case, for PostgreSql) using the Spark DS v2 APIs, instead of
>> the standard spark.read.format(“jdbc”) ?
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Rohit
>>
>>
>>
>> *From: *huaxin gao 
>> *Date: *Monday, 1 November 2021 at 11:32 PM
>> *To: *Kapoor, Rohit 
>> *Cc: *user@spark.apache.org 
>> *Subject: *Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>>
>>
>> Hi Rohit,
>>
>>
>>
>> Thanks for testing this. Seems to me that you are using DS v1. We only
>> support aggregate push down in DS v2. Could you please try again using DS
>> v2 and let me know how it goes?
>>
>>
>>
>> Thanks,
>>
>> Huaxin
>>
>>
>>
>> On Mon, Nov 1, 2021 at 10:39 AM Chao Sun  wrote:
>>
>>
>>
>> -- Forwarded message -
>> From: *Kapoor, Rohit* 
>> Date: Mon, Nov 1, 2021 at 6:27 AM
>> Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
>> To: user@spark.apache.org 
>>
>>
>>
>> Hi,
>>
>>
>>
>> I am testing the aggregate push down for JDBC after going through the
>> JIRA - https://issues.apache.org/jira/browse/SPARK-34952
>>
>> I have the latest Spark 3.2 setup in local mode (laptop).
>>
>>
>>
>> I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate
>> query on “emp” table that has 102 rows and a simple schema with 3
>> columns (empid, ename and sal) as below:
>>
>>
>>
>> val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"
>>
>>
>>
>> val jdbcDF = spark.read
>>
>> .format("jdbc")
>>
>> .option("url", jdbcString)
>>
>> .option("dbtable", "emp")
>>
>> .option("pushDownAggregate","true")
>>
>> .option("user", "")
>>
>> .option("password", "")
>>
>> .load()
>>
>> .where("empid > 1")
>>
>> .agg(max("SAL")).alias("max_sal")
>>
>>
>>
>>
>>
>> The complete plan details are:
>>
>>
>>
>> == Parsed Logical Plan ==
>>
>> SubqueryAlias max_sal
>>
>> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>>
>>+- Filter (empid#0 > 1)
>>
>>   +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
>> [numPartitions=1]
>>
>>
>>
>> == Analyzed Logical Plan ==
>>
>> max(SAL): int
>>
>> SubqueryAlias max_sal
>>
>> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>>
>>+- Filter (empid#0 > 1)
>>
>>   +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
>> [numPartitions=1]
>>
>>
>>
>> == Optimized Logical Plan ==
>>
>> Aggregate [max(SAL#2) AS max(SAL)#10]
>>
>> +- Project [sal#2]
>>
>>+- Filter (isnotnull(empid#0) AND (empid#0 > 1))
>>
>>   +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
>> [numPartitions=1]
>>
>>
>>
>> == Physical Plan ==
>>
>> AdaptiveSparkPlan isFinalPlan=false
>>
>> +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
>>
>>+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
>>
>>   +- HashAggregate(keys=[], functions=[partial_max(SAL#2)],
>> output=[max#13])
>>
>>  +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] 
>> *PushedAggregates:
>> []*, PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)],
>> PushedGroupby: [], ReadSchema: struct
>>
>>
>>
>>
>>
>> I also checked the sql submitted to the database, querying
>> pg_stat_statements, and it confirms that the aggregate was not pushed
>> down to the database. Here is the query submitted to the database:
>>
>>
>>
>> SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)
>>
>>
>>
>> All the rows are read and aggregated in the Spark layer.
>>
>>
>>
>> Is there any configuration I am missing here?

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I’m pretty sure it’s the executor:

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 20) (10.20.167.28 executor 
2): java.lang.OutOfMemoryError
at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
If you look at the «toPandas» plan you can see the exchange stage that doesn’t occur
in the «collect» example. I suppose it tries to pull all the data to a single
executor for some reason.
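
For anyone reproducing the comparison, the two code paths whose plans differ boil
down to the following sketch (the table name and limit are placeholders):

    df = spark.table("training_data").limit(1_000_000)

    rows = df.collect()   # plan observed: no exchange, rows are pulled to the driver
    pdf = df.toPandas()   # plan observed: an extra exchange appears before the collect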

> On 3 Nov 2021, at 21:34, Sean Owen wrote:
> 
> No, you can collect() a DataFrame. You get Rows. collect() must create an 
> in-memory representation - how else could it work? Those aren't differences.
> Are you sure it's the executor that's OOM, not the driver? 
> 
> On Wed, Nov 3, 2021 at 1:32 PM Gourav Sengupta wrote:
> Hi,
> 
> I might be wrong, but toPandas() works with dataframes, whereas collect works
> at the RDD level. Also, toPandas() converts to Python objects in memory; I do not
> think that collect does that.
> 
> Regards,
> Gourav
> 
> On Wed, Nov 3, 2021 at 2:24 PM Sergey Ivanychev wrote:
> Hi, 
> 
> Spark 3.1.2 K8s.
> 
> I encountered an OOM error while trying to create a Pandas DataFrame from a Spark
> DataFrame. My Spark driver has 60 GB of RAM, but the executors are tiny
> compared to that (8 GB).
> 
> If I do `spark.table(…).limit(100).collect()` I get the following plan
> 
> <2021-11-03_17-07-55.png>
> 
> 
> If I do `spark.table(…).limit(100).toPandas()` I get a more complicated 
> plan with an extra shuffle
> 
> <2021-11-03_17-08-31.png>
> 
> IIUC, in the `toPandas` case all the data gets shuffled to a single executor 
> that fails with OOM, which doesn’t happen in the `collect` case. Does it
> work like that? How do I collect a large dataset that fits into the memory of the
> driver?



Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-03 Thread huaxin gao
Great to hear. Thanks for testing this!

On Wed, Nov 3, 2021 at 4:03 AM Kapoor, Rohit 
wrote:

> Thanks for your guidance Huaxin. I have been able to test the push down
> operators successfully against Postgresql using DS v2.
>
>
>
>
>
> *From: *huaxin gao 
> *Date: *Tuesday, 2 November 2021 at 12:35 AM
> *To: *Kapoor, Rohit 
> *Subject: *Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>
>
>
>
> No need to write a customized data source reader. You may want to follow
> the example here
> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
> to use DS v2. The example uses h2 database. Please modify it to use
> postgresql.
>
>
>
> Huaxin
>
>
>
>
>
> On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit 
> wrote:
>
> Hi Huaxin,
>
>
>
> Thanks a lot for your response. Do I need to write a custom data source
> reader (in my case, for PostgreSql) using the Spark DS v2 APIs, instead of
> the standard spark.read.format(“jdbc”) ?
>
>
>
>
>
> Thanks,
>
> Rohit
>
>
>
> *From: *huaxin gao 
> *Date: *Monday, 1 November 2021 at 11:32 PM
> *To: *Kapoor, Rohit 
> *Cc: *user@spark.apache.org 
> *Subject: *Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
>
>
> Hi Rohit,
>
>
>
> Thanks for testing this. Seems to me that you are using DS v1. We only
> support aggregate push down in DS v2. Could you please try again using DS
> v2 and let me know how it goes?
>
>
>
> Thanks,
>
> Huaxin
>
>
>
> On Mon, Nov 1, 2021 at 10:39 AM Chao Sun  wrote:
>
>
>
> -- Forwarded message -
> From: *Kapoor, Rohit* 
> Date: Mon, Nov 1, 2021 at 6:27 AM
> Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
> To: user@spark.apache.org 
>
>
>
> Hi,
>
>
>
> I am testing the aggregate push down for JDBC after going through the JIRA
> - https://issues.apache.org/jira/browse/SPARK-34952
>
> I have the latest Spark 3.2 setup in local mode (laptop).
>
>
>
> I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate
> query on “emp” table that has 102 rows and a simple schema with 3
> columns (empid, ename and sal) as below:
>
>
>
> val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"
>
>
>
> val jdbcDF = spark.read
>
> .format("jdbc")
>
> .option("url", jdbcString)
>
> .option("dbtable", "emp")
>
> .option("pushDownAggregate","true")
>
> .option("user", "")
>
> .option("password", "")
>
> .load()
>
> .where("empid > 1")
>
> .agg(max("SAL")).alias("max_sal")
>
>
>
>
>
> The complete plan details are:
>
>
>
> == Parsed Logical Plan ==
>
> SubqueryAlias max_sal
>
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>
>+- Filter (empid#0 > 1)
>
>   +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
> [numPartitions=1]
>
>
>
> == Analyzed Logical Plan ==
>
> max(SAL): int
>
> SubqueryAlias max_sal
>
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>
>+- Filter (empid#0 > 1)
>
>   +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
> [numPartitions=1]
>
>
>
> == Optimized Logical Plan ==
>
> Aggregate [max(SAL#2) AS max(SAL)#10]
>
> +- Project [sal#2]
>
>+- Filter (isnotnull(empid#0) AND (empid#0 > 1))
>
>   +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp)
> [numPartitions=1]
>
>
>
> == Physical Plan ==
>
> AdaptiveSparkPlan isFinalPlan=false
>
> +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
>
>+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
>
>   +- HashAggregate(keys=[], functions=[partial_max(SAL#2)],
> output=[max#13])
>
>  +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] 
> *PushedAggregates:
> []*, PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)],
> PushedGroupby: [], ReadSchema: struct
>
>
>
>
>
> I also checked the sql submitted to the database, querying
> pg_stat_statements, and it confirms that the aggregate was not pushed
> down to the database. Here is the query submitted to the database:
>
>
>
> SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)
>
>
>
> All the rows are read and aggregated in the Spark layer.
>
>
>
> Is there any configuration I am missing here? Why is aggregate push down not
> working for me?
>
> Any pointers would be greatly appreciated.
>
>
>
>
>
> Thanks,
>
> Rohit
> --
>
> Disclaimer: The information in this email is confidential and may be
> legally privileged. Access to this Internet email by anyone else other than
> the recipient is 

Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-03 Thread Kapoor, Rohit
Thanks for your guidance Huaxin. I have been able to test the push down 
operators successfully against Postgresql using DS v2.


From: huaxin gao 
Date: Tuesday, 2 November 2021 at 12:35 AM
To: Kapoor, Rohit 
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

No need to write a customized data source reader. You may want to follow the 
example here 
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
 to use DS v2. The example uses h2 database. Please modify it to use postgresql.
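
For reference, a rough PySpark sketch of going through DS v2 by registering a JDBC
catalog instead of spark.read.format("jdbc"); the catalog name, credentials, and the
idea of passing pushDownAggregate as a catalog property are illustrative assumptions,
not something confirmed in this thread:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("spark.sql.catalog.pg",
                "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
        .config("spark.sql.catalog.pg.url", "jdbc:postgresql://localhost:5432/postgres")
        .config("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
        .config("spark.sql.catalog.pg.user", "postgres")            # placeholder credentials
        .config("spark.sql.catalog.pg.password", "")
        .config("spark.sql.catalog.pg.pushDownAggregate", "true")   # assumed catalog option
        .getOrCreate()
    )

    # Tables resolved through the catalog take the DS v2 read path, so check the
    # physical plan for a non-empty PushedAggregates entry.
    spark.sql("SELECT max(sal) FROM pg.emp WHERE empid > 1").explain()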

Huaxin


On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit wrote:
Hi Huaxin,

Thanks a lot for your response. Do I need to write a custom data source reader 
(in my case, for PostgreSql) using the Spark DS v2 APIs, instead of the 
standard spark.read.format(“jdbc”) ?


Thanks,
Rohit

From: huaxin gao
Date: Monday, 1 November 2021 at 11:32 PM
To: Kapoor, Rohit
Cc: user@spark.apache.org
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
Hi Rohit,

Thanks for testing this. Seems to me that you are using DS v1. We only support 
aggregate push down in DS v2. Could you please try again using DS v2 and let me 
know how it goes?

Thanks,
Huaxin

On Mon, Nov 1, 2021 at 10:39 AM Chao Sun wrote:

-- Forwarded message -
From: Kapoor, Rohit
Date: Mon, Nov 1, 2021 at 6:27 AM
Subject: [Spark SQL]: Aggregate Push Down / Spark 3.2
To: user@spark.apache.org

Hi,

I am testing the aggregate push down for JDBC after going through the JIRA - 
https://issues.apache.org/jira/browse/SPARK-34952
I have the latest Spark 3.2 setup in local mode (laptop).

I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate query 
on “emp” table that has 102 rows and a simple schema with 3 columns (empid, 
ename and sal) as below:

val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"

val jdbcDF = spark.read
.format("jdbc")
.option("url", jdbcString)
.option("dbtable", "emp")
.option("pushDownAggregate","true")
.option("user", "")
.option("password", "")
.load()
.where("empid > 1")
.agg(max("SAL")).alias("max_sal")


The complete plan details are:

== Parsed Logical Plan ==
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
  +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Analyzed Logical Plan ==
max(SAL): int
SubqueryAlias max_sal
+- Aggregate [max(SAL#2) AS max(SAL)#10]
   +- Filter (empid#0 > 1)
  +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Optimized Logical Plan ==
Aggregate [max(SAL#2) AS max(SAL)#10]
+- Project [sal#2]
   +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
  +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
  +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
 +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2] PushedAggregates: 
[], PushedFilters: [*IsNotNull(empid), *GreaterThan(empid,1)], PushedGroupby: 
[], ReadSchema: struct


I also checked the sql submitted to the database, querying pg_stat_statements, 
and it confirms that the aggregate was not pushed down to the database. Here is 
the query submitted to the database:

SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)

All the rows are read and aggregated in the Spark layer.

Is there any configuration I am missing here? Why is aggregate push down not
working for me?
Any pointers would be greatly appreciated.


Thanks,
Rohit


Disclaimer: The information in this email is confidential and may be legally 
privileged. Access to this Internet email by anyone else other than the 
recipient is unauthorized. Envestnet, Inc. and its affiliated companies do not