Question on List to DF

2022-03-15 Thread Bitfox
I am wondering why a List in Scala Spark can be converted into a
DataFrame directly?

scala> val df = List("apple","orange","cherry").toDF("fruit")

df: org.apache.spark.sql.DataFrame = [fruit: string]


scala> df.show

+------+
| fruit|
+------+
| apple|
|orange|
|cherry|
+------+
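
For reference, outside spark-shell the same snippet needs the implicit
conversions imported from an active SparkSession first (spark-shell imports
them automatically). A minimal sketch, with the object and app names just
placeholders:

import org.apache.spark.sql.SparkSession

object ListToDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("list-to-df").getOrCreate()
    // toDF on a local Scala collection comes from spark.implicits
    // (a Seq-to-DatasetHolder conversion), not from List itself.
    import spark.implicits._

    val df = List("apple", "orange", "cherry").toDF("fruit")
    df.show()

    spark.stop()
  }
}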


I don't think PySpark can do the same conversion.


Thank you.


Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame

2022-03-15 Thread Sean Owen
Are you just trying to avoid writing the function call 30 times? Just put
this in a loop over all the columns instead, adding a new corr column
expression to a list each time, and pass the list to agg; see the sketch below.
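
A minimal sketch of that approach (it assumes the data columns are the ones
whose names start with "datacol"; adjust the filter to your real column names):

import org.apache.spark.sql.functions

// Build one corr(...) expression per data column in a loop,
// then pass them all to agg in a single pass over the data.
val dataCols  = df.columns.filter(_.startsWith("datacol"))
val corrExprs = dataCols.map(c => functions.corr(c, "corr_col").alias(s"corr_$c"))

// agg takes a first Column plus varargs, hence the head/tail split.
val result = df.groupBy("groupid").agg(corrExprs.head, corrExprs.tail: _*)
result.show()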

On Tue, Mar 15, 2022, 10:30 PM  wrote:

> Hi all,
>
> I am stuck at  a correlation calculation problem. I have a dataframe like
> below:
> groupid  datacol1  datacol2  datacol3  datacol*  corr_col
> 1        1         2         3         4         5
> 1        2         3         4         6         5
> 2        4         2         1         7         5
> 2        8         9         3         2         5
> 3        7         1         2         3         5
> 3        3         5         3         1         5
> I want to calculate the correlation between all of the datacol columns and
> the corr_col column within each groupid.
> So I used the following Spark Scala API code:
>
> df.groupBy("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))
>
> This is very inefficient. If I have 30 datacol columns, I need to write
> functions.corr 30 times to compute the correlations.
>
> From what I have found, functions.corr doesn't accept a List/Array
> parameter, and df.agg doesn't accept a function as a parameter.
> So is there any Spark Scala API that can do this job efficiently?
>
> Thanks
>
> Liang
>


calculate correlation between multiple columns and one specific column after groupby the spark data frame

2022-03-15 Thread ckgppl_yan
Hi all,
I am stuck at a correlation calculation problem. I have a dataframe like below:

groupid  datacol1  datacol2  datacol3  datacol*  corr_col
1        1         2         3         4         5
1        2         3         4         6         5
2        4         2         1         7         5
2        8         9         3         2         5
3        7         1         2         3         5
3        3         5         3         1         5

I want to calculate the correlation between all of the datacol columns and the
corr_col column within each groupid. So I used the following Spark Scala API code:

df.groupBy("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))

This is very inefficient. If I have 30 datacol columns, I need to write
functions.corr 30 times to compute the correlations. From what I have found,
functions.corr doesn't accept a List/Array parameter, and df.agg doesn't accept
a function as a parameter. So is there any Spark Scala API that can do this job
efficiently?
Thanks
Liang

Re: pivoting panda dataframe

2022-03-15 Thread Mich Talebzadeh
Thanks, but I don't want to use Spark; otherwise I could do this:

p_dfm = df.toPandas()  # converting spark DF to Pandas DF


Can I do it without using Spark?


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 15 Mar 2022 at 22:08, Bjørn Jørgensen 
wrote:

> You have a pyspark dataframe and you want to convert it to pandas?
>
> Convert it first to pandas api on spark
>
>
> pf01 = f01.to_pandas_on_spark()
>
>
> Then convert it to pandas
>
>
> pf01 = f01.to_pandas()
>
> Or?
>
> On Tue, 15 Mar 2022 at 22:56, Mich Talebzadeh wrote:
>
>> Thanks everyone.
>>
>> I want to do the following in pandas and numpy without using spark.
>>
>> This is what I do in spark to generate some random data using class
>> UsedFunctions (not important).
>>
>> class UsedFunctions:
>>   def randomString(self,length):
>> letters = string.ascii_letters
>> result_str = ''.join(random.choice(letters) for i in range(length))
>> return result_str
>>   def clustered(self,x,numRows):
>> return math.floor(x -1)/numRows
>>   def scattered(self,x,numRows):
>> return abs((x -1 % numRows))* 1.0
>>   def randomised(self,seed,numRows):
>> random.seed(seed)
>> return abs(random.randint(0, numRows) % numRows) * 1.0
>>   def padString(self,x,chars,length):
>> n = int(math.log10(x) + 1)
>> result_str = ''.join(random.choice(chars) for i in range(length-n)) +
>> str(x)
>> return result_str
>>   def padSingleChar(self,chars,length):
>> result_str = ''.join(chars for i in range(length))
>> return result_str
>>   def println(self,lst):
>> for ll in lst:
>>   print(ll[0])
>>
>>
>> usedFunctions = UsedFunctions()
>>
>> start = 1
>> end = start + 9
>> print ("starting at ID = ",start, ",ending on = ",end)
>> Range = range(start, end)
>> rdd = sc.parallelize(Range). \
>>  map(lambda x: (x, usedFunctions.clustered(x,numRows), \
>>usedFunctions.scattered(x,numRows), \
>>usedFunctions.randomised(x,numRows), \
>>usedFunctions.randomString(50), \
>>usedFunctions.padString(x," ",50), \
>>usedFunctions.padSingleChar("x",4000)))
>> df = rdd.toDF()
>>
>> OK how can I create a panda DataFrame df without using Spark?
>>
>> Thanks
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 15 Mar 2022 at 21:19, Bjørn Jørgensen 
>> wrote:
>>
>>> Hi Andrew. Mitch asked, and I answered transpose()
>>> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
>>> .
>>>
>>> And now you are asking in the same thread about pandas API on spark and
>>> the transform().
>>>
>>> Apache Spark has a pandas API on Spark.
>>>
>>> This means that Spark has an API for pandas functions, so when
>>> you use the pandas API on Spark, it is still Spark you are using.
>>>
>>> Add this line to your imports:
>>>
>>> from pyspark import pandas as ps
>>>
>>>
>>> Now you can pass your dataframe back and forth to the pandas API on Spark
>>> by using
>>>
>>> pf01 = f01.to_pandas_on_spark()
>>>
>>>
>>> f01 = pf01.to_spark()
>>>
>>>
>>> Note that I have changed pd to ps here.
>>>
>>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})
>>>
>>> df.transform(lambda x: x + 1)
>>>
>>> You will now see that all numbers are +1
>>>
>>> You can find more information about pandas API on spark transform
>>> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
>>> or in your notebook
>>> df.transform?
>>>
>>> Signature:
>>> df.transform(
>>> func: Callable[..., ForwardRef('Series')],
>>> axis: Union[int, str] = 0,
>>> *args: Any,
>>> **kwargs: Any,) -> 'DataFrame'Docstring:
>>> Call ``func`` on self producing a Series with transformed values
>>> and that has the same length as its input.
>>>
>>> See also `Transform and apply a function
>>> `_.

Re: pivoting panda dataframe

2022-03-15 Thread Bjørn Jørgensen
Column bind in R is concatenate in pandas:
https://www.datasciencemadesimple.com/append-concatenate-columns-python-pandas-column-bind/


Please start a new thread for each question.

On Tue, 15 Mar 2022 at 22:59, Andrew Davidson wrote:

> Many many thanks!
>
>
>
> I have been looking for a pyspark data frame column_bind() solution for
> several months. Hopefully pyspark.pandas works. The only other solution I
> was aware of was to use spark.dataframe.join(), which does not scale for
> obvious reasons.
>
>
>
> Andy
>
>
>
>
>
> *From: *Bjørn Jørgensen 
> *Date: *Tuesday, March 15, 2022 at 2:19 PM
> *To: *Andrew Davidson 
> *Cc: *Mich Talebzadeh , "user @spark" <
> user@spark.apache.org>
> *Subject: *Re: pivoting panda dataframe
>
>
>
> Hi Andrew. Mitch asked, and I answered transpose()
> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
> .
>
>
>
> And now you are asking in the same thread about pandas API on spark and
> the transform().
>
>
>
> Apache Spark has a pandas API on Spark.
>
>
>
> This means that Spark has an API for pandas functions, so when you use
> the pandas API on Spark, it is still Spark you are using.
>
>
>
> Add this line to your imports:
>
>
>
> from pyspark import pandas as ps
>
>
>
>
> Now you can pass your dataframe back and forth to the pandas API on Spark
> by using
>
>
>
> pf01 = f01.to_pandas_on_spark()
>
>
> f01 = pf01.to_spark()
>
>
>
>
>
> Note that I have changed pd to ps here.
>
>
>
> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})
>
>
>
> df.transform(lambda x: x + 1)
>
>
>
> You will now see that all numbers are +1
>
>
>
> You can find more information about pandas API on spark transform
> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
>
> or in your notebook
>
> df.transform?
>
>
>
> Signature:
>
> df.transform(
>
> func: Callable[..., ForwardRef('Series')],
>
> axis: Union[int, str] = 0,
>
> *args: Any,
>
> **kwargs: Any,
>
> ) -> 'DataFrame'
>
> Docstring:
>
> Call ``func`` on self producing a Series with transformed values
>
> and that has the same length as its input.
>
>
>
> See also `Transform and apply a function
>
> `_.
>
>
>
> .. note:: this API executes the function once to infer the type which is
>
>  potentially expensive, for instance, when the dataset is created after
>
>  aggregations or sorting.
>
>
>
>  To avoid this, specify return type in ``func``, for instance, as below:
>
>
>
>  >>> def square(x) -> ps.Series[np.int32]:
>
>  ... return x ** 2
>
>
>
>  pandas-on-Spark uses return type hint and does not try to infer the type.
>
>
>
> .. note:: the series within ``func`` is actually multiple pandas series as the
>
> segments of the whole pandas-on-Spark series; therefore, the length of 
> each series
>
> is not guaranteed. As an example, an aggregation against each series
>
> does work as a global aggregation but an aggregation of each segment. See
>
> below:
>
>
>
> >>> def func(x) -> ps.Series[np.int32]:
>
> ... return x + sum(x)
>
>
>
> Parameters
>
> --
>
> func : function
>
> Function to use for transforming the data. It must work when pandas Series
>
> is passed.
>
> axis : int, default 0 or 'index'
>
> Can only be set to 0 at the moment.
>
> *args
>
> Positional arguments to pass to func.
>
> **kwargs
>
> Keyword arguments to pass to func.
>
>
>
> Returns
>
> ---
>
> DataFrame
>
> A DataFrame that must have the same length as self.
>
>
>
> Raises
>
> --
>
> Exception : If the returned DataFrame has a different length than self.
>
>
>
> See Also
>
> 
>
> DataFrame.aggregate : Only perform aggregating type operations.
>
> DataFrame.apply : Invoke function on DataFrame.
>
> Series.transform : The equivalent function for Series.
>
>
>
> Examples
>
> 
>
> >>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)}, columns=['A', 'B'])
>
> >>> df
>
>A  B
>
> 0  0  1
>
> 1  1  2
>
> 2  2  3
>
>
>
> >>> def square(x) -> ps.Series[np.int32]:
>
> ... return x ** 2
>
> >>> df.transform(square)
>
>A  B
>
> 0  0  1
>
> 1  1  4
>
> 2  4  9
>
>
>
> You can omit the type hint and let pandas-on-Spark infer its type.
>
>
>
> >>> df.transform(lambda x: x ** 2)
>
>A  B
>
> 0  0  1
>
> 1  1  4
>
> 2  4  9
>
>
>
> For multi-index columns:
>
>
>
> >>> df.columns = [('X', 'A'), ('X', 'B')]
>
> >>> df.transform(square)  # doctest: +NORMALIZE_WHITESPACE
>
>X
>
>A  B
>
> 0  0  1
>
> 1  1  4
>
> 2  4  9
>
>
>
> >>> (df * -1).transform(abs)  # doctest: +NORMALIZE_WHITESPACE
>
>X
>
>A  B
>
> 0  0  1
>
> 1  1  2
>
> 2  2  3
>
>
>
> You can also specify extra arguments.
>
>
>
> >>> def calculation(x, y, z) -> ps.Series[int]:
>
> ... 

Re: pivoting panda dataframe

2022-03-15 Thread Bjørn Jørgensen
You have a pyspark dataframe and you want to convert it to pandas?

Convert it first to pandas api on spark


pf01 = f01.to_pandas_on_spark()


Then convert it to pandas


pf01 = f01.to_pandas()

Or?

On Tue, 15 Mar 2022 at 22:56, Mich Talebzadeh wrote:

> Thanks everyone.
>
> I want to do the following in pandas and numpy without using spark.
>
> This is what I do in spark to generate some random data using class
> UsedFunctions (not important).
>
> class UsedFunctions:
>   def randomString(self,length):
> letters = string.ascii_letters
> result_str = ''.join(random.choice(letters) for i in range(length))
> return result_str
>   def clustered(self,x,numRows):
> return math.floor(x -1)/numRows
>   def scattered(self,x,numRows):
> return abs((x -1 % numRows))* 1.0
>   def randomised(self,seed,numRows):
> random.seed(seed)
> return abs(random.randint(0, numRows) % numRows) * 1.0
>   def padString(self,x,chars,length):
> n = int(math.log10(x) + 1)
> result_str = ''.join(random.choice(chars) for i in range(length-n)) +
> str(x)
> return result_str
>   def padSingleChar(self,chars,length):
> result_str = ''.join(chars for i in range(length))
> return result_str
>   def println(self,lst):
> for ll in lst:
>   print(ll[0])
>
>
> usedFunctions = UsedFunctions()
>
> start = 1
> end = start + 9
> print ("starting at ID = ",start, ",ending on = ",end)
> Range = range(start, end)
> rdd = sc.parallelize(Range). \
>  map(lambda x: (x, usedFunctions.clustered(x,numRows), \
>usedFunctions.scattered(x,numRows), \
>usedFunctions.randomised(x,numRows), \
>usedFunctions.randomString(50), \
>usedFunctions.padString(x," ",50), \
>usedFunctions.padSingleChar("x",4000)))
> df = rdd.toDF()
>
> OK how can I create a panda DataFrame df without using Spark?
>
> Thanks
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 15 Mar 2022 at 21:19, Bjørn Jørgensen 
> wrote:
>
>> Hi Andrew. Mitch asked, and I answered transpose()
>> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
>> .
>>
>> And now you are asking in the same thread about pandas API on spark and
>> the transform().
>>
>> Apache Spark has a pandas API on Spark.
>>
>> This means that Spark has an API for pandas functions, so when you use
>> the pandas API on Spark, it is still Spark you are using.
>>
>> Add this line to your imports:
>>
>> from pyspark import pandas as ps
>>
>>
>> Now you can pass your dataframe back and forth to the pandas API on Spark
>> by using
>>
>> pf01 = f01.to_pandas_on_spark()
>>
>>
>> f01 = pf01.to_spark()
>>
>>
>> Note that I have changed pd to ps here.
>>
>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})
>>
>> df.transform(lambda x: x + 1)
>>
>> You will now see that all numbers are +1
>>
>> You can find more information about pandas API on spark transform
>> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
>> or in your notebook
>> df.transform?
>>
>> Signature:
>> df.transform(
>> func: Callable[..., ForwardRef('Series')],
>> axis: Union[int, str] = 0,
>> *args: Any,
>> **kwargs: Any,) -> 'DataFrame'Docstring:
>> Call ``func`` on self producing a Series with transformed values
>> and that has the same length as its input.
>>
>> See also `Transform and apply a function
>> `_.
>>
>> .. note:: this API executes the function once to infer the type which is
>>  potentially expensive, for instance, when the dataset is created after
>>  aggregations or sorting.
>>
>>  To avoid this, specify return type in ``func``, for instance, as below:
>>
>>  >>> def square(x) -> ps.Series[np.int32]:
>>  ... return x ** 2
>>
>>  pandas-on-Spark uses return type hint and does not try to infer the 
>> type.
>>
>> .. note:: the series within ``func`` is actually multiple pandas series as 
>> the
>> segments of the whole pandas-on-Spark series; therefore, the length of 
>> each series
>> is not guaranteed. As an example, an aggregation against each series
>> does work as a global aggregation but an aggregation of each segment. See
>> below:
>>
>> >>> def func(x) -> 

Re: pivoting panda dataframe

2022-03-15 Thread Andrew Davidson
Many many thanks!

I have been looking for a pyspark data frame column_bind() solution for
several months. Hopefully pyspark.pandas works. The only other solution I was
aware of was to use spark.dataframe.join(), which does not scale for obvious
reasons.

Andy


From: Bjørn Jørgensen 
Date: Tuesday, March 15, 2022 at 2:19 PM
To: Andrew Davidson 
Cc: Mich Talebzadeh , "user @spark" 

Subject: Re: pivoting panda dataframe

Hi Andrew. Mitch asked, and I answered transpose() 
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
 .

And now you are asking in the same thread about pandas API on spark and the 
transform().

Apache Spark has a pandas API on Spark.

This means that Spark has an API for pandas functions, so when you use
the pandas API on Spark, it is still Spark you are using.

Add this line to your imports:

from pyspark import pandas as ps


Now you can pass your dataframe back and forth to the pandas API on Spark by
using

pf01 = f01.to_pandas_on_spark()


f01 = pf01.to_spark()


Note that I have changed pd to ps here.

df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})

df.transform(lambda x: x + 1)

You will now see that all numbers are +1

You can find more information about pandas API on spark transform 
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
or in your notebook
df.transform?


Signature:

df.transform(

func: Callable[..., ForwardRef('Series')],

axis: Union[int, str] = 0,

*args: Any,

**kwargs: Any,

) -> 'DataFrame'

Docstring:

Call ``func`` on self producing a Series with transformed values

and that has the same length as its input.



See also `Transform and apply a function

`_.



.. note:: this API executes the function once to infer the type which is

 potentially expensive, for instance, when the dataset is created after

 aggregations or sorting.



 To avoid this, specify return type in ``func``, for instance, as below:



 >>> def square(x) -> ps.Series[np.int32]:

 ... return x ** 2



 pandas-on-Spark uses return type hint and does not try to infer the type.



.. note:: the series within ``func`` is actually multiple pandas series as the

segments of the whole pandas-on-Spark series; therefore, the length of each 
series

is not guaranteed. As an example, an aggregation against each series

does work as a global aggregation but an aggregation of each segment. See

below:



>>> def func(x) -> ps.Series[np.int32]:

... return x + sum(x)



Parameters

--

func : function

Function to use for transforming the data. It must work when pandas Series

is passed.

axis : int, default 0 or 'index'

Can only be set to 0 at the moment.

*args

Positional arguments to pass to func.

**kwargs

Keyword arguments to pass to func.



Returns

---

DataFrame

A DataFrame that must have the same length as self.



Raises

--

Exception : If the returned DataFrame has a different length than self.



See Also



DataFrame.aggregate : Only perform aggregating type operations.

DataFrame.apply : Invoke function on DataFrame.

Series.transform : The equivalent function for Series.



Examples



>>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)}, columns=['A', 'B'])

>>> df

   A  B

0  0  1

1  1  2

2  2  3



>>> def square(x) -> ps.Series[np.int32]:

... return x ** 2

>>> df.transform(square)

   A  B

0  0  1

1  1  4

2  4  9



You can omit the type hint and let pandas-on-Spark infer its type.



>>> df.transform(lambda x: x ** 2)

   A  B

0  0  1

1  1  4

2  4  9



For multi-index columns:



>>> df.columns = [('X', 'A'), ('X', 'B')]

>>> df.transform(square)  # doctest: +NORMALIZE_WHITESPACE

   X

   A  B

0  0  1

1  1  4

2  4  9



>>> (df * -1).transform(abs)  # doctest: +NORMALIZE_WHITESPACE

   X

   A  B

0  0  1

1  1  2

2  2  3



You can also specify extra arguments.



>>> def calculation(x, y, z) -> ps.Series[int]:

... return x ** y + z

>>> df.transform(calculation, y=10, z=20)  # doctest: +NORMALIZE_WHITESPACE

  X

  A  B

020 21

121   1044

2  1044  59069

File:  /opt/spark/python/pyspark/pandas/frame.py

Type:  method




On Tue, 15 Mar 2022 at 19:33, Andrew Davidson <aedav...@ucsc.edu> wrote:
Hi Bjorn

I have been looking for spark transform for a while. Can you send me a link to 
the pyspark function?

I assume pandas transform is not really an option. I think it will try to pull 
the entire dataframe into the driver's memory.

Kind regards

Andy

p.s. My real problem is that spark does not allow you to 

Re: pivoting panda dataframe

2022-03-15 Thread Mich Talebzadeh
Thanks everyone.

I want to do the following in pandas and numpy without using spark.

This is what I do in spark to generate some random data using class
UsedFunctions (not important).

import math
import random
import string

# numRows and sc (the SparkContext) are assumed to be defined elsewhere.

class UsedFunctions:
  def randomString(self, length):
    letters = string.ascii_letters
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str
  def clustered(self, x, numRows):
    return math.floor(x - 1) / numRows
  def scattered(self, x, numRows):
    return abs((x - 1 % numRows)) * 1.0
  def randomised(self, seed, numRows):
    random.seed(seed)
    return abs(random.randint(0, numRows) % numRows) * 1.0
  def padString(self, x, chars, length):
    n = int(math.log10(x) + 1)
    result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
    return result_str
  def padSingleChar(self, chars, length):
    result_str = ''.join(chars for i in range(length))
    return result_str
  def println(self, lst):
    for ll in lst:
      print(ll[0])


usedFunctions = UsedFunctions()

start = 1
end = start + 9
print("starting at ID = ", start, ",ending on = ", end)
Range = range(start, end)
rdd = sc.parallelize(Range). \
     map(lambda x: (x, usedFunctions.clustered(x, numRows), \
                    usedFunctions.scattered(x, numRows), \
                    usedFunctions.randomised(x, numRows), \
                    usedFunctions.randomString(50), \
                    usedFunctions.padString(x, " ", 50), \
                    usedFunctions.padSingleChar("x", 4000)))
df = rdd.toDF()

OK how can I create a panda DataFrame df without using Spark?

Thanks


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 15 Mar 2022 at 21:19, Bjørn Jørgensen 
wrote:

> Hi Andrew. Mitch asked, and I answered transpose()
> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
> .
>
> And now you are asking in the same thread about pandas API on spark and
> the transform().
>
> Apache Spark has a pandas API on Spark.
>
> This means that Spark has an API for pandas functions, so when you use
> the pandas API on Spark, it is still Spark you are using.
>
> Add this line to your imports:
>
> from pyspark import pandas as ps
>
>
> Now you can pass your dataframe back and forth to the pandas API on Spark
> by using
>
> pf01 = f01.to_pandas_on_spark()
>
>
> f01 = pf01.to_spark()
>
>
> Note that I have changed pd to ps here.
>
> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})
>
> df.transform(lambda x: x + 1)
>
> You will now see that all numbers are +1
>
> You can find more information about pandas API on spark transform
> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
> or in your notebook
> df.transform?
>
> Signature:
> df.transform(
> func: Callable[..., ForwardRef('Series')],
> axis: Union[int, str] = 0,
> *args: Any,
> **kwargs: Any,) -> 'DataFrame'Docstring:
> Call ``func`` on self producing a Series with transformed values
> and that has the same length as its input.
>
> See also `Transform and apply a function
> `_.
>
> .. note:: this API executes the function once to infer the type which is
>  potentially expensive, for instance, when the dataset is created after
>  aggregations or sorting.
>
>  To avoid this, specify return type in ``func``, for instance, as below:
>
>  >>> def square(x) -> ps.Series[np.int32]:
>  ... return x ** 2
>
>  pandas-on-Spark uses return type hint and does not try to infer the type.
>
> .. note:: the series within ``func`` is actually multiple pandas series as the
> segments of the whole pandas-on-Spark series; therefore, the length of 
> each series
> is not guaranteed. As an example, an aggregation against each series
> does work as a global aggregation but an aggregation of each segment. See
> below:
>
> >>> def func(x) -> ps.Series[np.int32]:
> ... return x + sum(x)
>
> Parameters
> --
> func : function
> Function to use for transforming the data. It must work when pandas Series
> is passed.
> axis : int, default 0 or 'index'
> Can only be set to 0 at the moment.
> *args
> Positional arguments to pass to func.
> **kwargs
> Keyword arguments to pass to func.
>
> Returns
> ---
> DataFrame
> A DataFrame that must have the same 

Re: Continuous ML model training in stream mode

2022-03-15 Thread Artemis User
Thanks Sean!  Well, it looks like we have to abandon our Structured
Streaming model and use DStreams for this. Or do you see a possibility of
using Structured Streaming with ml instead of mllib?


On 3/15/22 4:51 PM, Sean Owen wrote:
There is a streaming k-means example in Spark. 
https://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means


On Tue, Mar 15, 2022, 3:46 PM Artemis User  wrote:

Has anyone done any experiments of training an ML model using stream
data? especially for unsupervised models?   Any
suggestions/references
are highly appreciated...

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: pivoting panda dataframe

2022-03-15 Thread Bjørn Jørgensen
Hi Andrew. Mitch asked, and I answered transpose()
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
.

And now you are asking in the same thread about pandas API on spark and the
transform().

Apache Spark has a pandas API on Spark.

This means that Spark has an API for pandas functions, so when you use
the pandas API on Spark, it is still Spark you are using.

Add this line to your imports:

from pyspark import pandas as ps


Now you can pass your dataframe back and forth to the pandas API on Spark by
using

pf01 = f01.to_pandas_on_spark()


f01 = pf01.to_spark()


Note that I have changed pd to ps here.

df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})

df.transform(lambda x: x + 1)

You will now see that all numbers are +1

You can find more information about pandas API on spark transform
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
or in your notebook
df.transform?

Signature:
df.transform(
func: Callable[..., ForwardRef('Series')],
axis: Union[int, str] = 0,
*args: Any,
**kwargs: Any,) -> 'DataFrame'Docstring:
Call ``func`` on self producing a Series with transformed values
and that has the same length as its input.

See also `Transform and apply a function
`_.

.. note:: this API executes the function once to infer the type which is
 potentially expensive, for instance, when the dataset is created after
 aggregations or sorting.

 To avoid this, specify return type in ``func``, for instance, as below:

 >>> def square(x) -> ps.Series[np.int32]:
 ... return x ** 2

 pandas-on-Spark uses return type hint and does not try to infer the type.

.. note:: the series within ``func`` is actually multiple pandas series as the
segments of the whole pandas-on-Spark series; therefore, the
length of each series
is not guaranteed. As an example, an aggregation against each series
does work as a global aggregation but an aggregation of each segment. See
below:

>>> def func(x) -> ps.Series[np.int32]:
... return x + sum(x)

Parameters
--
func : function
Function to use for transforming the data. It must work when pandas Series
is passed.
axis : int, default 0 or 'index'
Can only be set to 0 at the moment.
*args
Positional arguments to pass to func.
**kwargs
Keyword arguments to pass to func.

Returns
---
DataFrame
A DataFrame that must have the same length as self.

Raises
--
Exception : If the returned DataFrame has a different length than self.

See Also

DataFrame.aggregate : Only perform aggregating type operations.
DataFrame.apply : Invoke function on DataFrame.
Series.transform : The equivalent function for Series.

Examples

>>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)}, columns=['A', 'B'])
>>> df
   A  B
0  0  1
1  1  2
2  2  3

>>> def square(x) -> ps.Series[np.int32]:
... return x ** 2
>>> df.transform(square)
   A  B
0  0  1
1  1  4
2  4  9

You can omit the type hint and let pandas-on-Spark infer its type.

>>> df.transform(lambda x: x ** 2)
   A  B
0  0  1
1  1  4
2  4  9

For multi-index columns:

>>> df.columns = [('X', 'A'), ('X', 'B')]
>>> df.transform(square)  # doctest: +NORMALIZE_WHITESPACE
   X
   A  B
0  0  1
1  1  4
2  4  9

>>> (df * -1).transform(abs)  # doctest: +NORMALIZE_WHITESPACE
   X
   A  B
0  0  1
1  1  2
2  2  3

You can also specify extra arguments.

>>> def calculation(x, y, z) -> ps.Series[int]:
... return x ** y + z
>>> df.transform(calculation, y=10, z=20)  # doctest: +NORMALIZE_WHITESPACE
  X
  A  B
020 21
121   1044
2  1044  59069File:
/opt/spark/python/pyspark/pandas/frame.pyType:  method





On Tue, 15 Mar 2022 at 19:33, Andrew Davidson wrote:

> Hi Bjorn
>
>
>
> I have been looking for spark transform for a while. Can you send me a
> link to the pyspark function?
>
>
>
> I assume pandas transform is not really an option. I think it will try to
> pull the entire dataframe into the driver's memory.
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> p.s. My real problem is that spark does not allow you to bind columns. You
> can use union() to bind rows. I could get the equivalent of cbind() using
> union().transform()
>
>
>
> *From: *Bjørn Jørgensen 
> *Date: *Tuesday, March 15, 2022 at 10:37 AM
> *To: *Mich Talebzadeh 
> *Cc: *"user @spark" 
> *Subject: *Re: pivoting panda dataframe
>
>
>
>
> https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html
> We have that transpose in the pandas API for Spark too.
>
>
>
> You also have stack() and multilevel
> https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html
>
>
>
>
>
>
>
> On Tue, 15 Mar 2022 at 17:50, Mich Talebzadeh wrote:
> 

Re: How Spark establishes connectivity to Hive

2022-03-15 Thread Artemis User
I guess it really depends on your configuration.  The Hive metastore
provides just the metadata/schema for your database, not the actual data
storage.  Hive runs on top of Hadoop. If you configure your Spark to run on
the same Hadoop cluster using YARN, your SQL dataframe in Spark will be
converted automatically to data files in HDFS, in the Hive format described
by the metastore.  So HDFS is the connecting medium between Spark and Hive.


The default Spark distribution package is bundled with a thrift server, so
you can save/retrieve dataframes to/from Hive tables using a standalone
metastore, without the presence of a Hive server.  However, you do have to
run a script (not provided by Spark) to initialize the standalone metastore.
In the standalone Hive mode, Spark reads/writes from/to these hive tables
directly, and they are stored as plain text files on disk (not parquet).


The benefit of using a thrift server is having the option of
saving/retrieving Spark data from third-party applications directly via
JDBC.  Since most people run Spark without Hadoop, and you can run a
standalone Hive metastore to achieve the same outcome, I am not sure what
benefit running Spark on HDFS and Hive would bring, considering the huge
admin overhead associated with both Hadoop and Hive.  Hope this helps...
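
For concreteness, a minimal sketch of that direct write path (the app and
table names are made up; it assumes a reachable metastore, standalone or
otherwise):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-write-sketch")
  .enableHiveSupport()   // talk to the metastore; no HiveServer2 involved
  .getOrCreate()

val df = spark.range(0, 10).toDF("id")

// Spark writes the data files itself and registers the table's metadata
// in the metastore, so other clients can read the table afterwards.
df.write.mode("overwrite").saveAsTable("demo_table")

spark.sql("SELECT count(*) FROM demo_table").show()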



On 3/14/22 1:54 PM, Venkatesan Muniappan wrote:

hi Team,

I wanted to understand how Spark connects to Hive. Does it connect to the
Hive metastore directly, bypassing the Hive server? Let's say we are
inserting data into a Hive table whose I/O format is Parquet. Does Spark
create the Parquet file from the DataFrame/RDD/Dataset, put it in its HDFS
location, and update the metastore about the new Parquet file? Or does it
simply run the insert statement on HiveServer (through JDBC or some other
means)?


We are using Spark 2.4.3 and Hive 2.1.1 in our cluster.

Is there a document that explains this? Please share.

Thanks,
Venkat
2016173438


Re: Continuous ML model training in stream mode

2022-03-15 Thread Sean Owen
There is a streaming k-means example in Spark.
https://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means
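
For reference, a minimal sketch along the lines of that example; ssc (a
StreamingContext), trainingData (a DStream of mllib Vectors), testData (a
DStream of LabeledPoints) and numDimensions are all placeholders assumed to
be defined elsewhere:

import org.apache.spark.mllib.clustering.StreamingKMeans

// The model keeps updating its cluster centers as each new batch arrives.
val model = new StreamingKMeans()
  .setK(3)                              // number of clusters
  .setDecayFactor(1.0)                  // weighting of old batches vs. new ones
  .setRandomCenters(numDimensions, 0.0) // random initial centers with zero weight

model.trainOn(trainingData)             // train continuously on the stream
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()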

On Tue, Mar 15, 2022, 3:46 PM Artemis User  wrote:

> Has anyone done any experiments of training an ML model using stream
> data? especially for unsupervised models?   Any suggestions/references
> are highly appreciated...
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Continuous ML model training in stream mode

2022-03-15 Thread Artemis User
Has anyone done any experiments with training an ML model on streaming
data, especially for unsupervised models?  Any suggestions/references
are highly appreciated...


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: pivoting panda dataframe

2022-03-15 Thread Andrew Davidson
Hi Bjorn

I have been looking for spark transform for a while. Can you send me a link to 
the pyspark function?

I assume pandas transform is not really an option. I think it will try to pull 
the entire dataframe into the driver's memory.

Kind regards

Andy

p.s. My real problem is that spark does not allow you to bind columns. You can 
use union() to bind rows. I could get the equivalent of cbind() using 
union().transform()

From: Bjørn Jørgensen 
Date: Tuesday, March 15, 2022 at 10:37 AM
To: Mich Talebzadeh 
Cc: "user @spark" 
Subject: Re: pivoting panda dataframe

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html
We have that transpose in the pandas API for Spark too.

You also have stack() and multilevel 
https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html



On Tue, 15 Mar 2022 at 17:50, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:


hi,



Is it possible to pivot a panda dataframe by making the row column heading?



thanks




   view my Linkedin profile

 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: pivoting panda dataframe

2022-03-15 Thread Bjørn Jørgensen
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html
We have that transpose in the pandas API for Spark too.

You also have stack() and multilevel
https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html



On Tue, 15 Mar 2022 at 17:50, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

>
> hi,
>
>
> Is it possible to pivot a panda dataframe by making the row column
> heading?
>
>
> thanks
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


pivoting panda dataframe

2022-03-15 Thread Mich Talebzadeh
hi,


Is it possible to pivot a pandas dataframe by making the rows into the column headings?


thanks



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.