Re: add an auto_increment column

2022-02-07 Thread Mich Talebzadeh
Simple: either rank() or dense_rank().

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.functions import col
>>> from pyspark.sql.window import Window
>>> wOrder = Window().orderBy(df['amount'].desc())
>>> df.select(F.rank().over(wOrder).alias("rank"), col('fruit'),
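A minimal runnable sketch of the window-function approach above, using the fruit/amount data from this thread (the SparkSession setup is assumed, not part of the original message):

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("orange", 2), ("apple", 3), ("tomato", 3), ("cherry", 5)],
        ["fruit", "amount"])

    # Order by amount, highest first; tied amounts share a rank.
    wOrder = Window.orderBy(df["amount"].desc())
    df.select(F.rank().over(wOrder).alias("rank"),
              F.dense_rank().over(wOrder).alias("dense_rank"),
              "fruit", "amount").show()

With the tie on amount=3, rank() skips the next value (1, 2, 2, 4) while dense_rank() does not (1, 2, 2, 3).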

Re: add an auto_increment column

2022-02-07 Thread Stelios Philippou
https://stackoverflow.com/a/51854022/299676

On Tue, 8 Feb 2022 at 09:25, Stelios Philippou wrote:
> This has the information that you require in order to add an extra column
> with a sequence to it.
>
> On Tue, 8 Feb 2022 at 09:11, wrote:
>> Hello Gourav
>>
>> As you see here orderBy

Re: add an auto_increment column

2022-02-07 Thread Stelios Philippou
This has the information that you require in order to add an extra column with a sequence to it.

On Tue, 8 Feb 2022 at 09:11, wrote:
> Hello Gourav
>
> As you see here orderBy has already given the solution for "equal amount":
>
> >>> df =
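One common way to add such a sequence column (a sketch of the general technique; this is not necessarily the exact code in the linked answer) is zipWithIndex on the underlying RDD, which assigns a 0-based position without needing a window:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    df = sc.parallelize([("orange", 2), ("apple", 3),
                         ("tomato", 3), ("cherry", 5)]).toDF(["fruit", "amount"])

    # Pair each Row with its index, then flatten back into a DataFrame.
    indexed = (df.rdd.zipWithIndex()
                 .map(lambda pair: (*pair[0], pair[1]))
                 .toDF(df.columns + ["seq"]))
    indexed.show()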

Re: add an auto_increment column

2022-02-07 Thread capitnfrakass
Hello Gourav

As you see here orderBy has already given the solution for "equal amount":

>>> df = sc.parallelize([("orange",2),("apple",3),("tomato",3),("cherry",5)]).toDF(['fruit','amount'])
>>> df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|

Re: add an auto_increment column

2022-02-07 Thread Gourav Sengupta
Hi,

Sorry once again, will try to understand the problem first :) As we can clearly see, the initial responses incorrectly guessed the solution to be the monotonically_increasing_id function. What if there are two fruits with an equal amount? For any real life application, can we understand what
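For context on why that guess falls short: monotonically_increasing_id() yields unique and increasing but non-consecutive ids (each partition starts at its own 64-bit offset), so it cannot produce a 1,2,3 numbering and says nothing about ties. A small sketch demonstrating this, assuming a local SparkSession:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("orange", 2), ("apple", 3), ("tomato", 3), ("cherry", 5)],
        ["fruit", "amount"])

    # Ids are unique and increasing, but with gaps across partitions.
    df.withColumn("id", F.monotonically_increasing_id()).show()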

Re: add an auto_increment column

2022-02-07 Thread ayan guha
For this requirement you can use rank or dense_rank.

On Tue, 8 Feb 2022 at 1:12 pm, wrote:
> Hello,
>
> For this query:
>
> >>> df.select("*").orderBy("amount",ascending=False).show()
> +------+------+
> | fruit|amount|
> +------+------+
> |tomato|     9|
> | apple|     6|
> |cherry|     5|
> |orange|

Re: add an auto_increment column

2022-02-07 Thread capitnfrakass
Hello,

For this query:

>>> df.select("*").orderBy("amount",ascending=False).show()
+------+------+
| fruit|amount|
+------+------+
|tomato|     9|
| apple|     6|
|cherry|     5|
|orange|     3|
+------+------+

I want to add a column "top", in which the value is: 1,2,3... meaning top1, top2,
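A sketch of one way to get exactly this consecutive 1,2,3 numbering: row_number() over a window ordered by amount descending. Unlike rank(), row_number() never repeats a value, though it breaks ties arbitrarily:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("tomato", 9), ("apple", 6), ("cherry", 5), ("orange", 3)],
        ["fruit", "amount"])

    # row_number() assigns 1, 2, 3, ... in window order, with no gaps.
    w = Window.orderBy(F.col("amount").desc())
    df.withColumn("top", F.row_number().over(w)).show()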

Re: StructuredStreaming - foreach/foreachBatch

2022-02-07 Thread karan alang
Thanks, Mich .. that worked fine!

On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh wrote:
> read below
>
> """
> "foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> SendToBigQuery function

Re: StructuredStreaming - foreach/foreachBatch

2022-02-07 Thread Mich Talebzadeh
read below

"""
"foreach" performs custom write logic on each row and "foreachBatch" performs custom write logic on each micro-batch through the SendToBigQuery function.

foreachBatch(SendToBigQuery) expects 2 parameters, first: micro-batch as DataFrame or
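A minimal sketch of that foreachBatch wiring; SendToBigQuery here is a stand-in showing only the expected (DataFrame, batch id) signature, and the rate source replaces the actual Kafka/BigQuery plumbing from this thread:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Stand-in streaming source; the real job reads from Kafka.
    stream_df = (spark.readStream.format("rate")
                      .option("rowsPerSecond", 1).load())

    def SendToBigQuery(df, batch_id):
        # First parameter: the micro-batch as an ordinary DataFrame;
        # second: a unique, increasing batch id. Real write logic goes here.
        df.show()

    query = stream_df.writeStream.foreachBatch(SendToBigQuery).start()
    query.awaitTermination()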

StructuredStreaming - foreach/foreachBatch

2022-02-07 Thread karan alang
Hello All,

I'm using StructuredStreaming to read data from Kafka, and need to do a transformation on each individual row. I'm trying to use 'foreach' (or foreachBatch), and running into issues. Basic question - how is the row passed to the function when foreach is used? Also, when I use
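For reference on the question being asked: with foreach, Spark calls the supplied function once per row, passing a pyspark.sql.Row (a class with open/process/close methods can be supplied instead). A minimal sketch, with the Kafka options as placeholders:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    def process_row(row):
        # Called once per row on the executors; row is a pyspark.sql.Row.
        print(row)

    query = (spark.readStream
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "host:9092")  # placeholder
                  .option("subscribe", "mytopic")                  # placeholder
                  .load()
                  .writeStream
                  .foreach(process_row)
                  .start())
    query.awaitTermination()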

Re: Spark 3.1.2 full thread dumps

2022-02-07 Thread Lalwani, Jayesh
Probably not the answer you are looking for, but the best thing to do is to avoid making Spark code sleep. Is there a way you can predict how big your autoscaling group needs to be without looking at all the data? Are you using a fixed number of Spark executors, or do you have some way of scaling

Re: add an auto_increment column

2022-02-07 Thread Gourav Sengupta
Hi,

Can we understand the requirement first? What is it that you are trying to achieve with an auto-increment id? Do you just want different IDs for rows, do you also want to keep track of the record count of a table, or do you want to use them for surrogate keys? If you are going to insert

Re: TypeError: Can not infer schema for type:

2022-02-07 Thread Mich Talebzadeh
Absolutely. The reason this error happens is that an RDD is a one-dimensional data structure whilst a DataFrame has to be two-dimensional, i.e. we have a List[Integer] but we need a List[Tuple[Integer]]. Try this:

>>> rdd = sc.parallelize([3,2,1,4])
>>> df = rdd.map(lambda x: (x,)).toDF()
>>>
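Completing that truncated snippet into a self-contained sketch (the column name "num" is my own choice for illustration; with a bare toDF() Spark names the column _1):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([3, 2, 1, 4])
    # Wrap each int in a 1-tuple so every row has a single-column schema.
    df = rdd.map(lambda x: (x,)).toDF(["num"])
    df.show()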

foreachRDD question

2022-02-07 Thread Bitfox
Hello list,

For the code in the link: https://github.com/apache/spark/blob/v3.2.1/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

I am not sure why the RDD-to-DataFrame logic is enclosed in a foreachRDD block. What's the use of foreachRDD? Thanks in advance.
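For reference on what's being asked: foreachRDD is the DStream output operation that hands over each micro-batch as an ordinary RDD on the driver, which is why the linked example can only do its RDD-to-DataFrame conversion inside that block. A minimal PySpark sketch of the same pattern (the linked example is Scala; host/port are placeholders):

    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="SqlNetworkWordCount")
    ssc = StreamingContext(sc, batchDuration=5)
    lines = ssc.socketTextStream("localhost", 9999)  # placeholder source

    def process(time, rdd):
        # Runs on the driver once per batch; rdd holds that batch's records,
        # so driver-side DataFrame logic can be applied to it here.
        if not rdd.isEmpty():
            spark = SparkSession.builder.getOrCreate()
            words = spark.createDataFrame(
                rdd.flatMap(lambda line: line.split(" ")).map(lambda w: (w,)),
                ["word"])
            words.groupBy("word").count().show()

    lines.foreachRDD(process)
    ssc.start()
    ssc.awaitTermination()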