Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Shuporno Choudhury
 Hi,
Responding to your queries:
I am using Spark 2.2.1. I have tried with dynamic resource allocation both
turned on and off and have encountered the same behaviour.

The way data is being read is that filepaths (for each independent data
set) are passed to a method, then the method does the processing for those
particular files and writes the result. So, even that doesn't seem to
release memory.
There are multiple independent data sets (for which the method is called
sequentially).
While doing this, memory consumption just keeps stacking up.

You can replicate this behaviour in the PySpark shell
(%SPARK_HOME%/bin/pyspark) by:
1. Creating a method that reads data from the filepaths passed to it as
arguments and creates a dataframe on top of that
2. Doing some processing (filter etc.) on that dataframe
3. Writing the results to a target (which can also be passed to the method)
4. Running this method again and again (either by providing different
target paths or deleting the target folder before calling the method again) -> to
replicate the behaviour of multiple datasets [OR you can provide different data
sets altogether for each run of the method]
You will notice that the memory consumption of the JVM started by the
shell keeps increasing (observed from Task Manager).
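A minimal PySpark sketch of such a method (the paths, input format and filter
column here are made up for illustration; this is not code from the thread):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("memory-repro").getOrCreate()

def process_dataset(input_paths, target_path):
    # 1. create a dataframe on top of the filepaths passed in
    df = spark.read.parquet(*input_paths)
    # 2. some processing (filter etc.)
    result = df.filter("some_column IS NOT NULL")
    # 3. write the results to a target
    result.write.mode("overwrite").parquet(target_path)
    # df and result go out of scope here, yet the memory footprint of the
    # JVM started by the shell keeps growing across calls.

# 4. run the method again and again for independent data sets
for i in range(10):
    process_dataset(["/data/input/dataset_%d" % i], "/data/output/dataset_%d" % i)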

Maybe Jörn is right. I probably need to run a different spark-submit for
each data set (as they are completely independent).

Any other advice would also be really appreciated.

On Tue, 5 Jun 2018 at 10:46, Jörn Franke [via Apache Spark User List] <
ml+s1001560n3246...@n3.nabble.com> wrote:

> Additionally I meant with modularization that jobs that have really
> nothing to do with each other should be in separate python programs
>
> On 5. Jun 2018, at 04:50, Thakrar, Jayesh <[hidden email]
> > wrote:
>
> Disclaimer - I use Spark with Scala and not Python.
>
>
>
> But I am guessing that Jorn's reference to modularization is to ensure
> that you do the processing inside methods/functions and call those methods
> sequentially.
>
> I believe that as long as an RDD/dataset variable is in scope, its memory
> may not be getting released.
>
> By having functions, they will get out of scope and their memory can be
> released.
>
>
>
> Also, assuming that the variables are not daisy-chained/inter-related as
> that too will not make it easy.
>
>
>
>
>
> *From: *Jay <[hidden email]
> >
> *Date: *Monday, June 4, 2018 at 9:41 PM
> *To: *Shuporno Choudhury <[hidden email]
> >
> *Cc: *"Jörn Franke [via Apache Spark User List]" <[hidden email]
> >, <[hidden email]
> >
> *Subject: *Re: [PySpark] Releasing memory after a spark job is finished
>
>
>
> Can you tell us what version of Spark you are using and if Dynamic
> Allocation is enabled ?
>
>
>
> Also, how are the files being read ? Is it a single read of all files
> using a file matching regex or are you running different threads in the
> same pyspark job?
>
>
>
>
>
> On Mon 4 Jun, 2018, 1:27 PM Shuporno Choudhury, <[hidden email]
> > wrote:
>
> Thanks a lot for the insight.
>
> Actually I have the exact same transformations for all the datasets, hence
> only 1 python code.
>
> Now, do you suggest that I run different spark-submit for all the
> different datasets given that I have the exact same transformations?
>
>
>
> On Tue 5 Jun, 2018, 1:48 AM Jörn Franke [via Apache Spark User List], <[hidden
> email] > wrote:
>
> Yes if they are independent with different transformations then I would
> create a separate python program. Especially for big data processing
> frameworks one should avoid putting everything into one big monolithic
> application.
>
>
>
>
> On 4. Jun 2018, at 22:02, Shuporno Choudhury <[hidden email]
> > wrote:
>
> Hi,
>
>
>
> Thanks for the input.
>
> I was trying to get the functionality first, hence I was using local mode.
> I will be running on a cluster definitely but later.
>
>
>
> Sorry for my naivety, but can you please elaborate on the modularity
> concept that you mentioned and how it will affect whatever I am already
> doing?
>
> Do you mean running a different spark-submit for each different dataset
> when you say 'an independent python program for each process '?
>
>
>
> On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] <[hidden
> email] > wrote:
>
> Why don’t you modularize your code and write for each process an
> independent python program that is submitted via Spark?
>
>
>
> Not sure though if Spark local make sense. If you don’t have a cluster
> then a normal python program can be much better.
>
>
> On 4. Jun 2018, at 21:37, Shuporno Choudhury 

Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Jörn Franke
Additionally, what I meant by modularization is that jobs that have really nothing to
do with each other should be in separate Python programs.

> On 5. Jun 2018, at 04:50, Thakrar, Jayesh  
> wrote:
> 
> Disclaimer - I use Spark with Scala and not Python.
>  
> But I am guessing that Jorn's reference to modularization is to ensure that 
> you do the processing inside methods/functions and call those methods 
> sequentially.
> I believe that as long as an RDD/dataset variable is in scope, its memory may 
> not be getting released.
> By having functions, they will get out of scope and their memory can be 
> released.
>  
> Also, assuming that the variables are not daisy-chained/inter-related as that 
> too will not make it easy.
>  
>  
> From: Jay 
> Date: Monday, June 4, 2018 at 9:41 PM
> To: Shuporno Choudhury 
> Cc: "Jörn Franke [via Apache Spark User List]" 
> , 
> Subject: Re: [PySpark] Releasing memory after a spark job is finished
>  
> Can you tell us what version of Spark you are using and if Dynamic Allocation 
> is enabled ? 
>  
> Also, how are the files being read ? Is it a single read of all files using a 
> file matching regex or are you running different threads in the same pyspark 
> job?
>  
>  
> 
> On Mon 4 Jun, 2018, 1:27 PM Shuporno Choudhury, 
>  wrote:
> Thanks a lot for the insight.
> Actually I have the exact same transformations for all the datasets, hence 
> only 1 python code.
> Now, do you suggest that I run different spark-submit for all the different 
> datasets given that I have the exact same transformations?
>  
> On Tue 5 Jun, 2018, 1:48 AM Jörn Franke [via Apache Spark User List], 
>  wrote:
> Yes if they are independent with different transformations then I would 
> create a separate python program. Especially for big data processing 
> frameworks one should avoid putting everything into one big monolithic
> application.
>  
> 
> On 4. Jun 2018, at 22:02, Shuporno Choudhury <[hidden email]> wrote:
> 
> Hi,
>  
> Thanks for the input.
> I was trying to get the functionality first, hence I was using local mode. I 
> will be running on a cluster definitely but later.
>  
> Sorry for my naivety, but can you please elaborate on the modularity concept 
> that you mentioned and how it will affect whatever I am already doing?
> Do you mean running a different spark-submit for each different dataset when 
> you say 'an independent python program for each process '?
>  
> On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] 
> <[hidden email]> wrote:
> Why don’t you modularize your code and write for each process an independent 
> python program that is submitted via Spark?
>  
> Not sure though if Spark local make sense. If you don’t have a cluster then a 
> normal python program can be much better.
> 
> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]> wrote:
> 
> Hi everyone,
> I am trying to run a pyspark code on some data sets sequentially [basically 
> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3. Write 
> modified data in parquet format to a target location]
> Now, while running this pyspark code across multiple independent data sets 
> sequentially, the memory usage from the previous data set doesn't seem to get 
> released/cleared and hence spark's memory consumption (JVM memory consumption 
> from Task Manager) keeps on increasing till it fails at some data set.
> So, is there a way to clear/remove dataframes that I know are not going to be 
> used later? 
> Basically, can I clear out some memory programmatically (in the pyspark code) 
> when processing for a particular data set ends?
> At no point, I am caching any dataframe (so unpersist() is also not a 
> solution).
>  
> I am running spark using local[*] as master. There is a single SparkSession 
> that is doing all the processing.
> If it is not possible to clear out memory, what can be a better approach for 
> this problem?
>  
> Can someone please help me with this and tell me if I am going wrong anywhere?
>  
> --Thanks,
> Shuporno Choudhury
>  
> 
> 
>  
> --
> --Thanks,
> Shuporno Choudhury
>  
> 


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Thakrar, Jayesh
Disclaimer - I use Spark with Scala and not Python.

But I am guessing that Jörn's reference to modularization is to ensure that you
do the processing inside methods/functions and call those methods sequentially.
I believe that as long as an RDD/dataset variable is in scope, its memory may
not be getting released.
By doing the work inside functions, those variables go out of scope and their
memory can be released.

This also assumes that the variables are not daisy-chained/inter-related, as
that too would keep memory from being freed.
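A rough PySpark illustration of that idea, since the original poster is using
Python (the paths and the groupBy are placeholders, not from this thread):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def run_one_dataset(src, dst):
    # All DataFrame references live only inside this function, so nothing in
    # the driver keeps pointing at them after it returns and they become
    # eligible for cleanup.
    df = spark.read.parquet(src)
    out = df.groupBy("key").count()   # placeholder transformation
    out.write.mode("overwrite").parquet(dst)

for src, dst in [("/in/a", "/out/a"), ("/in/b", "/out/b")]:
    run_one_dataset(src, dst)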


From: Jay 
Date: Monday, June 4, 2018 at 9:41 PM
To: Shuporno Choudhury 
Cc: "Jörn Franke [via Apache Spark User List]" 
, 
Subject: Re: [PySpark] Releasing memory after a spark job is finished

Can you tell us what version of Spark you are using and if Dynamic Allocation 
is enabled ?

Also, how are the files being read ? Is it a single read of all files using a 
file matching regex or are you running different threads in the same pyspark 
job?


On Mon 4 Jun, 2018, 1:27 PM Shuporno Choudhury, 
mailto:shuporno.choudh...@gmail.com>> wrote:
Thanks a lot for the insight.
Actually I have the exact same transformations for all the datasets, hence only 
1 python code.
Now, do you suggest that I run different spark-submit for all the different 
datasets given that I have the exact same transformations?

On Tue 5 Jun, 2018, 1:48 AM Jörn Franke [via Apache Spark User List], 
mailto:ml%2bs1001560n32458...@n3.nabble.com>>
 wrote:
Yes if they are independent with different transformations then I would create 
a separate python program. Especially for big data processing frameworks one
should avoid putting everything into one big monolithic application.


On 4. Jun 2018, at 22:02, Shuporno Choudhury <[hidden 
email]> wrote:
Hi,

Thanks for the input.
I was trying to get the functionality first, hence I was using local mode. I 
will be running on a cluster definitely but later.

Sorry for my naivety, but can you please elaborate on the modularity concept 
that you mentioned and how it will affect whatever I am already doing?
Do you mean running a different spark-submit for each different dataset when 
you say 'an independent python program for each process '?

On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] <[hidden 
email]> wrote:
Why don’t you modularize your code and write for each process an independent 
python program that is submitted via Spark?

Not sure though if Spark local make sense. If you don’t have a cluster then a 
normal python program can be much better.

On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden 
email]> wrote:
Hi everyone,
I am trying to run a pyspark code on some data sets sequentially [basically 1. 
Read data into a dataframe 2.Perform some join/filter/aggregation 3. Write 
modified data in parquet format to a target location]
Now, while running this pyspark code across multiple independent data sets 
sequentially, the memory usage from the previous data set doesn't seem to get 
released/cleared and hence spark's memory consumption (JVM memory consumption 
from Task Manager) keeps on increasing till it fails at some data set.
So, is there a way to clear/remove dataframes that I know are not going to be 
used later?
Basically, can I clear out some memory programmatically (in the pyspark code) 
when processing for a particular data set ends?
At no point, I am caching any dataframe (so unpersist() is also not a solution).

I am running spark using local[*] as master. There is a single SparkSession 
that is doing all the processing.
If it is not possible to clear out memory, what can be a better approach for 
this problem?

Can someone please help me with this and tell me if I am going wrong anywhere?

--Thanks,
Shuporno Choudhury




--
--Thanks,
Shuporno Choudhury



Re: spark partitionBy with partitioned column in json output

2018-06-04 Thread Jay
The partitionBy clause is used to create Hive-style folders so that you can
point a Hive partitioned table at the data.

What are you using the partitionBy for? What is the use case?
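For illustration, a small PySpark sketch of what partitionBy produces on disk
(output path hypothetical; input as in the question quoted below). The
partition values end up in the folder names rather than in the records:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("actions.json")

df.write.format("json").mode("append") \
    .partitionBy("bucket", "actionType") \
    .save("/tmp/output")

# Layout on disk:
#   /tmp/output/bucket=B01/actionType=A1/part-....json
#   /tmp/output/bucket=B02/actionType=A2/part-....json
#   /tmp/output/bucket=B03/actionType=A3/part-....json
# so each part file only contains the remaining columns, e.g.
#   {"preaction":"NULL","postaction":"NULL"}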

On Mon 4 Jun, 2018, 4:59 PM purna pradeep,  wrote:

> im reading below json in spark
>
> {"bucket": "B01", "actionType": "A1", "preaction": "NULL",
> "postaction": "NULL"}
> {"bucket": "B02", "actionType": "A2", "preaction": "NULL",
> "postaction": "NULL"}
> {"bucket": "B03", "actionType": "A3", "preaction": "NULL",
> "postaction": "NULL"}
>
> val df=spark.read.json("actions.json").toDF()
>
> Now im writing the same to a json output as below
>
> df.write. format("json"). mode("append").
> partitionBy("bucket","actionType"). save("output.json")
>
>
> and the output.json is as below
>
> {"preaction":"NULL","postaction":"NULL"}
>
> bucket,actionType columns are missing in the json output, i need
> partitionby columns as well in the output
>
>


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Jay
Can you tell us what version of Spark you are using and if Dynamic
Allocation is enabled ?

Also, how are the files being read ? Is it a single read of all files using
a file matching regex or are you running different threads in the same
pyspark job?



On Mon 4 Jun, 2018, 1:27 PM Shuporno Choudhury, <
shuporno.choudh...@gmail.com> wrote:

> Thanks a lot for the insight.
> Actually I have the exact same transformations for all the datasets, hence
> only 1 python code.
> Now, do you suggest that I run different spark-submit for all the
> different datasets given that I have the exact same transformations?
>
> On Tue 5 Jun, 2018, 1:48 AM Jörn Franke [via Apache Spark User List], <
> ml+s1001560n32458...@n3.nabble.com> wrote:
>
>> Yes if they are independent with different transformations then I would
>> create a separate python program. Especially for big data processing
>> frameworks one should avoid putting everything into one big monolithic
>> application.
>>
>>
>> On 4. Jun 2018, at 22:02, Shuporno Choudhury <[hidden email]
>> > wrote:
>>
>> Hi,
>>
>> Thanks for the input.
>> I was trying to get the functionality first, hence I was using local
>> mode. I will be running on a cluster definitely but later.
>>
>> Sorry for my naivety, but can you please elaborate on the modularity
>> concept that you mentioned and how it will affect whatever I am already
>> doing?
>> Do you mean running a different spark-submit for each different dataset
>> when you say 'an independent python program for each process '?
>>
>> On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] 
>> <[hidden
>> email] > wrote:
>>
>> Why don’t you modularize your code and write for each process an
>>> independent python program that is submitted via Spark?
>>>
>>> Not sure though if Spark local make sense. If you don’t have a cluster
>>> then a normal python program can be much better.
>>>
>>> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]
>>> > wrote:
>>>
>>> Hi everyone,
>>> I am trying to run a pyspark code on some data sets sequentially [basically
>>> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3.
>>> Write modified data in parquet format to a target location]
>>> Now, while running this pyspark code across *multiple independent data
>>> sets sequentially*, the memory usage from the previous data set doesn't
>>> seem to get released/cleared and hence spark's memory consumption (JVM
>>> memory consumption from Task Manager) keeps on increasing till it fails at
>>> some data set.
>>> So, is there a way to clear/remove dataframes that I know are not going
>>> to be used later?
>>> Basically, can I clear out some memory programmatically (in the pyspark
>>> code) when processing for a particular data set ends?
>>> At no point, I am caching any dataframe (so unpersist() is also not a
>>> solution).
>>>
>>> I am running spark using local[*] as master. There is a single
>>> SparkSession that is doing all the processing.
>>> If it is not possible to clear out memory, what can be a better approach
>>> for this problem?
>>>
>>> Can someone please help me with this and tell me if I am going wrong
>>> anywhere?
>>>
>>> --Thanks,
>>> Shuporno Choudhury
>>>
>>>
>>>
>>
>>
>> --
>> --Thanks,
>> Shuporno Choudhury
>>
>>
>>

Re: spark partitionBy with partitioned column in json output

2018-06-04 Thread Lalwani, Jayesh
Purna,

This behavior is by design. If you provide partitionBy, Spark removes those
columns from the written data.
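If the values are also needed inside the JSON records themselves, a common
workaround (not mentioned in this thread) is to partition by copies of the
columns; a PySpark sketch, with the copy column names made up:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("actions.json")

# Partition by duplicated columns so the originals stay in the written records.
(df.withColumn("bucket_part", col("bucket"))
   .withColumn("actionType_part", col("actionType"))
   .write.format("json")
   .mode("append")
   .partitionBy("bucket_part", "actionType_part")
   .save("/tmp/output"))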

From: purna pradeep 
Date: Monday, June 4, 2018 at 8:00 PM
To: "user@spark.apache.org" 
Subject: spark partitionBy with partitioned column in json output

im reading below json in spark

{"bucket": "B01", "actionType": "A1", "preaction": "NULL", "postaction": 
"NULL"}
{"bucket": "B02", "actionType": "A2", "preaction": "NULL", "postaction": 
"NULL"}
{"bucket": "B03", "actionType": "A3", "preaction": "NULL", "postaction": 
"NULL"}

val df=spark.read.json("actions.json").toDF()

Now im writing the same to a json output as below

df.write. format("json"). mode("append"). 
partitionBy("bucket","actionType"). save("output.json")


and the output.json is as below

{"preaction":"NULL","postaction":"NULL"}

bucket,actionType columns are missing in the json output, i need partitionby 
columns as well in the output





spark partitionBy with partitioned column in json output

2018-06-04 Thread purna pradeep
I'm reading the below JSON in Spark:

{"bucket": "B01", "actionType": "A1", "preaction": "NULL",
"postaction": "NULL"}
{"bucket": "B02", "actionType": "A2", "preaction": "NULL",
"postaction": "NULL"}
{"bucket": "B03", "actionType": "A3", "preaction": "NULL",
"postaction": "NULL"}

val df=spark.read.json("actions.json").toDF()

Now I'm writing the same to a JSON output as below:

df.write.format("json").mode("append").partitionBy("bucket","actionType").save("output.json")


and the output.json is as below

{"preaction":"NULL","postaction":"NULL"}

The bucket and actionType columns are missing in the JSON output; I need the
partitionBy columns in the output as well.


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Shuporno Choudhury
Thanks a lot for the insight.
Actually, I have the exact same transformations for all the datasets, hence
only one Python program.
Now, do you suggest that I run a different spark-submit for each of the
different datasets, given that I have the exact same transformations?

On Tue 5 Jun, 2018, 1:48 AM Jörn Franke [via Apache Spark User List], <
ml+s1001560n32458...@n3.nabble.com> wrote:

> Yes if they are independent with different transformations then I would
> create a separate python program. Especially for big data processing
> frameworks one should avoid putting everything into one big monolithic
> application.
>
>
> On 4. Jun 2018, at 22:02, Shuporno Choudhury <[hidden email]
> > wrote:
>
> Hi,
>
> Thanks for the input.
> I was trying to get the functionality first, hence I was using local mode.
> I will be running on a cluster definitely but later.
>
> Sorry for my naivety, but can you please elaborate on the modularity
> concept that you mentioned and how it will affect whatever I am already
> doing?
> Do you mean running a different spark-submit for each different dataset
> when you say 'an independent python program for each process '?
>
> On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] <[hidden
> email] > wrote:
>
>> Why don’t you modularize your code and write for each process an
>> independent python program that is submitted via Spark?
>>
>> Not sure though if Spark local make sense. If you don’t have a cluster
>> then a normal python program can be much better.
>>
>> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]
>> > wrote:
>>
>> Hi everyone,
>> I am trying to run a pyspark code on some data sets sequentially [basically
>> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3.
>> Write modified data in parquet format to a target location]
>> Now, while running this pyspark code across *multiple independent data
>> sets sequentially*, the memory usage from the previous data set doesn't
>> seem to get released/cleared and hence spark's memory consumption (JVM
>> memory consumption from Task Manager) keeps on increasing till it fails at
>> some data set.
>> So, is there a way to clear/remove dataframes that I know are not going
>> to be used later?
>> Basically, can I clear out some memory programmatically (in the pyspark
>> code) when processing for a particular data set ends?
>> At no point, I am caching any dataframe (so unpersist() is also not a
>> solution).
>>
>> I am running spark using local[*] as master. There is a single
>> SparkSession that is doing all the processing.
>> If it is not possible to clear out memory, what can be a better approach
>> for this problem?
>>
>> Can someone please help me with this and tell me if I am going wrong
>> anywhere?
>>
>> --Thanks,
>> Shuporno Choudhury
>>
>>
>>
>
>
> --
> --Thanks,
> Shuporno Choudhury
>
>
>


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Jörn Franke
Yes, if they are independent with different transformations then I would create
a separate Python program for each. Especially with big data processing frameworks,
one should avoid putting everything into one big monolithic application.


> On 4. Jun 2018, at 22:02, Shuporno Choudhury  
> wrote:
> 
> Hi,
> 
> Thanks for the input.
> I was trying to get the functionality first, hence I was using local mode. I 
> will be running on a cluster definitely but later.
> 
> Sorry for my naivety, but can you please elaborate on the modularity concept 
> that you mentioned and how it will affect whatever I am already doing?
> Do you mean running a different spark-submit for each different dataset when 
> you say 'an independent python program for each process '?
> 
>> On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] 
>>  wrote:
>> Why don’t you modularize your code and write for each process an independent 
>> python program that is submitted via Spark?
>> 
>> Not sure though if Spark local make sense. If you don’t have a cluster then 
>> a normal python program can be much better.
>> 
>>> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]> wrote:
>>> 
>>> Hi everyone,
>>> I am trying to run a pyspark code on some data sets sequentially [basically 
>>> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3. 
>>> Write modified data in parquet format to a target location]
>>> Now, while running this pyspark code across multiple independent data sets 
>>> sequentially, the memory usage from the previous data set doesn't seem to 
>>> get released/cleared and hence spark's memory consumption (JVM memory 
>>> consumption from Task Manager) keeps on increasing till it fails at some 
>>> data set.
>>> So, is there a way to clear/remove dataframes that I know are not going to 
>>> be used later? 
>>> Basically, can I clear out some memory programmatically (in the pyspark 
>>> code) when processing for a particular data set ends?
>>> At no point, I am caching any dataframe (so unpersist() is also not a 
>>> solution).
>>> 
>>> I am running spark using local[*] as master. There is a single SparkSession 
>>> that is doing all the processing.
>>> If it is not possible to clear out memory, what can be a better approach 
>>> for this problem?
>>> 
>>> Can someone please help me with this and tell me if I am going wrong 
>>> anywhere?
>>> 
>>> --Thanks,
>>> Shuporno Choudhury
>> 
>> 
> 
> 
> -- 
> --Thanks,
> Shuporno Choudhury


Apply Core Java Transformation UDF on DataFrame

2018-06-04 Thread Chetan Khatri
All,

I would like to Apply Java Transformation UDF on DataFrame created from
Table, Flat Files and retrun new Data Frame Object. Any suggestions, with
respect to Spark Internals.

Thanks.


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Shuporno Choudhury
Hi,

Thanks for the input.
I was trying to get the functionality working first, hence I was using local mode.
I will definitely be running on a cluster, but later.

Sorry for my naivety, but can you please elaborate on the modularity
concept that you mentioned and how it will affect whatever I am already
doing?
Do you mean running a different spark-submit for each different dataset
when you say 'an independent python program for each process '?

On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] <
ml+s1001560n32455...@n3.nabble.com> wrote:

> Why don’t you modularize your code and write for each process an
> independent python program that is submitted via Spark?
>
> Not sure though if Spark local make sense. If you don’t have a cluster
> then a normal python program can be much better.
>
> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]
> > wrote:
>
> Hi everyone,
> I am trying to run a pyspark code on some data sets sequentially [basically
> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3.
> Write modified data in parquet format to a target location]
> Now, while running this pyspark code across *multiple independent data
> sets sequentially*, the memory usage from the previous data set doesn't
> seem to get released/cleared and hence spark's memory consumption (JVM
> memory consumption from Task Manager) keeps on increasing till it fails at
> some data set.
> So, is there a way to clear/remove dataframes that I know are not going to
> be used later?
> Basically, can I clear out some memory programmatically (in the pyspark
> code) when processing for a particular data set ends?
> At no point, I am caching any dataframe (so unpersist() is also not a
> solution).
>
> I am running spark using local[*] as master. There is a single
> SparkSession that is doing all the processing.
> If it is not possible to clear out memory, what can be a better approach
> for this problem?
>
> Can someone please help me with this and tell me if I am going wrong
> anywhere?
>
> --Thanks,
> Shuporno Choudhury
>
>
>
>


-- 
--Thanks,
Shuporno Choudhury


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Jörn Franke
Why don’t you modularize your code and write an independent Python program
for each process, submitted via Spark?

I'm not sure, though, whether Spark local mode makes sense. If you don’t have a
cluster, then a normal Python program can be much better.

> On 4. Jun 2018, at 21:37, Shuporno Choudhury  
> wrote:
> 
> Hi everyone,
> I am trying to run a pyspark code on some data sets sequentially [basically 
> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3. Write 
> modified data in parquet format to a target location]
> Now, while running this pyspark code across multiple independent data sets 
> sequentially, the memory usage from the previous data set doesn't seem to get 
> released/cleared and hence spark's memory consumption (JVM memory consumption 
> from Task Manager) keeps on increasing till it fails at some data set.
> So, is there a way to clear/remove dataframes that I know are not going to be 
> used later? 
> Basically, can I clear out some memory programmatically (in the pyspark code) 
> when processing for a particular data set ends?
> At no point, I am caching any dataframe (so unpersist() is also not a 
> solution).
> 
> I am running spark using local[*] as master. There is a single SparkSession 
> that is doing all the processing.
> If it is not possible to clear out memory, what can be a better approach for 
> this problem?
> 
> Can someone please help me with this and tell me if I am going wrong anywhere?
> 
> --Thanks,
> Shuporno Choudhury


[PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Shuporno Choudhury
Hi everyone,
I am trying to run a PySpark job on some data sets sequentially [basically:
1. Read data into a dataframe, 2. Perform some join/filter/aggregation, 3.
Write the modified data in parquet format to a target location].
Now, while running this pyspark code across *multiple independent data sets
sequentially*, the memory usage from the previous data set doesn't seem to
get released/cleared and hence spark's memory consumption (JVM memory
consumption from Task Manager) keeps on increasing till it fails at some
data set.
So, is there a way to clear/remove dataframes that I know are not going to
be used later?
Basically, can I clear out some memory programmatically (in the pyspark
code) when processing for a particular data set ends?
At no point am I caching any dataframe (so unpersist() is also not a
solution).

I am running spark using local[*] as master. There is a single SparkSession
that is doing all the processing.
If it is not possible to clear out memory, what can be a better approach
for this problem?

Can someone please help me with this and tell me if I am going wrong
anywhere?

--Thanks,
Shuporno Choudhury


A code example of Catalyst optimization

2018-06-04 Thread Jean Georges Perrin
Hi there,

I am looking for an example of optimization through Catalyst, that you can 
demonstrate via code. Typically, you load some data in a dataframe, you do 
something, you do the opposite operation, and, when you collect, it’s super 
fast because nothing really happened to the data. Hopefully, my request is 
clear enough - I’d like to use that in teaching when explaining the laziness of 
Spark.

Does anyone have that in their labs?

tia,

jg
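Not from this thread, but one possible minimal PySpark sketch of that kind of
demo (derived columns that are never used are pruned away by Catalyst, and
nothing runs until an action is called):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[*]").appName("catalyst-demo").getOrCreate()

df = spark.range(100 * 1000 * 1000)                 # 100M rows, never materialized

step1 = df.withColumn("plus", col("id") + 1)        # "do something"
step2 = step1.withColumn("minus", col("plus") - 1)  # "do the opposite"
result = step2.select("id")                         # keep only the original column

# Nothing has executed yet; the lines above return instantly (laziness).
result.explain(True)   # the optimized plan shows the derived columns pruned away
print(result.count())  # only this action triggers a job, and only over `id`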





Re: [External] Re: Sorting in Spark on multiple partitions

2018-06-04 Thread Jörn Franke
I think there is also a misunderstanding of how repartition works. It keeps the
existing number of partitions, but hash-partitions the data according to userid.
That means each partition is likely to contain several different user ids.

That would also explain the behavior you observed. However, without the full
source code these are just assumptions.
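Not from this thread, but the per-user "first row" flag described further down
is usually expressed with a window function, which does not depend on how the
rows are physically partitioned (a PySpark sketch; the column names userid, sid
and time follow the thread, the paths are placeholders):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import row_number

spark = SparkSession.builder.getOrCreate()
data = spark.read.parquet("/path/to/input")

# Logical grouping and ordering, independent of physical partitioning:
w = Window.partitionBy("userid").orderBy("sid", "time")

flagged = data.withColumn("is_first", row_number().over(w) == 1)
flagged.write.mode("overwrite").parquet("/path/to/output")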

> On 4. Jun 2018, at 17:33, Jain, Neha T.  wrote:
> 
> Hi Jorn,
>  
> I tried removing userid from my sort clause but still the same issue- data 
> not sorted.
>  
> var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)
>  
> I am checking the sorting results  by temporary writing this file to Hive as 
> well as HDFS. Now, when I see the user wise data it is not sorted.
> Attaching the output file for your reference.
>  
> On the basis of sorting within userid partitions, I want to add a flag which 
> marks first item in the partition as true other items in that partition as 
> false.
> If my sorting order is disturbed, the flag is wrongly set.
>  
> Please suggest what else could be done to fix this very basic scenario of 
> sorting in Spark across multiple partitions across multiple nodes.
>  
> Thanks & Regards,
> Neha Jain
>  
> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Monday, June 4, 2018 10:48 AM
> To: Sing, Jasbir 
> Cc: user@spark.apache.org; Patel, Payal ; Jain, 
> Neha T. 
> Subject: [External] Re: Sorting in Spark on multiple partitions
>  
> You partition by userid, why do you then sort again by userid in the 
> partition? Can you try to remove userid from the sort? 
>  
> How do you check if the sort is correct or not?
>  
> What is the underlying objective of the sort? Do you have more information on 
> schema and data?
> 
> On 4. Jun 2018, at 05:47, Sing, Jasbir  wrote:
> 
> Hi Team,
>  
> We are currently using Spark 2.2.0 and facing some challenges in sorting of 
> data on multiple partitions.
> We have tried below approaches:
>  
> Spark SQL approach:
> a.  var query = "select * from data distribute by " + userid + " sort by 
> " + userid + ", " + time “
>  
> This query returns correct results in Hive but not in Spark SQL.  
> var newDf = data.repartition(col(userid)).orderBy(userid, time)
> var newDf = data.repartition(col(userid)).sortWithinPartitions(userid,time)
>  
>  
> But none of the above approach is giving correct results for sorting of data.
> Please suggest what could be done for the same.
>  
> Thanks & Regards,
> Neha Jain
>  
> 
> 


Re: [External] Re: Sorting in Spark on multiple partitions

2018-06-04 Thread Jörn Franke
How do you load the data? How do you write it?
I fear that without the full source code it will be difficult to troubleshoot
the issue.

Which Spark version?

The use case is not yet 100% clear to me. You want to set the flag to true for
the row with the oldest/newest date? I would just use top or something similar
when processing the data.


> On 4. Jun 2018, at 17:33, Jain, Neha T.  wrote:
> 
> Hi Jorn,
>  
> I tried removing userid from my sort clause but still the same issue- data 
> not sorted.
>  
> var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)
>  
> I am checking the sorting results  by temporary writing this file to Hive as 
> well as HDFS. Now, when I see the user wise data it is not sorted.
> Attaching the output file for your reference.
>  
> On the basis of sorting within userid partitions, I want to add a flag which 
> marks first item in the partition as true other items in that partition as 
> false.
> If my sorting order is disturbed, the flag is wrongly set.
>  
> Please suggest what else could be done to fix this very basic scenario of 
> sorting in Spark across multiple partitions across multiple nodes.
>  
> Thanks & Regards,
> Neha Jain
>  
> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Monday, June 4, 2018 10:48 AM
> To: Sing, Jasbir 
> Cc: user@spark.apache.org; Patel, Payal ; Jain, 
> Neha T. 
> Subject: [External] Re: Sorting in Spark on multiple partitions
>  
> You partition by userid, why do you then sort again by userid in the 
> partition? Can you try to remove userid from the sort? 
>  
> How do you check if the sort is correct or not?
>  
> What is the underlying objective of the sort? Do you have more information on 
> schema and data?
> 
> On 4. Jun 2018, at 05:47, Sing, Jasbir  wrote:
> 
> Hi Team,
>  
> We are currently using Spark 2.2.0 and facing some challenges in sorting of 
> data on multiple partitions.
> We have tried below approaches:
>  
> Spark SQL approach:
> a.  var query = "select * from data distribute by " + userid + " sort by 
> " + userid + ", " + time “
>  
> This query returns correct results in Hive but not in Spark SQL.  
> var newDf = data.repartition(col(userid)).orderBy(userid, time)
> var newDf = data.repartition(col(userid)).sortWithinPartitions(userid,time)
>  
>  
> But none of the above approach is giving correct results for sorting of data.
> Please suggest what could be done for the same.
>  
> Thanks & Regards,
> Neha Jain
>  
> 
> 


is there a way to create a static dataframe inside mapGroups?

2018-06-04 Thread kant kodali
Hi All,

Is there a way to create a static dataframe inside mapGroups? given that
mapGroups gives Iterator of rows. I just want to take that iterator and
populate a static dataframe so I can run raw sql queries on the static
dataframe.

Thanks!


Re: [Spark SQL] error in performing dataset union with complex data type (struct, list)

2018-06-04 Thread Pranav Agrawal
Yes, the issue is with the array types only; I have confirmed that.
I exploded the array to a struct but am still getting the same error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Union
can only be performed on tables with the compatible column types.
struct
<>
struct
at the 21th column of the second table;;

On Mon, Jun 4, 2018 at 2:55 PM, Jorge Machado  wrote:

> Have you tryed to narrow down the problem so that we can be 100% sure that
> it lies on the array types ? Just exclude them for sake of testing.
> If we know 100% that it is on this array stuff try to explode that columns
> into simple types.
>
> Jorge Machado
>
>
>
>
>
>
> On 4 Jun 2018, at 11:09, Pranav Agrawal  wrote:
>
> I am ordering the columns before doing union, so I think that should not
> be an issue,
>
> String[] columns_original_order = baseDs.columns();
> String[] columns = baseDs.columns();
> Arrays.sort(columns);
> baseDs = baseDs.selectExpr(columns);
> incDsForPartition = incDsForPartition.selectExpr(columns);
>
> if (baseDs.count() > 0) {
>     return baseDs.union(incDsForPartition).selectExpr(columns_original_order);
> } else {
>     return incDsForPartition.selectExpr(columns_original_order);
>
>
> On Mon, Jun 4, 2018 at 2:31 PM, Jorge Machado  wrote:
>
>> Try the same union with a dataframe without the arrays types. Could be
>> something strange there like ordering or so.
>>
>> Jorge Machado
>>
>>
>>
>>
>>
>> On 4 Jun 2018, at 10:17, Pranav Agrawal  wrote:
>>
>> schema is exactly the same, not sure why it is failing though.
>>
>> root
>>  |-- booking_id: integer (nullable = true)
>>  |-- booking_rooms_room_category_id: integer (nullable = true)
>>  |-- booking_rooms_room_id: integer (nullable = true)
>>  |-- booking_source: integer (nullable = true)
>>  |-- booking_status: integer (nullable = true)
>>  |-- cancellation_reason: integer (nullable = true)
>>  |-- checkin: string (nullable = true)
>>  |-- checkout: string (nullable = true)
>>  |-- city_id: integer (nullable = true)
>>  |-- cluster_id: integer (nullable = true)
>>  |-- company_id: integer (nullable = true)
>>  |-- created_at: string (nullable = true)
>>  |-- discount: integer (nullable = true)
>>  |-- feedback_created_at: string (nullable = true)
>>  |-- feedback_id: integer (nullable = true)
>>  |-- hotel_id: integer (nullable = true)
>>  |-- hub_id: integer (nullable = true)
>>  |-- month: integer (nullable = true)
>>  |-- no_show_reason: integer (nullable = true)
>>  |-- oyo_rooms: integer (nullable = true)
>>  |-- selling_amount: integer (nullable = true)
>>  |-- shifting: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- id: integer (nullable = true)
>>  |||-- booking_id: integer (nullable = true)
>>  |||-- shifting_status: integer (nullable = true)
>>  |||-- shifting_reason: integer (nullable = true)
>>  |||-- shifting_metadata: integer (nullable = true)
>>  |-- suggest_oyo: integer (nullable = true)
>>  |-- tickets: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- ticket_source: integer (nullable = true)
>>  |||-- ticket_status: string (nullable = true)
>>  |||-- ticket_instance_source: integer (nullable = true)
>>  |||-- ticket_category: string (nullable = true)
>>  |-- updated_at: timestamp (nullable = true)
>>  |-- year: integer (nullable = true)
>>  |-- zone_id: integer (nullable = true)
>>
>> root
>>  |-- booking_id: integer (nullable = true)
>>  |-- booking_rooms_room_category_id: integer (nullable = true)
>>  |-- booking_rooms_room_id: integer (nullable = true)
>>  |-- booking_source: integer (nullable = true)
>>  |-- booking_status: integer (nullable = true)
>>  |-- cancellation_reason: integer (nullable = true)
>>  |-- checkin: string (nullable = true)
>>  |-- checkout: string (nullable = true)
>>  |-- city_id: integer (nullable = true)
>>  |-- cluster_id: integer (nullable = true)
>>  |-- company_id: integer (nullable = true)
>>  |-- created_at: string (nullable = true)
>>  |-- discount: integer (nullable = true)
>>  |-- feedback_created_at: string (nullable = true)
>>  |-- feedback_id: integer (nullable = true)
>>  |-- hotel_id: integer (nullable = true)
>>  |-- hub_id: integer (nullable = true)
>>  |-- month: integer (nullable = true)
>>  |-- no_show_reason: integer (nullable = true)
>>  |-- oyo_rooms: integer (nullable = true)
>>  |-- selling_amount: integer (nullable = true)
>>  |-- shifting: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- id: integer (nullable = true)
>>  |||-- booking_id: integer (nullable = true)
>>  |||-- shifting_status: integer (nullable = true)
>>  |||-- shifting_reason: integer (nullable = true)
>>  |||-- shifting_metadata: integer (nullable = true)
>>  |-- suggest_oyo: integer (nullable = true)
>>  |-- tickets: array (nullable = true)
>>  |   

Re: Spark structured streaming generate output path runtime

2018-06-04 Thread Swapnil Chougule
Thanks Jayesh.
It worked for me.

~Swapnil

On Fri, 1 Jun 2018, 7:10 pm Lalwani, Jayesh, 
wrote:

> This will not work the way you have implemented it. The code that you have
> here will be called only once before the streaming query is started. Once
> the streaming query starts, this code is not called
>
>
>
> What I would do is
>
>1. Implement a udf that calculates flourtimestamp
>2. Add a column in df2 called “Flourtimestamp”, and populate it with
>the flour timestamp using udf.
>3. Write the stream using
>
> Df2.writeStream.format(“text”).partitionBy(“flourtimestamp”).option(“path”,
> “/home/data”).option("checkpointLocation","./checkpoint").start()
>
>
>
> The UDF will be called for every row. And partitionBy will create a folder
> within /home/data
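A minimal PySpark sketch of those three steps (the 5-second flooring, output
path and checkpoint location follow the thread; the streaming source and the
UDF name are made up):

import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

spark = SparkSession.builder.getOrCreate()

# Placeholder source; any streaming DataFrame with a single string column works.
df2 = (spark.readStream.format("socket")
       .option("host", "localhost").option("port", 9999).load())

# 1. a UDF that floors the wall-clock time to 5-second buckets, evaluated per row
flour_ts = F.udf(lambda: int(time.time() * 1000) // 5000 * 5000, LongType())
# (on Spark 2.3+ you may want flour_ts = flour_ts.asNondeterministic())

# 2. add the column, 3. let partitionBy create one folder per time bucket
query = (df2.withColumn("flourtimestamp", flour_ts())
            .writeStream.format("text")
            .partitionBy("flourtimestamp")
            .option("path", "/home/data")
            .option("checkpointLocation", "./checkpoint")
            .start())
query.awaitTermination()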
>
>
>
> *From: *Swapnil Chougule 
> *Date: *Friday, June 1, 2018 at 6:21 AM
> *To: *user 
> *Subject: *Spark structured streaming generate output path runtime
>
>
>
> Hi
>
>
>
> I want to generate output directory runtime for data. Directory name is
> derived from current timestamp.
>
> Lets say, data for same minute should go into same directory.
>
>
>
> I tried following snippet but it didn't work. All data is being written in
> same directory (created with respect to initial timestamp)
>
>
>
> val query = df2.writeStream
>   .format("text")
>   .option("path", "/home/data/"+getFlourTimestamp())
>   .option("checkpointLocation","./checkpoint")
>   .start()
>
>   query.awaitTermination()
>
>
>
>   def getFlourTimestamp(): Long ={
> var curTime = System.currentTimeMillis()
> curTime - (curTime % 5000)
>   }
>
>
>
>
>
> In other words, I want getFlourTimestamp() to be executed after every
> batch.
>
>
>
> Any help around this will be really appreciated.
>
>
>
> Thanks,
>
> Swapnil
>
>
>
>


Re: testing frameworks

2018-06-04 Thread Spico Florin
Hello!
  Thank you very much for your helpful answer and for the very good job
done on spark-testing-base. I managed to perform unit testing with the
spark-testing-base library as described in the provided article, and also got
inspiration from
https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/java/com/holdenkarau/spark/testing/SampleJavaRDDTest.java


I had some concerns regarding how to compare the RDDs that come from a
DataFrame with the ones that come from the jsc().parallelize method.

My test workflow is as follows:
1. Get the data from a parquet file as a DataFrame
2. Convert the DataFrame to a JavaRDD with toJavaRDD()
3. Perform some mapping on the JavaRDD
4. Check whether the resulting mapped RDD is equal to the expected one
(retrieved from a text file)

I performed the above test with following code snippet

 JavaRDD expected = jsc().parallelize(input_from_text_file);
SparkSession spark = SparkSession.builder().getOrCreate();

JavaRDD input =

spark.read().parquet("src/test/resources/test_data.parquet").toJavaRDD();

JavaRDD result = MyDriver.convertToMyCustomerData(input);
 JavaRDDComparisons.assertRDDEquals(expected, result);

The above test failed, even though the data is the same. By
debugging the code, I observed that the data that came from the
DataFrame didn't have the same order as the data that came from
jsc().parallelize(text_file).

So, I suppose the issue came from the fact that the SparkSession and
jsc() don't share the same SparkContext (there is a warning about this when
running the program).

Therefore I came to the solution of using the same jsc for both the
expected and the result RDDs. With this solution the assertion succeeded as
expected.

  List df
=spark.read().parquet("src/test/resources/test_data.parquet").toJavaRDD().collect();
JavaRDD input = jsc().parallelize(df);

JavaRDD result = MyDriver.convertToMyCustomerData(input);
 JavaRDDComparisons.assertRDDEquals(expected, result);


My questions are:
1. What is the best way to compare RDDs when one is built from a DataFrame
and the other is obtained via jsc().parallelize()?
2. Is the above solution a suitable one?

I look forward to your answers.

Regards,
  Florin







On Wed, May 30, 2018 at 3:11 PM, Holden Karau  wrote:

> So Jessie has an excellent blog post on how to use it with Java
> applications -
> http://www.jesse-anderson.com/2016/04/unit-testing-spark-with-java/
>
> On Wed, May 30, 2018 at 4:14 AM Spico Florin 
> wrote:
>
>> Hello!
>>   I'm also looking for unit testing spark Java application. I've seen the
>> great work done in  spark-testing-base but it seemed to me that I could
>> not use for Spark Java applications.
>> Only spark scala applications are supported?
>> Thanks.
>> Regards,
>>  Florin
>>
>> On Wed, May 23, 2018 at 8:07 AM, umargeek 
>> wrote:
>>
>>> Hi Steve,
>>>
>>> you can try out pytest-spark plugin if your writing programs using
>>> pyspark
>>> ,please find below link for reference.
>>>
>>> https://github.com/malexer/pytest-spark
>>> 
>>>
>>> Thanks,
>>> Umar
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>>
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
>


Re: [Spark SQL] error in performing dataset union with complex data type (struct, list)

2018-06-04 Thread Jorge Machado
Have you tried to narrow down the problem so that we can be 100% sure that it
lies in the array types? Just exclude them for the sake of testing.
If we know 100% that it is this array stuff, try to explode those columns into
simple types.
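A quick PySpark-style sketch of that narrowing-down step (the two dataset names
and the array column names are taken from the rest of this thread; the input
paths are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stand-ins for the two datasets being unioned in this thread.
baseDs = spark.read.parquet("/path/to/base")
incDsForPartition = spark.read.parquet("/path/to/increment")

# Drop the array columns on both sides and retry the union; if this succeeds,
# the incompatibility is inside the array element structs.
array_cols = ["shifting", "tickets"]
baseDs.drop(*array_cols).union(incDsForPartition.drop(*array_cols))

# Then compare the element types of the suspect columns directly:
baseDs.select("shifting").printSchema()
incDsForPartition.select("shifting").printSchema()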

Jorge Machado






> On 4 Jun 2018, at 11:09, Pranav Agrawal  wrote:
> 
> I am ordering the columns before doing union, so I think that should not be 
> an issue,
> 
> String[] columns_original_order = baseDs.columns();
> String[] columns = baseDs.columns();
> Arrays.sort(columns);
> baseDs=baseDs.selectExpr(columns);
> incDsForPartition=incDsForPartition.selectExpr(columns);
> 
> if (baseDs.count() > 0) {
> return 
> baseDs.union(incDsForPartition).selectExpr(columns_original_order);
> } else {
> return incDsForPartition.selectExpr(columns_original_order);
> 
> 
> On Mon, Jun 4, 2018 at 2:31 PM, Jorge Machado  > wrote:
> Try the same union with a dataframe without the arrays types. Could be 
> something strange there like ordering or so.
> 
> Jorge Machado
> 
> 
> 
> 
> 
>> On 4 Jun 2018, at 10:17, Pranav Agrawal > > wrote:
>> 
>> schema is exactly the same, not sure why it is failing though.
>> 
>> root
>>  |-- booking_id: integer (nullable = true)
>>  |-- booking_rooms_room_category_id: integer (nullable = true)
>>  |-- booking_rooms_room_id: integer (nullable = true)
>>  |-- booking_source: integer (nullable = true)
>>  |-- booking_status: integer (nullable = true)
>>  |-- cancellation_reason: integer (nullable = true)
>>  |-- checkin: string (nullable = true)
>>  |-- checkout: string (nullable = true)
>>  |-- city_id: integer (nullable = true)
>>  |-- cluster_id: integer (nullable = true)
>>  |-- company_id: integer (nullable = true)
>>  |-- created_at: string (nullable = true)
>>  |-- discount: integer (nullable = true)
>>  |-- feedback_created_at: string (nullable = true)
>>  |-- feedback_id: integer (nullable = true)
>>  |-- hotel_id: integer (nullable = true)
>>  |-- hub_id: integer (nullable = true)
>>  |-- month: integer (nullable = true)
>>  |-- no_show_reason: integer (nullable = true)
>>  |-- oyo_rooms: integer (nullable = true)
>>  |-- selling_amount: integer (nullable = true)
>>  |-- shifting: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- id: integer (nullable = true)
>>  |||-- booking_id: integer (nullable = true)
>>  |||-- shifting_status: integer (nullable = true)
>>  |||-- shifting_reason: integer (nullable = true)
>>  |||-- shifting_metadata: integer (nullable = true)
>>  |-- suggest_oyo: integer (nullable = true)
>>  |-- tickets: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- ticket_source: integer (nullable = true)
>>  |||-- ticket_status: string (nullable = true)
>>  |||-- ticket_instance_source: integer (nullable = true)
>>  |||-- ticket_category: string (nullable = true)
>>  |-- updated_at: timestamp (nullable = true)
>>  |-- year: integer (nullable = true)
>>  |-- zone_id: integer (nullable = true)
>> 
>> root
>>  |-- booking_id: integer (nullable = true)
>>  |-- booking_rooms_room_category_id: integer (nullable = true)
>>  |-- booking_rooms_room_id: integer (nullable = true)
>>  |-- booking_source: integer (nullable = true)
>>  |-- booking_status: integer (nullable = true)
>>  |-- cancellation_reason: integer (nullable = true)
>>  |-- checkin: string (nullable = true)
>>  |-- checkout: string (nullable = true)
>>  |-- city_id: integer (nullable = true)
>>  |-- cluster_id: integer (nullable = true)
>>  |-- company_id: integer (nullable = true)
>>  |-- created_at: string (nullable = true)
>>  |-- discount: integer (nullable = true)
>>  |-- feedback_created_at: string (nullable = true)
>>  |-- feedback_id: integer (nullable = true)
>>  |-- hotel_id: integer (nullable = true)
>>  |-- hub_id: integer (nullable = true)
>>  |-- month: integer (nullable = true)
>>  |-- no_show_reason: integer (nullable = true)
>>  |-- oyo_rooms: integer (nullable = true)
>>  |-- selling_amount: integer (nullable = true)
>>  |-- shifting: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- id: integer (nullable = true)
>>  |||-- booking_id: integer (nullable = true)
>>  |||-- shifting_status: integer (nullable = true)
>>  |||-- shifting_reason: integer (nullable = true)
>>  |||-- shifting_metadata: integer (nullable = true)
>>  |-- suggest_oyo: integer (nullable = true)
>>  |-- tickets: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- ticket_source: integer (nullable = true)
>>  |||-- ticket_status: string (nullable = true)
>>  |||-- ticket_instance_source: integer (nullable = true)
>>  |||-- ticket_category: string (nullable = true)
>>  |-- 

Re: [Spark SQL] error in performing dataset union with complex data type (struct, list)

2018-06-04 Thread Pranav Agrawal
I am ordering the columns before doing union, so I think that should not be
an issue,

String[] columns_original_order = baseDs.columns();
String[] columns = baseDs.columns();
Arrays.sort(columns);
baseDs = baseDs.selectExpr(columns);
incDsForPartition = incDsForPartition.selectExpr(columns);

if (baseDs.count() > 0) {
    return baseDs.union(incDsForPartition).selectExpr(columns_original_order);
} else {
    return incDsForPartition.selectExpr(columns_original_order);
}
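
If upgrading is an option, Dataset.unionByName (added in Spark 2.3) matches top-level columns by name and keeps the left dataset's column order, which makes the sort/selectExpr round-trip above unnecessary; note that it does not realign fields nested inside struct or array columns, and that the count() > 0 guard forces a full scan a plain union does not need. A minimal sketch, reusing the variable names from the snippet above (not the author's code):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class UnionByNameSketch {
    // Sketch only: unionByName resolves top-level columns by name, so the
    // manual Arrays.sort + selectExpr steps are not needed. Fields nested
    // inside struct/array columns are still matched positionally.
    public static Dataset<Row> unionKeepingOriginalOrder(Dataset<Row> baseDs,
                                                         Dataset<Row> incDsForPartition) {
        return baseDs.unionByName(incDsForPartition);
    }
}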


On Mon, Jun 4, 2018 at 2:31 PM, Jorge Machado  wrote:

> Try the same union with a dataframe without the array types. Could be
> something strange there, like ordering or so.
>
> Jorge Machado
>
>
>
>
>
> On 4 Jun 2018, at 10:17, Pranav Agrawal  wrote:
>
> schema is exactly the same, not sure why it is failing though.
>
> root
>  |-- booking_id: integer (nullable = true)
>  |-- booking_rooms_room_category_id: integer (nullable = true)
>  |-- booking_rooms_room_id: integer (nullable = true)
>  |-- booking_source: integer (nullable = true)
>  |-- booking_status: integer (nullable = true)
>  |-- cancellation_reason: integer (nullable = true)
>  |-- checkin: string (nullable = true)
>  |-- checkout: string (nullable = true)
>  |-- city_id: integer (nullable = true)
>  |-- cluster_id: integer (nullable = true)
>  |-- company_id: integer (nullable = true)
>  |-- created_at: string (nullable = true)
>  |-- discount: integer (nullable = true)
>  |-- feedback_created_at: string (nullable = true)
>  |-- feedback_id: integer (nullable = true)
>  |-- hotel_id: integer (nullable = true)
>  |-- hub_id: integer (nullable = true)
>  |-- month: integer (nullable = true)
>  |-- no_show_reason: integer (nullable = true)
>  |-- oyo_rooms: integer (nullable = true)
>  |-- selling_amount: integer (nullable = true)
>  |-- shifting: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- id: integer (nullable = true)
>  |||-- booking_id: integer (nullable = true)
>  |||-- shifting_status: integer (nullable = true)
>  |||-- shifting_reason: integer (nullable = true)
>  |||-- shifting_metadata: integer (nullable = true)
>  |-- suggest_oyo: integer (nullable = true)
>  |-- tickets: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- ticket_source: integer (nullable = true)
>  |||-- ticket_status: string (nullable = true)
>  |||-- ticket_instance_source: integer (nullable = true)
>  |||-- ticket_category: string (nullable = true)
>  |-- updated_at: timestamp (nullable = true)
>  |-- year: integer (nullable = true)
>  |-- zone_id: integer (nullable = true)
>
> root
>  |-- booking_id: integer (nullable = true)
>  |-- booking_rooms_room_category_id: integer (nullable = true)
>  |-- booking_rooms_room_id: integer (nullable = true)
>  |-- booking_source: integer (nullable = true)
>  |-- booking_status: integer (nullable = true)
>  |-- cancellation_reason: integer (nullable = true)
>  |-- checkin: string (nullable = true)
>  |-- checkout: string (nullable = true)
>  |-- city_id: integer (nullable = true)
>  |-- cluster_id: integer (nullable = true)
>  |-- company_id: integer (nullable = true)
>  |-- created_at: string (nullable = true)
>  |-- discount: integer (nullable = true)
>  |-- feedback_created_at: string (nullable = true)
>  |-- feedback_id: integer (nullable = true)
>  |-- hotel_id: integer (nullable = true)
>  |-- hub_id: integer (nullable = true)
>  |-- month: integer (nullable = true)
>  |-- no_show_reason: integer (nullable = true)
>  |-- oyo_rooms: integer (nullable = true)
>  |-- selling_amount: integer (nullable = true)
>  |-- shifting: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- id: integer (nullable = true)
>  |||-- booking_id: integer (nullable = true)
>  |||-- shifting_status: integer (nullable = true)
>  |||-- shifting_reason: integer (nullable = true)
>  |||-- shifting_metadata: integer (nullable = true)
>  |-- suggest_oyo: integer (nullable = true)
>  |-- tickets: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- ticket_source: integer (nullable = true)
>  |||-- ticket_status: string (nullable = true)
>  |||-- ticket_instance_source: integer (nullable = true)
>  |||-- ticket_category: string (nullable = true)
>  |-- updated_at: timestamp (nullable = false)
>  |-- year: integer (nullable = true)
>  |-- zone_id: integer (nullable = true)
>
> On Sun, Jun 3, 2018 at 8:05 PM, Alessandro Solimando <
> alessandro.solima...@gmail.com> wrote:
>
>> Hi Pranav,
>> I don't have an answer to your issue, but what I generally do in these
>> cases is to first try to simplify it to a point where it is easier to check
>> what's going on, and then add back "pieces" one by one until I spot the
>> error.
>>
>> In your case I can suggest to:
>>
>> 1) project 

Re: [Spark SQL] error in performing dataset union with complex data type (struct, list)

2018-06-04 Thread Jorge Machado
Try the same union with a dataframe without the array types. Could be
something strange there, like ordering or so.

Jorge Machado





> On 4 Jun 2018, at 10:17, Pranav Agrawal  wrote:
> 
> schema is exactly the same, not sure why it is failing though.
> 
> root
>  |-- booking_id: integer (nullable = true)
>  |-- booking_rooms_room_category_id: integer (nullable = true)
>  |-- booking_rooms_room_id: integer (nullable = true)
>  |-- booking_source: integer (nullable = true)
>  |-- booking_status: integer (nullable = true)
>  |-- cancellation_reason: integer (nullable = true)
>  |-- checkin: string (nullable = true)
>  |-- checkout: string (nullable = true)
>  |-- city_id: integer (nullable = true)
>  |-- cluster_id: integer (nullable = true)
>  |-- company_id: integer (nullable = true)
>  |-- created_at: string (nullable = true)
>  |-- discount: integer (nullable = true)
>  |-- feedback_created_at: string (nullable = true)
>  |-- feedback_id: integer (nullable = true)
>  |-- hotel_id: integer (nullable = true)
>  |-- hub_id: integer (nullable = true)
>  |-- month: integer (nullable = true)
>  |-- no_show_reason: integer (nullable = true)
>  |-- oyo_rooms: integer (nullable = true)
>  |-- selling_amount: integer (nullable = true)
>  |-- shifting: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- id: integer (nullable = true)
>  |||-- booking_id: integer (nullable = true)
>  |||-- shifting_status: integer (nullable = true)
>  |||-- shifting_reason: integer (nullable = true)
>  |||-- shifting_metadata: integer (nullable = true)
>  |-- suggest_oyo: integer (nullable = true)
>  |-- tickets: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- ticket_source: integer (nullable = true)
>  |||-- ticket_status: string (nullable = true)
>  |||-- ticket_instance_source: integer (nullable = true)
>  |||-- ticket_category: string (nullable = true)
>  |-- updated_at: timestamp (nullable = true)
>  |-- year: integer (nullable = true)
>  |-- zone_id: integer (nullable = true)
> 
> root
>  |-- booking_id: integer (nullable = true)
>  |-- booking_rooms_room_category_id: integer (nullable = true)
>  |-- booking_rooms_room_id: integer (nullable = true)
>  |-- booking_source: integer (nullable = true)
>  |-- booking_status: integer (nullable = true)
>  |-- cancellation_reason: integer (nullable = true)
>  |-- checkin: string (nullable = true)
>  |-- checkout: string (nullable = true)
>  |-- city_id: integer (nullable = true)
>  |-- cluster_id: integer (nullable = true)
>  |-- company_id: integer (nullable = true)
>  |-- created_at: string (nullable = true)
>  |-- discount: integer (nullable = true)
>  |-- feedback_created_at: string (nullable = true)
>  |-- feedback_id: integer (nullable = true)
>  |-- hotel_id: integer (nullable = true)
>  |-- hub_id: integer (nullable = true)
>  |-- month: integer (nullable = true)
>  |-- no_show_reason: integer (nullable = true)
>  |-- oyo_rooms: integer (nullable = true)
>  |-- selling_amount: integer (nullable = true)
>  |-- shifting: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- id: integer (nullable = true)
>  |||-- booking_id: integer (nullable = true)
>  |||-- shifting_status: integer (nullable = true)
>  |||-- shifting_reason: integer (nullable = true)
>  |||-- shifting_metadata: integer (nullable = true)
>  |-- suggest_oyo: integer (nullable = true)
>  |-- tickets: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- ticket_source: integer (nullable = true)
>  |||-- ticket_status: string (nullable = true)
>  |||-- ticket_instance_source: integer (nullable = true)
>  |||-- ticket_category: string (nullable = true)
>  |-- updated_at: timestamp (nullable = false)
>  |-- year: integer (nullable = true)
>  |-- zone_id: integer (nullable = true)
> 
> On Sun, Jun 3, 2018 at 8:05 PM, Alessandro Solimando <
> alessandro.solima...@gmail.com> wrote:
> Hi Pranav,
> I don't have an answer to your issue, but what I generally do in these cases
> is to first try to simplify it to a point where it is easier to check what's
> going on, and then add back "pieces" one by one until I spot the error.
> 
> In your case I can suggest to: 
> 
> 1) project the dataset to the problematic column only (column 21 from your 
> log)
> 2) use the explode function to have one element of the array per line
> 3) flatten the struct 
> 
> At each step use printSchema() to double check if the types are as you expect 
> them to be, and if they are the same for both datasets.
> 
> Best regards,
> Alessandro 
> 
> On 2 June 2018 at 19:48, Pranav Agrawal wrote:
> can't get around this error when performing union of two datasets 
> (ds1.union(ds2)) having complex data type (struct, list),
> 
> 18/06/02 15:12:00 

Re: [Spark SQL] error in performing dataset union with complex data type (struct, list)

2018-06-04 Thread Pranav Agrawal
schema is exactly the same, not sure why it is failing though.

root
 |-- booking_id: integer (nullable = true)
 |-- booking_rooms_room_category_id: integer (nullable = true)
 |-- booking_rooms_room_id: integer (nullable = true)
 |-- booking_source: integer (nullable = true)
 |-- booking_status: integer (nullable = true)
 |-- cancellation_reason: integer (nullable = true)
 |-- checkin: string (nullable = true)
 |-- checkout: string (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- cluster_id: integer (nullable = true)
 |-- company_id: integer (nullable = true)
 |-- created_at: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- feedback_created_at: string (nullable = true)
 |-- feedback_id: integer (nullable = true)
 |-- hotel_id: integer (nullable = true)
 |-- hub_id: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- no_show_reason: integer (nullable = true)
 |-- oyo_rooms: integer (nullable = true)
 |-- selling_amount: integer (nullable = true)
 |-- shifting: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- id: integer (nullable = true)
 |||-- booking_id: integer (nullable = true)
 |||-- shifting_status: integer (nullable = true)
 |||-- shifting_reason: integer (nullable = true)
 |||-- shifting_metadata: integer (nullable = true)
 |-- suggest_oyo: integer (nullable = true)
 |-- tickets: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- ticket_source: integer (nullable = true)
 |||-- ticket_status: string (nullable = true)
 |||-- ticket_instance_source: integer (nullable = true)
 |||-- ticket_category: string (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- zone_id: integer (nullable = true)

root
 |-- booking_id: integer (nullable = true)
 |-- booking_rooms_room_category_id: integer (nullable = true)
 |-- booking_rooms_room_id: integer (nullable = true)
 |-- booking_source: integer (nullable = true)
 |-- booking_status: integer (nullable = true)
 |-- cancellation_reason: integer (nullable = true)
 |-- checkin: string (nullable = true)
 |-- checkout: string (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- cluster_id: integer (nullable = true)
 |-- company_id: integer (nullable = true)
 |-- created_at: string (nullable = true)
 |-- discount: integer (nullable = true)
 |-- feedback_created_at: string (nullable = true)
 |-- feedback_id: integer (nullable = true)
 |-- hotel_id: integer (nullable = true)
 |-- hub_id: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- no_show_reason: integer (nullable = true)
 |-- oyo_rooms: integer (nullable = true)
 |-- selling_amount: integer (nullable = true)
 |-- shifting: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- id: integer (nullable = true)
 |||-- booking_id: integer (nullable = true)
 |||-- shifting_status: integer (nullable = true)
 |||-- shifting_reason: integer (nullable = true)
 |||-- shifting_metadata: integer (nullable = true)
 |-- suggest_oyo: integer (nullable = true)
 |-- tickets: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- ticket_source: integer (nullable = true)
 |||-- ticket_status: string (nullable = true)
 |||-- ticket_instance_source: integer (nullable = true)
 |||-- ticket_category: string (nullable = true)
 |-- updated_at: timestamp (nullable = false)
 |-- year: integer (nullable = true)
 |-- zone_id: integer (nullable = true)
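
For what it's worth, the two dumps above do differ in at least one spot (updated_at is nullable = true in the first and nullable = false in the second); whether or not that is what the analyzer is objecting to, a programmatic field-by-field comparison is easier to trust than eyeballing printSchema(). A rough sketch, assuming ds1 and ds2 are the two datasets being unioned (illustrative names, not from the original code):

import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaDiff {
    // Sketch only: print every position where the two schemas disagree,
    // comparing full data types (nested fields, element types, nullability)
    // instead of the indented printSchema() rendering.
    public static void diff(StructType s1, StructType s2) {
        int n = Math.min(s1.fields().length, s2.fields().length);
        for (int i = 0; i < n; i++) {
            StructField f1 = s1.fields()[i];
            StructField f2 = s2.fields()[i];
            if (!f1.equals(f2)) {
                System.out.println("column " + i + ": "
                        + f1.name() + " " + f1.dataType().catalogString()
                        + "  vs  " + f2.name() + " " + f2.dataType().catalogString());
            }
        }
        if (s1.fields().length != s2.fields().length) {
            System.out.println("different column counts: "
                    + s1.fields().length + " vs " + s2.fields().length);
        }
    }
}

Calling SchemaDiff.diff(ds1.schema(), ds2.schema()) then prints only the positions that actually differ, which narrows the search well beyond what the indented dumps show.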

On Sun, Jun 3, 2018 at 8:05 PM, Alessandro Solimando <
alessandro.solima...@gmail.com> wrote:

> Hi Pranav,
> I don't have an answer to your issue, but what I generally do in these
> cases is to first try to simplify it to a point where it is easier to check
> what's going on, and then add back "pieces" one by one until I spot the
> error.
>
> In your case I can suggest to:
>
> 1) project the dataset to the problematic column only (column 21 from your
> log)
> 2) use the explode function to have one element of the array per line
> 3) flatten the struct
>
> At each step use printSchema() to double check if the types are as you
> expect them to be, and if they are the same for both datasets.
>
> Best regards,
> Alessandro
>
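A rough Java sketch of the three steps above, to be run on each of the two datasets in turn (the column name "shifting" is an assumption based on the 21st-column hint in the error message; adjust as needed):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class NarrowDown {
    // Sketch only: project the suspect column, explode the array, flatten the
    // struct, printing the schema after each step to see where the two
    // datasets start to diverge.
    public static void inspect(Dataset<Row> ds) {
        Dataset<Row> projected = ds.select(col("shifting"));   // 1) problematic column only
        projected.printSchema();
        Dataset<Row> exploded = projected.select(explode(col("shifting")).as("s")); // 2) one array element per row
        exploded.printSchema();
        Dataset<Row> flattened = exploded.select("s.*");       // 3) struct fields as top-level columns
        flattened.printSchema();
    }
}
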
> On 2 June 2018 at 19:48, Pranav Agrawal  wrote:
>
>> can't get around this error when performing union of two datasets
>> (ds1.union(ds2)) having complex data type (struct, list),
>>
>>
>> 18/06/02 15:12:00 INFO ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception:
>> org.apache.spark.sql.AnalysisException: Union can only be performed on
>> tables with the compatible column types.
>> array<struct<...>>
>> <>
>> array<struct<...>>
>> at the 21th column of the second table;;
>> As far as I can tell, they are the same. What am I doing wrong? Any help
>> / workaround appreciated!
>>
>>
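
One workaround that sometimes helps in this situation (a sketch under the assumption that the two datasets really do carry the same logical layout) is to cast every column of the second dataset to the exact type it has in the first before the union. Be aware that casting between structs matches fields by position, not by name, so this is only safe when the nested field order already agrees. Illustrative names, not the original code:

import static org.apache.spark.sql.functions.col;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructField;

public class AlignAndUnion {
    // Sketch only: select every column of `right` cast to the type declared in
    // `left`'s schema, then union. Check nested field order first (see the
    // explode/flatten steps earlier in the thread).
    public static Dataset<Row> unionAligned(Dataset<Row> left, Dataset<Row> right) {
        List<Column> aligned = new ArrayList<>();
        for (StructField f : left.schema().fields()) {
            aligned.add(col(f.name()).cast(f.dataType()).as(f.name()));
        }
        return left.union(right.select(aligned.toArray(new Column[0])));
    }
}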