Re: Help With unstructured text file with spark scala

2022-02-08 Thread Bitfox
Hello

You can treat it as a csv file and load it with Spark:

>>> df = spark.read.format("csv").option("inferSchema",
"true").option("header", "true").option("sep","#").load(csv_file)

>>> df.show()

+--------------------+-------------------+-----------------+
|               Plano|Código Beneficiário|Nome Beneficiário|
+--------------------+-------------------+-----------------+
|58693 - NACIONAL ...|           65751353|       Jose Silva|
|58693 - NACIONAL ...|           65751388|      Joana Silva|
|58693 - NACIONAL ...|           65751353|     Felipe Silva|
|58693 - NACIONAL ...|           65751388|      Julia Silva|
+--------------------+-------------------+-----------------+



cat csv_file:


Plano#Código Beneficiário#Nome Beneficiário

58693 - NACIONAL R COPART PJCE#065751353#Jose Silva

58693 - NACIONAL R COPART PJCE#065751388#Joana Silva

58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva

58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
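
The original file also carries section headers (Operadora, Filial, Contrato,
Empresa) before each block of rows, which the plain CSV read above does not
handle. A rough PySpark sketch of one way to deal with that (the original
request was for Scala; this is only an illustration, and the filter rule is
an assumption about the layout):

from pyspark.sql import functions as F

raw = spark.read.text("beneficiarios.txt")   # hypothetical path

# keep only beneficiary rows: exactly two '#' separators and not the
# repeated "Plano#..." header line
rows = raw.filter(
    (F.size(F.split("value", "#")) == 3) &
    (~F.col("value").startswith("Plano#"))
)

parts = F.split("value", "#")
df = rows.select(
    parts.getItem(0).alias("plano"),
    parts.getItem(1).alias("codigo_beneficiario"),
    parts.getItem(2).alias("nome_beneficiario"),
)

This keeps only the three row-level fields; carrying the Contrato/Empresa
context onto each row would need an extra pass (for example a window over a
line number), which is left out here.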



Regards



On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa 
wrote:

> Hi
> I have to transform unstructured text to dataframe.
> Could anyone please help with Scala code ?
>
> Dataframe need as:
>
> operadora filial unidade contrato empresa plano codigo_beneficiario
> nome_beneficiario
>
> Relação de Beneficiários Ativos e Excluídos
> Carteira em#27/12/2019##Todos os Beneficiários
> Operadora#AMIL
> Filial#SÃO PAULO#Unidade#Guarulhos
>
> Contrato#123456 - Test
> Empresa#Test
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>
> Contrato#898011000 - FUNDACAO GERDAU
> Empresa#FUNDACAO GERDAU
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: question on the different way of RDD to dataframe

2022-02-08 Thread frakass

I think it's better as:

df1.map { case(w,x,y,z) => columns(w,x,y,z) }

Thanks


On 2022/2/9 12:46, Mich Talebzadeh wrote:
scala> val df2 = df1.map(p => columns(p(0).toString,p(1).toString, 
p(2).toString,p(3).toString.toDouble)) // map those columns


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: flatMap for dataframe

2022-02-08 Thread frakass

Is this the Scala syntax?
Yes, in Scala I know how to do it by converting the df to a Dataset.
How can I do it in PySpark?

Thanks

On 2022/2/9 10:24, oliver dd wrote:

df.flatMap(row => row.getAs[String]("value").split(" "))


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: flatMap for dataframe

2022-02-08 Thread oliver dd
Hi,

You can achieve your goal by:

df.flatMap(row => row.getAs[String]("value").split(" "))

—
Best Regards,
oliverdding
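
For the PySpark side of the question raised earlier in the thread, a minimal
sketch (an illustration, assuming a DataFrame with a string column named
"value"): either drop to the RDD and use flatMap, or stay in the DataFrame
API with split plus explode.

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [("a few lines",), ("hello world here",), ("ba na ba na",)], ["value"]
)

# RDD route, same shape as the Scala flatMap answer above
words_rdd = df.rdd.flatMap(lambda row: row["value"].split(" "))

# DataFrame route: split the column into an array and explode it
words_df = df.select(F.explode(F.split("value", " ")).alias("word"))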

Re: question on the different way of RDD to dataframe

2022-02-08 Thread frakass

I know that using a case class I can control the data types strictly.

scala> val rdd = sc.parallelize(List(("apple",1),("orange",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] 
at parallelize at <console>:23


scala> rdd.toDF.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)


I can change the second column to another type, such as Double, via the case class:

scala> rdd.map{ case (x,y) => Fruit(x,y) }.toDF.printSchema
root
 |-- fruit: string (nullable = true)
 |-- num: double (nullable = false)



Thank you.



On 2022/2/8 10:32, Sean Owen wrote:
It's just a possibly tidier way to represent objects with named, typed 
fields, in order to specify a DataFrame's contents.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



flatMap for dataframe

2022-02-08 Thread frakass

Hello

for the RDD I can apply flatMap method:

>>> sc.parallelize(["a few words","ba na ba na"]).flatMap(lambda x: 
x.split(" ")).collect()

['a', 'few', 'words', 'ba', 'na', 'ba', 'na']


But for a dataframe table how can I flatMap that as above?

>>> df.show()
+----------------+
|           value|
+----------------+
|     a few lines|
|hello world here|
|     ba na ba na|
+----------------+


Thanks

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Does spark support something like the bind function in R?

2022-02-08 Thread ayan guha
Hi

In Python, or in general in Spark, you can just "read" the files and select
the column. I am assuming you are reading each file individually into a
separate dataframe and joining them. Instead, you can read all the files
into a single dataframe and select one column.
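
A minimal sketch of that in PySpark (the path glob and column name are
assumptions, and it presumes every file shares the same header):

from pyspark.sql import functions as F

# read every shard in one pass instead of one dataframe per file
df = (spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv("/data/counts/*.csv"))          # hypothetical location

# keep the one column of interest, plus the source file if that still matters
counts = df.select(
    F.input_file_name().alias("source_file"),
    F.col("est_counts"),                     # hypothetical column name
)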

On Wed, Feb 9, 2022 at 2:55 AM Andrew Davidson 
wrote:

> I need to create a single table by selecting one column from thousands of
> files. The columns are all of the same type, have the same number of rows
> and rows names. I am currently using join. I get OOM on mega-mem cluster
> with 2.8 TB.
>
>
>
> Does spark have something like cbind() “Take a sequence of vector, matrix
> or data-frame arguments and combine by columns or rows,
> respectively. “
>
>
>
> https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/cbind
>
>
>
> Digging through the spark documentation I found a udf example
>
> https://spark.apache.org/docs/latest/sparkr.html#dapply
>
>
>
> ```
>
> # Convert waiting time from hours to seconds.
>
> # Note that we can apply UDF to DataFrame.
>
> schema <- structType(structField("eruptions", "double"), structField(
> "waiting", "double"),
>
>  structField("waiting_secs", "double"))
>
> df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
>
> head(collect(df1))
>
> ##  eruptions waiting waiting_secs
>
> ##1 3.600  79 4740
>
> ##2 1.800  54 3240
>
> ##3 3.333  74 4440
>
> ##4 2.283  62 3720
>
> ##5 4.533  85 5100
>
> ##6 2.883  55 3300
>
> ```
>
>
>
> I wonder if this is just a wrapper around join? If so it is probably not
> going to help me out.
>
>
>
> Also I would prefer to work in python
>
>
>
> Any thoughts?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: StructuredStreaming - foreach/foreachBatch

2022-02-08 Thread Mich Talebzadeh
BTW you can check this Linkedin article of mine on Processing Change Data
Capture with Spark Structured Streaming



It covers the concept of triggers, including trigger(once = True), i.e.
one-time batch processing in Spark Structured Streaming.
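
For illustration only (not from the article), a minimal PySpark sketch of a
one-time trigger, with a hypothetical parquet sink and paths:

query = (streamingDataFrame.writeStream
           .format("parquet")                        # hypothetical sink
           .option("path", "/tmp/cdc_out")           # hypothetical output path
           .option("checkpointLocation", "/tmp/cdc_chk")
           .trigger(once=True)   # process whatever is available once, then stop
           .start())

query.awaitTermination()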


HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 7 Feb 2022 at 23:06, karan alang  wrote:

> Thanks, Mich .. that worked fine!
>
>
> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh 
> wrote:
>
>> read below
>>
>> """
>>"foreach" performs custom write logic on each row and
>> "foreachBatch" performs custom write logic on each micro-batch through
>> SendToBigQuery function
>> *foreachBatch(SendToBigQuery) expects 2 parameters,
>> first: micro-batch as DataFrame or Dataset and second: unique id for each
>> batch --> batchId*
>>Using foreachBatch, we write each micro batch to storage
>> defined in our custom logic. In this case, we store the output of our
>> streaming application to Google BigQuery table.
>>Note that we are appending data and column "rowkey" is
>> defined as UUID so it can be used as the primary key
>> """
>> result = streamingDataFrame.select( \
>>  col("parsed_value.rowkey").alias("rowkey") \
>>, col("parsed_value.ticker").alias("ticker") \
>>, col("parsed_value.timeissued").alias("timeissued") \
>>, col("parsed_value.price").alias("price")). \
>>  writeStream. \
>>  outputMode('append'). \
>>  option("truncate", "false"). \
>>  foreachBatch(SendToBigQuery). \
>>  trigger(processingTime='2 seconds'). \
>>  start()
>>
>> now you define your function SendToBigQuery()
>>
>>
>> def SendToBigQuery(df, batchId):
>>
>> if(len(df.take(1))) > 0:
>>
>> df.printSchema()
>>
>> print(f"""batchId is {batchId}""")
>>
>> rows = df.count()
>>
>> print(f""" Total records processed in this run = {rows}""")
>>
>> ..
>>
>> else:
>>
>> print("DataFrame is empty")
>>
>> *HTH*
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 7 Feb 2022 at 21:06, karan alang  wrote:
>>
>>> Hello All,
>>>
>>> I'm using StructuredStreaming to read data from Kafka, and need to do
>>> transformation on each individual row.
>>>
>>> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
>>> Basic question - how is the row passed to the function when foreach is
>>> used ?
>>>
>>> Also, when I use foreachBatch, seems the BatchId is available in the
>>> function called ? How do I access individual rows ?
>>>
>>> Details are in stackoverflow :
>>>
>>> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>>>
>>> What is the best approach for this use-case ?
>>>
>>> tia!
>>>
>>
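
On the remaining part of the question (how a row reaches plain foreach): in
PySpark, foreach() accepts either a function that is called once per Row, or
an object with open/process/close methods. A minimal sketch for illustration
(the field names are hypothetical):

def process_row(row):
    # row is a pyspark.sql.Row; fields are accessed by name
    # custom per-row write or transformation goes here
    print(row["ticker"], row["price"])

query = (streamingDataFrame.writeStream
           .outputMode("append")
           .foreach(process_row)
           .start())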


Re: Help With unstructured text file with spark scala

2022-02-08 Thread Lalwani, Jayesh
You will need to provide more info.

Does the data contain records? 
Are the records "homogeneous", i.e. do they have the same fields?
What is the format of the data?
Are records separated by lines/separators?
Is the data sharded across multiple files?
How big is each shard?



On 2/8/22, 11:50 AM, "Danilo Sousa"  wrote:




Hi
I have to transform unstructured text to dataframe.
Could anyone please help with Scala code ?

Dataframe need as:

operadora filial unidade contrato empresa plano codigo_beneficiario 
nome_beneficiario

Relação de Beneficiários Ativos e Excluídos
Carteira em#27/12/2019##Todos os Beneficiários
Operadora#AMIL
Filial#SÃO PAULO#Unidade#Guarulhos

Contrato#123456 - Test
Empresa#Test
Plano#Código Beneficiário#Nome Beneficiário
58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
58693 - NACIONAL R COPART PJCE#073930313#Maria Silva

Contrato#898011000 - FUNDACAO GERDAU
Empresa#FUNDACAO GERDAU
Plano#Código Beneficiário#Nome Beneficiário
58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Does spark have something like rowsum() in R?

2022-02-08 Thread Sean Owen
That seems like a fine way to do it. Why you're running out of mem is
probably more a function of your parallelism, cluster size, and the fact
that R is a memory hog.
I'm not sure there are great alternatives in R and Spark; in other
languages you might more directly get the array of (numeric?) row values and
sum them efficiently. Certainly pandas UDFs would make short work of that.
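
A rough PySpark sketch of the "array of row values" idea (an illustration,
with a hypothetical id column; aggregate is the Spark SQL higher-order
function available since 2.4):

from pyspark.sql import functions as F

num_cols = [c for c in countsSparkDF.columns if c != "gene_id"]  # hypothetical id column

row_sums = countsSparkDF.na.fill(0).withColumn(
    "row_sum",
    # pack the numeric columns into one array and fold it with a sum;
    # assumes plain column names that need no backtick quoting
    F.expr("aggregate(array({0}), 0D, (acc, x) -> acc + x)".format(", ".join(num_cols))),
)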

On Tue, Feb 8, 2022 at 10:02 AM Andrew Davidson 
wrote:

> As part of my data normalization process I need to calculate row sums. The
> following code works on smaller test data sets. It does not work on my big
> tables. When I run on a table with over 10,000 columns I get an OOM on a
> cluster with 2.8 TB. Is there a better way to implement this
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/rowsum
>
> “Compute column sums across rows of a numeric matrix-like object for each
> level of a grouping variable. “
>
>
>
>
> ###
>
> def rowSums( self, countsSparkDF, newColName, columnNames ):
>
> '''
>
> calculates actual sum of columns
>
>
>
> arguments
>
> countSparkDF
>
>
>
> newColumName:
>
> results from column sum will be sorted here
>
>
>
> columnNames:
>
> list of columns to sum
>
>
>
> returns
>
> amended countSparkDF
>
> '''
>
> self.logger.warn( "rowSumsImpl BEGIN" )
>
>
>
> # https://stackoverflow.com/a/54283997/4586180
>
> retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName ,
> reduce( add, [col( x ) for x in columnNames] ) )
>
>
>
> # self.logger.warn( "rowSums retDF numRows:{} numCols:{}"\
>
> #  .format( retDF.count(), len( retDF.columns ) )
> )
>
> #
>
> # self.logger.warn("AEDWIP remove show")
>
> # retDF.show()
>
>
>
> self.logger.warn( "rowSumsImpl END\n" )
>
> return retDF
>
>
>
>
>


Help With unstructured text file with spark scala

2022-02-08 Thread Danilo Sousa
Hi
I have to transform unstructured text to dataframe.
Could anyone please help with Scala code ?

Dataframe need as:

operadora filial unidade contrato empresa plano codigo_beneficiario 
nome_beneficiario

Relação de Beneficiários Ativos e Excluídos
Carteira em#27/12/2019##Todos os Beneficiários
Operadora#AMIL
Filial#SÃO PAULO#Unidade#Guarulhos

Contrato#123456 - Test
Empresa#Test
Plano#Código Beneficiário#Nome Beneficiário
58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
58693 - NACIONAL R COPART PJCE#073930313#Maria Silva

Contrato#898011000 - FUNDACAO GERDAU
Empresa#FUNDACAO GERDAU
Plano#Código Beneficiário#Nome Beneficiário
58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: question on the different way of RDD to dataframe

2022-02-08 Thread Mich Talebzadeh
As Sean mentioned, a Scala case class is a handy way of representing objects
with names and types. For example, if you are reading a csv file with
spaced column names like "counter party" etc. and you want a more
compact column name like counterparty:


scala> val location="hdfs://rhes75:9000/tmp/crap.csv"

location: String = hdfs://rhes75:9000/tmp/crap.csv

scala> val df1 = spark.read.option("header", false).csv(location)  // don't
read the header

df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 34 more
fields]  // column headers are represented as _c0, _c1 etc

scala> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
PRICE: Double)  // create name and type for _c0, _c1 and so forth

defined class columns

scala> val df2 = df1.map(p => columns(p(0).toString,p(1).toString,
p(2).toString,p(3).toString.toDouble)) // map those columns

df2: org.apache.spark.sql.Dataset[columns] = [KEY: string, TICKER: string
... 2 more fields]

scala> df2.printSchema

root

 |-- KEY: string (nullable = true)

 |-- TICKER: string (nullable = true)

 |-- TIMEISSUED: string (nullable = true)

 |-- PRICE: double (nullable = false)

HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 8 Feb 2022 at 14:32, Sean Owen  wrote:

> It's just a possibly tidier way to represent objects with named, typed
> fields, in order to specify a DataFrame's contents.
>
> On Tue, Feb 8, 2022 at 4:16 AM  wrote:
>
>> Hello
>>
>> I am converting some py code to scala.
>> This works in python:
>>
>> >>> rdd = sc.parallelize([('apple',1),('orange',2)])
>> >>> rdd.toDF(['fruit','num']).show()
>> +------+---+
>> | fruit|num|
>> +------+---+
>> | apple|  1|
>> |orange|  2|
>> +------+---+
>>
>> And in scala:
>> scala> rdd.toDF("fruit","num").show()
>> +------+---+
>> | fruit|num|
>> +------+---+
>> | apple|  1|
>> |orange|  2|
>> +------+---+
>>
>> But I saw many code that use a case class for translation.
>>
>> scala> case class Fruit(fruit:String,num:Int)
>> defined class Fruit
>>
>> scala> rdd.map{case (x,y) => Fruit(x,y) }.toDF().show()
>> +------+---+
>> | fruit|num|
>> +------+---+
>> | apple|  1|
>> |orange|  2|
>> +------+---+
>>
>>
>> Do you know why to use a "case class" here?
>>
>> thanks.
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Does spark have something like rowsum() in R?

2022-02-08 Thread Andrew Davidson
As part of my data normalization process I need to calculate row sums. The 
following code works on smaller test data sets. It does not work on my big 
tables. When I run on a table with over 10,000 columns I get an OOM on a 
cluster with 2.8 TB. Is there a better way to implement this?

Kind regards

Andy

https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/rowsum
“Compute column sums across rows of a numeric matrix-like object for each level 
of a grouping variable. “



###
# assumes: from functools import reduce; from operator import add;
#          from pyspark.sql.functions import col

def rowSums( self, countsSparkDF, newColName, columnNames ):
    '''
    calculates the actual sum of columns

    arguments
        countsSparkDF

        newColName:
            results from the column sum will be stored here

        columnNames:
            list of columns to sum

    returns
        amended countsSparkDF
    '''
    self.logger.warn( "rowSumsImpl BEGIN" )

    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill( 0 ).withColumn( newColName,
                reduce( add, [col( x ) for x in columnNames] ) )

    # self.logger.warn( "rowSums retDF numRows:{} numCols:{}"\
    #     .format( retDF.count(), len( retDF.columns ) ) )
    #
    # self.logger.warn("AEDWIP remove show")
    # retDF.show()

    self.logger.warn( "rowSumsImpl END\n" )
    return retDF




Does spark support something like the bind function in R?

2022-02-08 Thread Andrew Davidson
I need to create a single table by selecting one column from thousands of 
files. The columns are all of the same type, have the same number of rows and 
rows names. I am currently using join. I get OOM on mega-mem cluster with 2.8 
TB.

Does spark have something like cbind() “Take a sequence of vector, matrix or 
data-frame arguments and combine by columns or rows, respectively. “

https://www.rdocumentation.org/packages/base/versions/3.6.2/topics/cbind

Digging through the spark documentation I found a udf example
https://spark.apache.org/docs/latest/sparkr.html#dapply

```
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", 
"double"),
 structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1 3.600  79 4740
##2 1.800  54 3240
##3 3.333  74 4440
##4 2.283  62 3720
##5 4.533  85 5100
##6 2.883  55 3300
```

I wonder if this is just a wrapper around join? If so it is probably not going 
to help me out.

Also I would prefer to work in python

Any thoughts?

Kind regards

Andy




Re: question on the different way of RDD to dataframe

2022-02-08 Thread Sean Owen
It's just a possibly tidier way to represent objects with named, typed
fields, in order to specify a DataFrame's contents.

On Tue, Feb 8, 2022 at 4:16 AM  wrote:

> Hello
>
> I am converting some py code to scala.
> This works in python:
>
> >>> rdd = sc.parallelize([('apple',1),('orange',2)])
> >>> rdd.toDF(['fruit','num']).show()
> +------+---+
> | fruit|num|
> +------+---+
> | apple|  1|
> |orange|  2|
> +------+---+
>
> And in scala:
> scala> rdd.toDF("fruit","num").show()
> +------+---+
> | fruit|num|
> +------+---+
> | apple|  1|
> |orange|  2|
> +------+---+
>
> But I saw many code that use a case class for translation.
>
> scala> case class Fruit(fruit:String,num:Int)
> defined class Fruit
>
> scala> rdd.map{case (x,y) => Fruit(x,y) }.toDF().show()
> +------+---+
> | fruit|num|
> +------+---+
> | apple|  1|
> |orange|  2|
> +------+---+
>
>
> Do you know why to use a "case class" here?
>
> thanks.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


question on the different way of RDD to dataframe

2022-02-08 Thread capitnfrakass

Hello

I am converting some py code to scala.
This works in python:


rdd = sc.parallelize([('apple',1),('orange',2)])
rdd.toDF(['fruit','num']).show()

+------+---+
| fruit|num|
+------+---+
| apple|  1|
|orange|  2|
+------+---+

And in scala:
scala> rdd.toDF("fruit","num").show()
+------+---+
| fruit|num|
+------+---+
| apple|  1|
|orange|  2|
+------+---+

But I saw many code that use a case class for translation.

scala> case class Fruit(fruit:String,num:Int)
defined class Fruit

scala> rdd.map{case (x,y) => Fruit(x,y) }.toDF().show()
+------+---+
| fruit|num|
+------+---+
| apple|  1|
|orange|  2|
+------+---+


Do you know why to use a "case class" here?

thanks.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: add an auto_increment column

2022-02-08 Thread capitnfrakass

I have got the answer from Mich's reply. Thank you both.

frakass


On 08/02/2022 16:36, Gourav Sengupta wrote:

Hi,

so do you want to rank apple and tomato both as 2? Not quite clear on
the use case here though.

Regards,
Gourav Sengupta

On Tue, Feb 8, 2022 at 7:10 AM  wrote:


Hello Gourav

As you can see here, orderBy has already given the solution for "equal
amount":


>>> df = sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])

>>> df.select("*").orderBy("amount",ascending=False).show()

+------+------+
| fruit|amount|
+------+------+
|cherry|     5|
| apple|     3|
|tomato|     3|
|orange|     2|
+------+------+

I want to add a column at the right, named "top", whose value
auto-increments from 1 to N.

Thank you.

On 08/02/2022 13:52, Gourav Sengupta wrote:

Hi,

sorry once again, will try to understand the problem first :)

As we can clearly see, the initial responses incorrectly guessed the
solution to be the monotonically_increasing_id function.

What if there are two fruits with equal amounts? For any real-life
application, can we understand what we are trying to achieve with the
rankings?

Regards,
Gourav Sengupta

On Tue, Feb 8, 2022 at 4:22 AM ayan guha wrote:



For this req you can rank or dense rank.

On Tue, 8 Feb 2022 at 1:12 pm,  wrote:


Hello,

For this query:


df.select("*").orderBy("amount",ascending=False).show()

+------+------+
| fruit|amount|
+------+------+
|tomato|     9|
| apple|     6|
|cherry|     5|
|orange|     3|
+------+------+

I want to add a column "top", in which the value is: 1,2,3...
meaning
top1, top2, top3...

How can I do it?

Thanks.

On 07/02/2022 21:18, Gourav Sengupta wrote:

Hi,

can we understand the requirement first?

What is that you are trying to achieve by auto increment id? Do you just
want different ID's for rows, or you may want to keep track of the record
count of a table as well, or do you want to use them for surrogate keys?

If you are going to insert records multiple times in a table, and still
have different values?

I think without knowing the requirements all the above responses, like
everything else where solutions are reached before understanding the
problem, have a high chance of being wrong.

Regards,
Gourav Sengupta

On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj wrote:

Monotonically_increasing_id() will give the same functionality

On Mon, 7 Feb, 2022, 6:57 am, wrote:

For a dataframe object, how to add a column who is auto_increment like
mysql's behavior?

Thank you.















-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org










-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Best Regards,
Ayan Guha


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: add an auto_increment column

2022-02-08 Thread Gourav Sengupta
Hi,

so do you want to rank apple and tomato both as 2? Not quite clear on the
use case here though.

Regards,
Gourav Sengupta

On Tue, Feb 8, 2022 at 7:10 AM  wrote:

>
> Hello Gourav
>
>
> As you see here orderBy has already give the solution for "equal
> amount":
>
> >>> df =
> >>>
> sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])
>
> >>> df.select("*").orderBy("amount",ascending=False).show()
> +------+------+
> | fruit|amount|
> +------+------+
> |cherry|     5|
> | apple|     3|
> |tomato|     3|
> |orange|     2|
> +------+------+
>
>
> I want to add a column at the right whose name is "top" and the value
> auto_increment from 1 to N.
>
> Thank you.
>
>
>
> On 08/02/2022 13:52, Gourav Sengupta wrote:
> > Hi,
> >
> > sorry once again, will try to understand the problem first :)
> >
> > As we can clearly see that the initial responses were incorrectly
> > guessing the solution to be monotonically_increasing function
> >
> > What if there are two fruits with equal amount? For any real life
> > application, can we understand what are trying to achieve by the
> > rankings?
> >
> > Regards,
> > Gourav Sengupta
> >
> > On Tue, Feb 8, 2022 at 4:22 AM ayan guha  wrote:
> >
> >> For this req you can rank or dense rank.
> >>
> >> On Tue, 8 Feb 2022 at 1:12 pm,  wrote:
> >>
> >>> Hello,
> >>>
> >>> For this query:
> >>>
> >> df.select("*").orderBy("amount",ascending=False).show()
> >>> +------+------+
> >>> | fruit|amount|
> >>> +------+------+
> >>> |tomato|     9|
> >>> | apple|     6|
> >>> |cherry|     5|
> >>> |orange|     3|
> >>> +------+------+
> >>>
> >>> I want to add a column "top", in which the value is: 1,2,3...
> >>> meaning
> >>> top1, top2, top3...
> >>>
> >>> How can I do it?
> >>>
> >>> Thanks.
> >>>
> >>> On 07/02/2022 21:18, Gourav Sengupta wrote:
>  Hi,
> 
>  can we understand the requirement first?
> 
>  What is that you are trying to achieve by auto increment id? Do
> >>> you
>  just want different ID's for rows, or you may want to keep track
> >>> of
>  the record count of a table as well, or do you want to do use
> >>> them for
>  surrogate keys?
> 
>  If you are going to insert records multiple times in a table,
> >>> and
>  still have different values?
> 
>  I think without knowing the requirements all the above
> >>> responses, like
>  everything else where solutions are reached before understanding
> >>> the
>  problem, has high chances of being wrong.
> 
>  Regards,
>  Gourav Sengupta
> 
>  On Mon, Feb 7, 2022 at 2:21 AM Siva Samraj
> >>> 
>  wrote:
> 
> > Monotonically_increasing_id() will give the same functionality
> >
> > On Mon, 7 Feb, 2022, 6:57 am ,  wrote:
> >
> >> For a dataframe object, how to add a column who is
> >>> auto_increment
> >> like
> >> mysql's behavior?
> >>
> >> Thank you.
> >>
> >>
> >
> 
> >>>
> >>
> > -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>
> >>>
> >>
> > -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> --
> >> Best Regards,
> >> Ayan Guha
>


Re: add an auto_increment column

2022-02-08 Thread Bitfox
Maybe col func is not even needed here. :)

>>> df.select(F.dense_rank().over(wOrder).alias("rank"),
"fruit","amount").show()

+----+------+------+
|rank| fruit|amount|
+----+------+------+
|   1|cherry|     5|
|   2| apple|     3|
|   2|tomato|     3|
|   3|orange|     2|
+----+------+------+
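
If the goal is a strictly consecutive 1..N "top" column even when amounts
tie, row_number() over the same window is another option (a sketch, not
shown in the thread; the order of tied rows is then arbitrary):

>>> df.select(F.row_number().over(wOrder).alias("top"), "fruit", "amount").show()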




On Tue, Feb 8, 2022 at 3:50 PM Mich Talebzadeh 
wrote:

> simple: either rank() or dense_rank()
>
> >>> from pyspark.sql import functions as F
> >>> from pyspark.sql.functions import col
> >>> from pyspark.sql.window import Window
> >>> wOrder = Window().orderBy(df['amount'].desc())
> >>> df.select(F.rank().over(wOrder).alias("rank"), col('fruit'),
> col('amount')).show()
> +----+------+------+
> |rank| fruit|amount|
> +----+------+------+
> |   1|cherry|     5|
> |   2| apple|     3|
> |   2|tomato|     3|
> |   4|orange|     2|
> +----+------+------+
>
> >>> df.select(F.dense_rank().over(wOrder).alias("rank"), col('fruit'),
> col('amount')).show()
> +----+------+------+
> |rank| fruit|amount|
> +----+------+------+
> |   1|cherry|     5|
> |   2| apple|     3|
> |   2|tomato|     3|
> |   3|orange|     2|
> +----+------+------+
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 7 Feb 2022 at 01:27,  wrote:
>
>> For a dataframe object, how to add a column who is auto_increment like
>> mysql's behavior?
>>
>> Thank you.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>