exclude rules in analyzer

2022-03-16 Thread Shi Yuhang
I have found that we can use `spark.sql.optimizer.excludedRules` to exclude
rules in the optimizer, but we can't exclude rules in the analyzer.

I wonder why this is not supported, and whether there is any plan to support it?
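
For reference, excluding optimizer rules goes through a runtime config; a minimal spark-shell sketch follows (the rule name below is only an illustration -- check which rules are actually excludable in your Spark version). There is currently no analogous configuration for analyzer rules.

// Comma-separated list of fully qualified optimizer rule names to disable.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates")

// Inspect the optimized plan to see whether the excluded rule still applies.
spark.range(10).filter("id > 5").explain(true)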


Re: pivoting panda dataframe

2022-03-16 Thread ayan guha
Column bind is called a join in the relational world, and Spark uses the same concept.

A pivot in the true sense is harder to achieve because you don't know in advance
how many columns you will end up with, but Spark has a pivot function.
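
A minimal spark-shell sketch of that pivot function (the dataset and column names here are invented for illustration):

import org.apache.spark.sql.functions.sum
import spark.implicits._

val sales = Seq(
  ("2021", "Q1", 100), ("2021", "Q2", 150),
  ("2022", "Q1", 120), ("2022", "Q2", 180)
).toDF("year", "quarter", "amount")

// pivot() turns the distinct values of `quarter` into columns; the number of
// output columns is only known at runtime unless the values are passed explicitly.
sales.groupBy("year").pivot("quarter").agg(sum("amount")).show()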

On Thu, 17 Mar 2022 at 9:16 am, Mich Talebzadeh 
wrote:

> OK this is the version that works with Pandas only, without Spark
>
> import random
> import string
> import math
> import datetime
> import time
> import pandas as pd
>
> class UsedFunctions:
>
>   def randomString(self, length):
>     letters = string.ascii_letters
>     result_str = ''.join(random.choice(letters) for i in range(length))
>     return result_str
>
>   def clustered(self, x, numRows):
>     return math.floor(x - 1) / numRows
>
>   def scattered(self, x, numRows):
>     return abs((x - 1) % numRows) * 1.0
>
>   def randomised(self, seed, numRows):
>     random.seed(seed)
>     return abs(random.randint(0, numRows) % numRows) * 1.0
>
>   def padString(self, x, chars, length):
>     n = int(math.log10(x) + 1)
>     result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
>     return result_str
>
>   def padSingleChar(self, chars, length):
>     result_str = ''.join(chars for i in range(length))
>     return result_str
>
>   def println(self, lst):
>     for ll in lst:
>       print(ll[0])
>
>   def createSomeChars(self):
>     string.ascii_letters = 'ABCDEFGHIJ'
>     return random.choice(string.ascii_letters)
>
> usedFunctions = UsedFunctions()
>
> def main():
>     appName = "RandomDataGenerator"
>     start_time = time.time()
>     randomdata = RandomData()
>     dfRandom = randomdata.generateRamdomData()
>
>
> class RandomData:
>     def generateRamdomData(self):
>       uf = UsedFunctions()
>       numRows = 10
>       start = 1
>       end = start + numRows - 1
>       print("starting at ID = ", start, ",ending on = ", end)
>       Range = range(start, end)
>       df = pd.DataFrame(map(lambda x: (x, usedFunctions.clustered(x, numRows), \
>                                        usedFunctions.scattered(x, numRows), \
>                                        usedFunctions.randomised(x, numRows), \
>                                        usedFunctions.randomString(10), \
>                                        usedFunctions.padString(x, " ", 20), \
>                                        usedFunctions.padSingleChar("z", 20), \
>                                        usedFunctions.createSomeChars()), Range))
>       pd.set_option("display.max_rows", None, "display.max_columns", None)
>       for col_name in df.columns:
>           print(col_name)
>       print(df.groupby(7).groups)
>       ##print(df)
>
> if __name__ == "__main__":
>   main()
>
> and comes back with this
>
>
> starting at ID =  1 ,ending on =  10
>
> 0
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> {'B': [5, 7], 'D': [4], 'F': [1], 'G': [0, 3, 6, 8], 'J': [2]}
>
>
> On Tue, 15 Mar 2022 at 22:19, Mich Talebzadeh 
> wrote:
>
>> Thanks, I don't want to use Spark, otherwise I can do this.
>>
>> p_dfm = df.toPandas()  # converting spark DF to Pandas DF
>>
>>
>> Can I do it without using Spark?
>>
>>
>> On Tue, 15 Mar 2022 at 22:08, Bjørn Jørgensen 
>> wrote:
>>
>>> You have a pyspark dataframe and you want to convert it to pandas?
>>>
>>> Convert it first to pandas api on spark
>>>
>>>
>>> pf01 = f01.to_pandas_on_spark()
>>>
>>>
>>> Then convert it to pandas
>>>
>>>
>>> pf01 = pf01.to_pandas()
>>>
>>> Or?
>>>
>>> On Tue, 15 Mar 2022 at 22:56, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
 Thanks everyone.

 I want to do the following in pandas and numpy without using spark.

 This is what I do in spark to generate some random data using class
 UsedFunctions (not important).

 class UsedFunctions:
   def randomString(self,length):
 letters = string.ascii_letters
 result_str = ''.join(random.choice(letters) for i in range(length))
 return result_str
   def clustered(self,x,numRows):

Re: pivoting panda dataframe

2022-03-16 Thread Mich Talebzadeh
OK this is the version that works with Pandas only, without Spark

import random
import string
import math
import datetime
import time
import pandas as pd

class UsedFunctions:

  def randomString(self, length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str

  def clustered(self, x, numRows):
    return math.floor(x - 1) / numRows

  def scattered(self, x, numRows):
    return abs((x - 1) % numRows) * 1.0

  def randomised(self, seed, numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0

  def padString(self, x, chars, length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
    return result_str

  def padSingleChar(self, chars, length):
    result_str = ''.join(chars for i in range(length))
    return result_str

  def println(self, lst):
    for ll in lst:
      print(ll[0])

  def createSomeChars(self):
    string.ascii_letters = 'ABCDEFGHIJ'
    return random.choice(string.ascii_letters)

usedFunctions = UsedFunctions()

def main():
    appName = "RandomDataGenerator"
    start_time = time.time()
    randomdata = RandomData()
    dfRandom = randomdata.generateRamdomData()


class RandomData:
    def generateRamdomData(self):
      uf = UsedFunctions()
      numRows = 10
      start = 1
      end = start + numRows - 1
      print("starting at ID = ", start, ",ending on = ", end)
      Range = range(start, end)
      df = pd.DataFrame(map(lambda x: (x, usedFunctions.clustered(x, numRows), \
                                       usedFunctions.scattered(x, numRows), \
                                       usedFunctions.randomised(x, numRows), \
                                       usedFunctions.randomString(10), \
                                       usedFunctions.padString(x, " ", 20), \
                                       usedFunctions.padSingleChar("z", 20), \
                                       usedFunctions.createSomeChars()), Range))
      pd.set_option("display.max_rows", None, "display.max_columns", None)
      for col_name in df.columns:
          print(col_name)
      print(df.groupby(7).groups)
      ##print(df)

if __name__ == "__main__":
  main()

and comes back with this


starting at ID =  1 ,ending on =  10

0
1
2
3
4
5
6
7
{'B': [5, 7], 'D': [4], 'F': [1], 'G': [0, 3, 6, 8], 'J': [2]}




On Tue, 15 Mar 2022 at 22:19, Mich Talebzadeh 
wrote:

> Thanks, I don't want to use Spark, otherwise I can do this.
>
> p_dfm = df.toPandas()  # converting spark DF to Pandas DF
>
>
> Can I do it without using Spark?
>
>
>
> On Tue, 15 Mar 2022 at 22:08, Bjørn Jørgensen 
> wrote:
>
>> You have a pyspark dataframe and you want to convert it to pandas?
>>
>> Convert it first to pandas api on spark
>>
>>
>> pf01 = f01.to_pandas_on_spark()
>>
>>
>> Then convert it to pandas
>>
>>
>> pf01 = pf01.to_pandas()
>>
>> Or?
>>
>> On Tue, 15 Mar 2022 at 22:56, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Thanks everyone.
>>>
>>> I want to do the following in pandas and numpy without using spark.
>>>
>>> This is what I do in spark to generate some random data using class
>>> UsedFunctions (not important).
>>>
>>> class UsedFunctions:
>>>   def randomString(self, length):
>>>     letters = string.ascii_letters
>>>     result_str = ''.join(random.choice(letters) for i in range(length))
>>>     return result_str
>>>   def clustered(self, x, numRows):
>>>     return math.floor(x - 1) / numRows
>>>   def scattered(self, x, numRows):
>>>     return abs((x - 1) % numRows) * 1.0
>>>   def randomised(self, seed, numRows):
>>>     random.seed(seed)
>>>     return abs(random.randint(0, numRows) % numRows) * 1.0
>>>   def padString(self, x, chars, length):
>>>     n = int(math.log10(x) + 1)
>>>     result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
>>>     return result_str
>>>   def padSingleChar(self, chars, length):
>>>     result_str = ''.join(chars for i in 

Unsubscribe

2022-03-16 Thread van wilson





Re: Re: Re: calculate correlation_between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame

2022-03-16 Thread ckgppl_yan
Thanks, Jayesh and all. I finally got the correlation data frame using agg with a
list of functions. I think the list of functions that generate a column should be
described in more detail.
Liang

----- Original Message -----
From: "Lalwani, Jayesh" 
To: "ckgppl_...@sina.cn" , Enrico Minack , Sean Owen 
Cc: user 
Subject: Re: Re: Re: calculate correlation_between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame
Date: 2022-03-16 20:49

No, You don’t need 30 dataframes and self joins. Convert a list of columns to a
list of functions, and then pass the list of functions to the agg function.

From: "ckgppl_...@sina.cn" 
Reply-To: "ckgppl_...@sina.cn" 
Date: Wednesday, March 16, 2022 at 8:16 AM
To: Enrico Minack , Sean Owen 
Cc: user 
Subject: [EXTERNAL] Re: Re: calculate correlation between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame

Thanks, Enrico.

I just found that I need to group the data frame and then calculate the
correlation, so I will get a list of dataframes, not columns.

So I used the following solution:

1. Use the following code to create a mutable data frame df_all, using the
   first datacol to calculate correlation:
   df.groupby("groupid").agg(functions.corr("datacol1","corr_col")
2. Iterate over all remaining datacol columns, creating a temp data frame for
   each iteration. In each iteration, join df_all with the temp data frame on
   the groupid column, then drop the duplicated groupid column.
3. After the iteration, I get the dataframe which contains all the correlation
   data.

I need to verify the data to make sure it is valid.

Liang

----- Original Message -----
From: Enrico Minack 
To: ckgppl_...@sina.cn, Sean Owen 
Cc: user 
Subject: Re: Re: calculate correlation between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame
Date: 2022-03-16 19:53

If you have a list of Columns called `columns`, you can pass them to the `agg`
method as:

  agg(columns.head, columns.tail: _*)

Enrico


On 16.03.22 at 08:02, ckgppl_...@sina.cn wrote:

Thanks, Sean. I modified the codes and have generated a list of columns.

I am working on converting a list of columns to a new data frame. It seems that
there is no direct API to do this.

----- Original Message -----
From: Sean Owen 
To: ckgppl_...@sina.cn
Cc: user 
Subject: Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame
Date: 2022-03-16 11:55

Are you just trying to avoid writing the function call 30 times? Just put this
in a loop over all the columns instead, which adds a new corr col every time to
a list.

On Tue, Mar 15, 2022, 10:30 PM  wrote:

Hi all,

I am stuck at a correlation calculation problem. I have a dataframe like below:

groupid  datacol1  datacol2  datacol3  datacol*  corr_co
1        1         2         3         4         5
1        2         3         4         6         5
2        4         2         1         7         5
2        8         9         3         2         5
3        7         1         2         3         5
3        3         5         3         1         5

I want to calculate the correlation between all datacol columns and corr_col
column by each groupid.

So I used the following spark scala-api codes:

df.groupby("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))

This is very inefficient. If I have 30 data_col columns, I need to input 30
times functions.corr to calculate correlation.

I have searched, it seems that functions.corr doesn't accept a List/Array
parameter, and df.agg doesn't accept a function to be parameter.

So any spark scala API codes can do this job efficiently?

Thanks

Liang




Skip single integration test case in Spark on K8s

2022-03-16 Thread Pralabh Kumar
Hi Spark team

I am running Spark kubernetes integration test suite on cloud.

build/mvn install \

-f  pom.xml \

-pl resource-managers/kubernetes/integration-tests -am -Pscala-2.12
-Phadoop-3.1.1 -Phive -Phive-thriftserver -Pyarn -Pkubernetes
-Pkubernetes-integration-tests \

-Djava.version=8 \

-Dspark.kubernetes.test.sparkTgz= \

-Dspark.kubernetes.test.imageTag=<> \

-Dspark.kubernetes.test.imageRepo=< repo> \

-Dspark.kubernetes.test.deployMode=cloud \

-Dtest.include.tags=k8s \

-Dspark.kubernetes.test.javaImageTag= \

-Dspark.kubernetes.test.namespace= \

-Dspark.kubernetes.test.serviceAccountName=spark \

-Dspark.kubernetes.test.kubeConfigContext=<> \

-Dspark.kubernetes.test.master=<> \

-Dspark.kubernetes.test.jvmImage=<> \

-Dspark.kubernetes.test.pythonImage=<> \

-Dlog4j.logger.org.apache.spark=DEBUG



I am able to run some test cases successfully, but some are failing. For
example, "Run SparkRemoteFileTest using a Remote data file" in KubernetesSuite
is failing.


Is there a way to skip running some of the test cases?



Please help me on the same.


Regards

Pralabh Kumar


Re: Re: Re: calculate correlation between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame

2022-03-16 Thread Lalwani, Jayesh
No, You don’t need 30 dataframes and self joins. Convert a list of columns to a 
list of functions, and then pass the list of functions to the agg function
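
A minimal spark-shell sketch of this suggestion, using the small dataframe from the thread (the "datacol*" column is renamed to datacol4 here for illustration):

import org.apache.spark.sql.functions.corr
import spark.implicits._

val df = Seq(
  (1, 1, 2, 3, 4, 5), (1, 2, 3, 4, 6, 5),
  (2, 4, 2, 1, 7, 5), (2, 8, 9, 3, 2, 5),
  (3, 7, 1, 2, 3, 5), (3, 3, 5, 3, 1, 5)
).toDF("groupid", "datacol1", "datacol2", "datacol3", "datacol4", "corr_col")

// Build one corr(...) column per data column, then pass them all to agg at once.
val dataCols = Seq("datacol1", "datacol2", "datacol3", "datacol4")
val corrCols = dataCols.map(c => corr(c, "corr_col").alias(s"corr_$c"))

// agg takes (Column, Column*), so the list is split into head and tail.
df.groupBy("groupid").agg(corrCols.head, corrCols.tail: _*).show()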


From: "ckgppl_...@sina.cn" 
Reply-To: "ckgppl_...@sina.cn" 
Date: Wednesday, March 16, 2022 at 8:16 AM
To: Enrico Minack , Sean Owen 
Cc: user 
Subject: [EXTERNAL] Re: Re: calculate correlation 
between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame




Thanks, Enrico.
I just found that I need to group the data frame and then calculate the
correlation, so I will get a list of dataframes, not columns.
So I used the following solution:
1. Use the following code to create a mutable data frame df_all, using the
   first datacol to calculate correlation:
   df.groupby("groupid").agg(functions.corr("datacol1","corr_col")
2. Iterate over all remaining datacol columns, creating a temp data frame for
   each iteration. In each iteration, join df_all with the temp data frame on
   the groupid column, then drop the duplicated groupid column.
3. After the iteration, I get the dataframe which contains all the correlation
   data.


I need to verify the data to make sure it is valid.


Liang
----- Original Message -----
From: Enrico Minack 
To: ckgppl_...@sina.cn, Sean Owen 
Cc: user 
Subject: Re: Re: calculate correlation 
between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame
Date: 2022-03-16 19:53

If you have a list of Columns called `columns`, you can pass them to the `agg` 
method as:

  agg(columns.head, columns.tail: _*)

Enrico


On 16.03.22 at 08:02, ckgppl_...@sina.cn wrote:
Thanks, Sean. I modified the codes and have generated a list of columns.
I am working on converting a list of columns to a new data frame. It seems that 
there is no direct API to do this.

----- Original Message -----
From: Sean Owen 
To: ckgppl_...@sina.cn
Cc: user 
Subject: Re: calculate correlation between multiple columns and one specific column 
after groupby the spark data frame
Date: 2022-03-16 11:55

Are you just trying to avoid writing the function call 30 times? Just put this 
in a loop over all the columns instead, which adds a new corr col every time to 
a list.
On Tue, Mar 15, 2022, 10:30 PM ckgppl_...@sina.cn wrote:
Hi all,


I am stuck at  a correlation calculation problem. I have a dataframe like below:
groupid  datacol1  datacol2  datacol3  datacol*  corr_co
1        1         2         3         4         5
1        2         3         4         6         5
2        4         2         1         7         5
2        8         9         3         2         5
3        7         1         2         3         5
3        3         5         3         1         5

I want to calculate the correlation between all datacol columns and corr_col 
column by each groupid.
So I used the following spark scala-api codes:
df.groupby("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))

This is very inefficient. If I have 30 data_col columns, I need to input 30 
times functions.corr to calculate correlation.

I have searched, it seems that functions.corr doesn't accept a List/Array 
parameter, and df.agg doesn't accept a function to be parameter.
So any  spark scala API codes can do this job efficiently?

Thanks

Liang




Play data development with Scala and Spark

2022-03-16 Thread Bitfox
Hello,

I have written a free book which is available online, giving a beginner
introduction to Scala and Spark development.

https://github.com/bitfoxtop/Play-Data-Development-with-Scala-and-Spark/blob/main/PDDWS2-v1.pdf

If you can read Chinese then you are welcome to give any feedback. I will
update the content in my free time.

Thank you.


Re: Re: calculate correlation between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame

2022-03-16 Thread ckgppl_yan
Thanks, Enrico. I just found that I need to group the data frame and then
calculate the correlation, so I will get a list of dataframes, not columns.
So I used the following solution:
1. Use the following code to create a mutable data frame df_all, using the
   first datacol to calculate correlation:
   df.groupby("groupid").agg(functions.corr("datacol1","corr_col")
2. Iterate over all remaining datacol columns, creating a temp data frame for
   each iteration. In each iteration, join df_all with the temp data frame on
   the groupid column, then drop the duplicated groupid column.
3. After the iteration, I get the dataframe which contains all the correlation
   data.
I need to verify the data to make sure it is valid.
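
A spark-shell sketch of the iterate-and-join approach described above (it assumes a DataFrame `df` with the groupid, datacolN and corr_col columns from the thread; the single agg call suggested earlier in the thread avoids these joins entirely):

import org.apache.spark.sql.functions.corr

val dataCols = Seq("datacol1", "datacol2", "datacol3")  // extend to all data columns
var dfAll = df.groupBy("groupid")
  .agg(corr(dataCols.head, "corr_col").alias(s"corr_${dataCols.head}"))

for (c <- dataCols.tail) {
  val tmp = df.groupBy("groupid").agg(corr(c, "corr_col").alias(s"corr_$c"))
  // Joining on a Seq of column names keeps a single groupid column in the result.
  dfAll = dfAll.join(tmp, Seq("groupid"))
}
dfAll.show()
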
Liang

----- Original Message -----
From: Enrico Minack 
To: ckgppl_...@sina.cn, Sean Owen 
Cc: user 
Subject: Re: Re: calculate correlation between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame
Date: 2022-03-16 19:53

If you have a list of Columns called `columns`, you can pass them to the `agg`
method as:

  agg(columns.head, columns.tail: _*)

Enrico


On 16.03.22 at 08:02, ckgppl_...@sina.cn wrote:

Thanks, Sean. I modified the codes and have generated a list of columns.
I am working on converting a list of columns to a new data frame. It seems that
there is no direct API to do this.

----- Original Message -----
From: Sean Owen 
To: ckgppl_...@sina.cn
Cc: user 
Subject: Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame
Date: 2022-03-16 11:55

Are you just trying to avoid writing the function call 30 times? Just put this
in a loop over all the columns instead, which adds a new corr col every time to
a list.

On Tue, Mar 15, 2022, 10:30 PM  wrote:

Hi all,

I am stuck at a correlation calculation problem. I have a dataframe like below:

groupid  datacol1  datacol2  datacol3  datacol*  corr_co
1        1         2         3         4         5
1        2         3         4         6         5
2        4         2         1         7         5
2        8         9         3         2         5
3        7         1         2         3         5
3        3         5         3         1         5

I want to calculate the correlation between all datacol columns and corr_col
column by each groupid.

So I used the following spark scala-api codes:

df.groupby("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))

This is very inefficient. If I have 30 data_col columns, I need to input 30
times functions.corr to calculate correlation.

I have searched, it seems that functions.corr doesn't accept a List/Array
parameter, and df.agg doesn't accept a function to be parameter.

So any spark scala API codes can do this job efficiently?

Re: Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame

2022-03-16 Thread Enrico Minack
If you have a list of Columns called `columns`, you can pass them to the 
`agg` method as:


  agg(columns.head, columns.tail: _*)

Enrico


On 16.03.22 at 08:02, ckgppl_...@sina.cn wrote:

Thanks, Sean. I modified the codes and have generated a list of columns.
I am working on converting a list of columns to a new data frame. It 
seems that there is no direct API to do this.


----- Original Message -----
From: Sean Owen 
To: ckgppl_...@sina.cn
Cc: user 
Subject: Re: calculate correlation between multiple columns and one specific 
column after groupby the spark data frame
Date: 2022-03-16 11:55

Are you just trying to avoid writing the function call 30 times? Just 
put this in a loop over all the columns instead, which adds a new corr 
col every time to a list.


On Tue, Mar 15, 2022, 10:30 PM  wrote:

Hi all,

I am stuck at  a correlation calculation problem. I have a
dataframe like below:

groupid  datacol1  datacol2  datacol3  datacol*  corr_co
1        1         2         3         4         5
1        2         3         4         6         5
2        4         2         1         7         5
2        8         9         3         2         5
3        7         1         2         3         5
3        3         5         3         1         5

I want to calculate the correlation between all datacol columns
and corr_col column by each groupid.
So I used the following spark scala-api codes:

df.groupby("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))

This is very inefficient. If I have 30 data_col columns, I need to
input 30 times functions.corr to calculate correlation.

I have searched, it seems that functions.corr doesn't accept a
List/Array parameter, and df.agg doesn't accept a function to be
parameter.

So any  spark scala API codes can do this job efficiently?

Thanks

Liang



Re: Question on List to DF

2022-03-16 Thread Gourav Sengupta
Hi Jayesh,

thanks found your email quite interesting :)


Regards,
Gourav

On Wed, Mar 16, 2022 at 8:02 AM Bitfox  wrote:

> Thank you. that makes sense.
>
> On Wed, Mar 16, 2022 at 2:03 PM Lalwani, Jayesh 
> wrote:
>
>> The toDF function in scala uses a bit of Scala magic that allows you to
>> add methods to existing classes. Here’s a link to explanation
>> https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch01s11.html
>>
>>
>>
>> In short, you can implement a class that extends the List class and add
>> methods to your  list class, and you can implement an implicit converter
>> that converts from List to your class. When the Scala compiler sees that
>> you are calling a function on a List object that doesn’t exist in the List
>> class, it will look for implicit converters that convert List object to
>> another object that has the function, and will automatically call it.
>>
>> So, if you have a class
>>
>> Class MyList extends List {
>> def toDF(colName: String): DataFrame{
>> …..
>> }
>> }
>>
>> and an implicit converter
>> implicit def convertListToMyList(list: List): MyList {
>>
>> ….
>> }
>>
>> when you do
>> List("apple","orange","cherry").toDF("fruit")
>>
>>
>>
>> Internally, Scala will generate the code as
>> convertListToMyList(List("apple","orange","cherry")).toDF("fruit")
>>
>>
>>
>>
>>
>> *From: *Bitfox 
>> *Date: *Wednesday, March 16, 2022 at 12:06 AM
>> *To: *"user @spark" 
>> *Subject: *[EXTERNAL] Question on List to DF
>>
>>
>>
>>
>>
>>
>> I am wondering why the list in scala spark can be converted into a
>> dataframe directly?
>>
>>
>>
>> scala> val df = List("apple","orange","cherry").toDF("fruit")
>>
>> *df*: *org.apache.spark.sql.DataFrame* = [fruit: string]
>>
>>
>>
>> scala> df.show
>>
>> +------+
>> | fruit|
>> +------+
>> | apple|
>> |orange|
>> |cherry|
>> +------+
>>
>>
>>
>> I don't think pyspark can convert that as well.
>>
>>
>>
>> Thank you.
>>
>


Re: spark 3.2.1: Unexpected reuse of dynamic PVC

2022-03-16 Thread Andreas Weise
minor correction:
>> (hence our *ReadWriteOnce* Storage should be sufficient right?...

On Wed, Mar 16, 2022 at 11:33 AM Andreas Weise 
wrote:

> Hi,
>
> when using dynamic allocation on k8s with dynamic pvc reuse, I see that
> only a few executors are running. 2 of 4 are stuck in 'ContainerCreating'
> with events like:
> spark-medium-1x-38b7c47f92340e9e-exec-3 : Multi-Attach error for volume
> "pvc-c184e264-4a6d-406f-8d95-c59ff9e074d8" Volume is already used by pod(s)
> spark-medium-1x-38b7c47f92340e9e-exec-2
>
> According to the documentation, only PVCs of deleted executors should be
> reused (hence our ReadOnlyMany Storage should be sufficient right?). But
> the executor of the reused pvc is still running. Is this expected ?
>
> Config:
>
> spark.dynamicAllocation.enabled=true
> spark.dynamicAllocation.maxExecutors=4
> spark.dynamicAllocation.minExecutors=1
> spark.dynamicAllocation.executorIdleTimeout=60s
> spark.dynamicAllocation.shuffleTracking.enabled=true
> spark.kubernetes.driver.ownPersistentVolumeClaim=true
> spark.kubernetes.driver.reusePersistentVolumeClaim=true
>
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand
>
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=sc-openshift-default
>
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=1Gi
>
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data/
>
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false
>
> Log excerpt (full log attached):
>
> INFO [2022-03-16 11:09:21,678] ({kubernetes-executor-snapshots-subscribers-0} 
> Logging.scala[logInfo]:57) - Going to request 1 executors from Kubernetes for 
> ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 
> 2147483647.
> INFO [2022-03-16 11:09:21,684] ({kubernetes-executor-snapshots-subscribers-0} 
> Logging.scala[logInfo]:57) - Found 0 reusable PVCs from 0 PVCs
> INFO [2022-03-16 11:09:21,686] ({kubernetes-executor-snapshots-subscribers-0} 
> Logging.scala[logInfo]:57) - Spark configuration files loaded from 
> Some(/opt/conda/lib/python3.9/site-packages/pyspark/conf) : 
> log4j.properties,hive-site.xml
> INFO [2022-03-16 11:09:21,687] ({kubernetes-executor-snapshots-subscribers-0} 
> Logging.scala[logInfo]:57) - Adding decommission script to lifecycle
> INFO [2022-03-16 11:09:21,689] 
> ({FIFOScheduler-interpreter_510428346-Worker-1} Logging.scala[logInfo]:57) - 
> Using initial executors = 1, max of spark.dynamicAllocation.initialExecutors, 
> spark.dynamicAllocation.minExecutors and spark.executor.instances
> WARN [2022-03-16 11:09:21,690] 
> ({FIFOScheduler-interpreter_510428346-Worker-1} Logging.scala[logWarning]:69) 
> - Dynamic allocation without a shuffle service is an experimental feature.
> INFO [2022-03-16 11:09:21,775] ({kubernetes-executor-snapshots-subscribers-0} 
> Logging.scala[logInfo]:57) - Trying to create PersistentVolumeClaim 
> spark-medium-1x-38b7c47f92340e9e-exec-2-pvc-0 with StorageClass 
> sc-openshift-default
> INFO [2022-03-16 11:09:37,220] ({dispatcher-CoarseGrainedScheduler} 
> Logging.scala[logInfo]:57) - Registered executor 
> NettyRpcEndpointRef(spark-client://Executor) (10.128.6.67:57144) with ID 2,  
> ResourceProfileId 0
> INFO [2022-03-16 11:09:37,225] ({spark-listener-group-executorManagement} 
> Logging.scala[logInfo]:57) - New executor 2 has registered (new total is 1)
>
> ...
>
> INFO [2022-03-16 11:09:51,708] ({kubernetes-executor-snapshots-subscribers-1} 
> Logging.scala[logInfo]:57) - Going to request 1 executors from Kubernetes for 
> ResourceProfile Id: 0, target: 2, known: 1, sharedSlotFromPendingPods: 
> 2147483647.
> INFO [2022-03-16 11:09:51,709] ({spark-dynamic-executor-allocation} 
> Logging.scala[logInfo]:57) - Requesting 1 new executor because tasks are 
> backlogged (new desired total will be 2 for resource profile id: 0)
> INFO [2022-03-16 11:09:51,717] ({kubernetes-executor-snapshots-subscribers-1} 
> Logging.scala[logInfo]:57) - Found 1 reusable PVCs from 1 PVCs
> INFO [2022-03-16 11:09:51,719] ({kubernetes-executor-snapshots-subscribers-1} 
> Logging.scala[logInfo]:57) - Spark configuration files loaded from 
> Some(/opt/conda/lib/python3.9/site-packages/pyspark/conf) : 
> log4j.properties,hive-site.xml
> INFO [2022-03-16 11:09:51,721] ({kubernetes-executor-snapshots-subscribers-1} 
> Logging.scala[logInfo]:57) - Adding decommission script to lifecycle
> INFO [2022-03-16 11:09:51,726] ({kubernetes-executor-snapshots-subscribers-1} 
> Logging.scala[logInfo]:57) - Reuse PersistentVolumeClaim 
> spark-medium-1x-38b7c47f92340e9e-exec-2-pvc-0
> INFO [2022-03-16 11:09:52,713] ({spark-dynamic-executor-allocation} 
> Logging.scala[logInfo]:57) - Requesting 2 new executors because tasks are 
> backlogged (new desired total will be 4 for resource profile id: 0)
> INFO [2022-03-16 11:09:52,813] ({kubernetes-executor-snapshots-subscribers-0} 
> 

Re: Question on List to DF

2022-03-16 Thread Bitfox
Thank you. that makes sense.

On Wed, Mar 16, 2022 at 2:03 PM Lalwani, Jayesh  wrote:

> The toDF function in scala uses a bit of Scala magic that allows you to
> add methods to existing classes. Here’s a link to explanation
> https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch01s11.html
>
>
>
> In short, you can implement a class that extends the List class and add
> methods to your  list class, and you can implement an implicit converter
> that converts from List to your class. When the Scala compiler sees that
> you are calling a function on a List object that doesn’t exist in the List
> class, it will look for implicit converters that convert List object to
> another object that has the function, and will automatically call it.
>
> So, if you have a class
>
> Class MyList extends List {
> def toDF(colName: String): DataFrame{
> …..
> }
> }
>
> and an implicit converter
> implicit def convertListToMyList(list: List): MyList {
>
> ….
> }
>
> when you do
> List("apple","orange","cherry").toDF("fruit")
>
>
>
> Internally, Scala will generate the code as
> convertListToMyList(List("apple","orange","cherry")).toDF("fruit")
>
>
>
>
>
> *From: *Bitfox 
> *Date: *Wednesday, March 16, 2022 at 12:06 AM
> *To: *"user @spark" 
> *Subject: *[EXTERNAL] Question on List to DF
>
>
>
>
>
>
> I am wondering why the list in scala spark can be converted into a
> dataframe directly?
>
>
>
> scala> val df = List("apple","orange","cherry").toDF("fruit")
>
> *df*: *org.apache.spark.sql.DataFrame* = [fruit: string]
>
>
>
> scala> df.show
>
> +------+
> | fruit|
> +------+
> | apple|
> |orange|
> |cherry|
> +------+
>
>
>
> I don't think pyspark can convert that as well.
>
>
>
> Thank you.
>


Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame

2022-03-16 Thread ckgppl_yan
Thanks, Sean. I modified the codes and have generated a list of columns.
I am working on converting a list of columns to a new data frame. It seems that
there is no direct API to do this.

----- Original Message -----
From: Sean Owen 
To: ckgppl_...@sina.cn
Cc: user 
Subject: Re: calculate correlation between multiple columns and one specific column 
after groupby the spark data frame
Date: 2022-03-16 11:55

Are you just trying to avoid writing the function call 30 times? Just put this
in a loop over all the columns instead, which adds a new corr col every time to
a list.

On Tue, Mar 15, 2022, 10:30 PM  wrote:

Hi all,

I am stuck at a correlation calculation problem. I have a dataframe like below:

groupid  datacol1  datacol2  datacol3  datacol*  corr_co
1        1         2         3         4         5
1        2         3         4         6         5
2        4         2         1         7         5
2        8         9         3         2         5
3        7         1         2         3         5
3        3         5         3         1         5

I want to calculate the correlation between all datacol columns and corr_col
column by each groupid.

So I used the following spark scala-api codes:

df.groupby("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))

This is very inefficient. If I have 30 data_col columns, I need to input 30
times functions.corr to calculate correlation.

I have searched, it seems that functions.corr doesn't accept a List/Array
parameter, and df.agg doesn't accept a function to be parameter.

So any spark scala API codes can do this job
Thanks
Liang

Re: Question on List to DF

2022-03-16 Thread Lalwani, Jayesh
The toDF function in scala uses a bit of Scala magic that allows you to add 
methods to existing classes. Here’s a link to explanation 
https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch01s11.html

In short, you can implement a class that extends the List class and add methods 
to your  list class, and you can implement an implicit converter that converts 
from List to your class. When the Scala compiler sees that you are calling a 
function on a List object that doesn’t exist in the List class, it will look 
for implicit converters that convert List object to another object that has the 
function, and will automatically call it.

So, if you have a class
Class MyList extends List {
def toDF(colName: String): DataFrame{
…..
}
}

and an implicit converter
implicit def convertListToMyList(list: List): MyList {
….
}

when you do
List("apple","orange","cherry").toDF("fruit")

Internally, Scala will generate the code as
convertListToMyList(List("apple","orange","cherry")).toDF("fruit")
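
A minimal, Spark-free sketch of that mechanism (the names Wrapper, describe and listToWrapper are invented for illustration; Spark's own toDF is brought into scope by `import spark.implicits._`):

import scala.language.implicitConversions

case class Wrapper(xs: List[String]) {
  def describe(colName: String): String = s"$colName: ${xs.size} values"
}

// When the compiler sees a method that List does not have, it looks for an implicit
// conversion in scope to a type that does have it, and inserts the call automatically.
implicit def listToWrapper(xs: List[String]): Wrapper = Wrapper(xs)

// Rewritten by the compiler to listToWrapper(List(...)).describe("fruit")
println(List("apple", "orange", "cherry").describe("fruit"))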


From: Bitfox 
Date: Wednesday, March 16, 2022 at 12:06 AM
To: "user @spark" 
Subject: [EXTERNAL] Question on List to DF




I am wondering why the list in scala spark can be converted into a dataframe 
directly?


scala> val df = List("apple","orange","cherry").toDF("fruit")

df: org.apache.spark.sql.DataFrame = [fruit: string]



scala> df.show

+------+
| fruit|
+------+
| apple|
|orange|
|cherry|
+------+



I don't think pyspark can convert that as well.



Thank you.