Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Raghavendra Ganesh
Hi, what is the purpose for which you want to use repartition()? Is it to reduce the number of files in Delta? Also note that there is an alternative option of using coalesce() instead of repartition().
--
Raghavendra

On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong wrote:
> Hi all on user@spark:
>
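A minimal sketch of where repartition() or coalesce() would sit in a structured-streaming write, assuming the Delta Lake connector is on the classpath; the source, paths, and partition count are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.readStream.format("rate").load()  // stand-in streaming source

val query = df
  .repartition(4)   // full shuffle to exactly 4 partitions per micro-batch
  // .coalesce(4)   // alternative: merge existing partitions without a shuffle
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/demo")  // hypothetical path
  .start("/tmp/delta/demo")                                // hypothetical path

Fewer output files per micro-batch generally means larger files, at the cost of a shuffle (repartition) or reduced write parallelism (coalesce).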

Re: Incremental Value dependents on another column of Data frame Spark

2023-05-23 Thread Raghavendra Ganesh
Given that you state the above can be imagined as a partition, I can think of a mapPartitions iterator.

val inputSchema = inputDf.schema
val outputRdd = inputDf.rdd.mapPartitions(rows => new SomeClass(rows))
val outputDf = sparkSession.createDataFrame(outputRdd,
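A self-contained sketch of the same mapPartitions idea, assuming a single integer column "flag" and an illustrative rule (increment a running counter whenever flag is 1); the column names and increment logic are hypothetical:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val inputDf = spark.range(0, 10).selectExpr("cast(id % 3 = 0 as int) as flag")

val outputSchema = StructType(inputDf.schema.fields :+ StructField("groupId", IntegerType))

// Within each partition, carry a running counter that increments whenever
// "flag" is 1, and emit the counter as a new column on every row.
val outputRdd = inputDf.rdd.mapPartitions { rows =>
  var counter = 0
  rows.map { r =>
    if (r.getInt(0) == 1) counter += 1
    Row.fromSeq(r.toSeq :+ counter)
  }
}

val outputDf = spark.createDataFrame(outputRdd, outputSchema)
outputDf.show()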

Re: Spark Aggregator with ARRAY input and ARRAY output

2023-04-23 Thread Raghavendra Ganesh
For simple array types, setting the encoder to ExpressionEncoder() should work.
--
Raghavendra

On Sun, Apr 23, 2023 at 9:20 PM Thomas Wang wrote:
> Hi Spark Community,
>
> I'm trying to implement a custom Spark Aggregator (a subclass of
> org.apache.spark.sql.expressions.Aggregator). Correct me if
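A minimal sketch, assuming Spark 3.x, of an Aggregator that takes an array column and returns an array (an element-wise sum, chosen only for illustration), with both encoders supplied via ExpressionEncoder():

import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Element-wise sum of long arrays; buffer and output are both Seq[Long].
object ArraySum extends Aggregator[Seq[Long], Seq[Long], Seq[Long]] {
  def zero: Seq[Long] = Seq.empty
  def reduce(b: Seq[Long], a: Seq[Long]): Seq[Long] = merge(b, a)
  def merge(b1: Seq[Long], b2: Seq[Long]): Seq[Long] =
    if (b1.isEmpty) b2
    else if (b2.isEmpty) b1
    else b1.zip(b2).map { case (x, y) => x + y }
  def finish(r: Seq[Long]): Seq[Long] = r
  def bufferEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(Seq(1L, 2L), Seq(3L, 4L)).toDF("values")
df.select(udaf(ArraySum).apply($"values")).show()  // element-wise sum: [4, 6]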

Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Raghavendra Ganesh
You can groupBy(country) and use the mapPartitions method, in which you can iterate over all rows keeping two variables for maxPopulationSoFar and the corresponding city, then return the city with the max population. I think, as others suggested, it may be possible to use bucketing; it would give a more friendly
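One way to express the "iterate each group keeping two running variables" idea is the Dataset groupByKey/mapGroups API; a sketch with made-up data and column names:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

case class City(country: String, city: String, population: Long)

val cities = Seq(
  City("FR", "Paris", 2100000L),
  City("FR", "Lyon", 516000L),
  City("JP", "Tokyo", 13960000L),
  City("JP", "Osaka", 2750000L)
).toDS()

// For each country, scan its rows once, tracking the largest population
// seen so far and the corresponding city.
val largestPerCountry = cities
  .groupByKey(_.country)
  .mapGroups { (country, rows) =>
    var maxPopulationSoFar = Long.MinValue
    var bestCity = ""
    rows.foreach { c =>
      if (c.population > maxPopulationSoFar) {
        maxPopulationSoFar = c.population
        bestCity = c.city
      }
    }
    (country, bestCity, maxPopulationSoFar)
  }
  .toDF("country", "city", "population")

largestPerCountry.show()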

Re: how to add a column for percent

2022-05-23 Thread Raghavendra Ganesh
withColumn takes a Column as the second argument, not a String. If you want formatting before show(), you can use the round() function.
--
Raghavendra

On Mon, May 23, 2022 at 11:35 AM wilson wrote:
> hello
>
> how to add a column for percent for the current row of counted data?
>
> scala>
>
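A sketch of passing a Column expression to withColumn and rounding the result; the column names and the percent-of-total definition are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, round, sum}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val counted = Seq(("a", 3L), ("b", 1L)).toDF("value", "count")

// Percent of the overall total for each row, rounded to 2 decimal places.
val total = Window.partitionBy()  // empty partitioning = whole DataFrame
val withPercent = counted.withColumn(
  "percent",
  round(col("count") * 100.0 / sum(col("count")).over(total), 2)
)
withPercent.show()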

Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Raghavendra Ganesh
What is optimal depends on the context of the problem. Is the intent here to find the best solution for top-n values with a group by? Both of the solutions look sub-optimal to me. A window function would be expensive, as it needs an order by (which a top-n solution shouldn't need). It would be best to
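For reference, a sketch of the two patterns being compared, with made-up data: a window function (which needs an order by within each window) versus a plain group-by aggregation when only the single top value per group is needed:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, row_number}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val sales = Seq(("a", 10), ("a", 30), ("b", 20)).toDF("group", "amount")

// Window function: requires ordering the rows inside each window.
val w = Window.partitionBy("group").orderBy(col("amount").desc)
val viaWindow = sales
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")

// Plain aggregation: no ordering needed when only the top value matters.
val viaGroupBy = sales.groupBy("group").agg(max("amount").as("amount"))

viaWindow.show()
viaGroupBy.show()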

Re: how to classify column

2022-02-11 Thread Raghavendra Ganesh
You could use the expr() function to achieve the same:

.withColumn("newColumn", expr(s"case when score > 3 then 'good' else 'bad' end"))

--
Raghavendra

On Fri, Feb 11, 2022 at 5:59 PM frakass wrote:
> Hello
>
> I have a column whose value (Int type as score) is from 0 to 5.
> I want to query that,
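A self-contained sketch of this approach; the sample scores are made up:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(0, 2, 4, 5).toDF("score")

// SQL-style conditional evaluated per row via expr().
df.withColumn("newColumn", expr("case when score > 3 then 'good' else 'bad' end"))
  .show()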

Re: Merge two dataframes

2021-05-12 Thread Raghavendra Ganesh
You can add an extra id column and perform an inner join.

val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
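A runnable version of the same idea with made-up DataFrames; note that the generated ids line up across the two DataFrames only when both have the same partitioning and row order:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df1 = Seq("a", "b", "c").toDF("letter")
val df2 = Seq(1, 2, 3).toDF("number")

// Tag each row with a generated id, join on it, then drop the helper column.
val df1WithId = df1.withColumn("id", monotonically_increasing_id())
val df2WithId = df2.withColumn("id", monotonically_increasing_id())

df1WithId.join(df2WithId, Seq("id"), "inner").drop("id").show()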

Re: How to Spawn Child Thread or Sub-jobs in a Spark Session

2020-12-04 Thread Raghavendra Ganesh
There should not be any need to explicitly make the DF-2 and DF-3 computation parallel. Spark generates execution plans and can decide what can run in parallel (ideally you should see them running in parallel in the Spark UI). You need to cache DF-1 if possible (either in memory or on disk), otherwise computation
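A minimal sketch of the caching point, with arbitrary data and filters: DF-1 is the shared parent, and caching it lets the DF-2 and DF-3 jobs reuse its result instead of recomputing the lineage twice:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df1 = spark.range(0, 1000000).toDF("id").cache()  // shared parent (DF-1)

val df2 = df1.filter(col("id") % 2 === 0)  // DF-2
val df3 = df1.filter(col("id") % 3 === 0)  // DF-3

// Each action triggers its own job; both reuse the cached DF-1.
println(df2.count())
println(df3.count())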

Re: Count distinct and driver memory

2020-10-19 Thread Raghavendra Ganesh
Spark provides multiple options for caching (including disk). Have you tried caching to disk?
--
Raghavendra

On Mon, Oct 19, 2020 at 11:41 PM Lalwani, Jayesh wrote:
> I was caching it because I didn't want to re-execute the DAG when I ran
> the count query. If you have a spark application
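A minimal sketch, with placeholder data, of persisting with a disk-only storage level instead of the default memory-based cache():

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Persist to disk only, so the cached data does not compete for executor memory.
val df = spark.range(0, 1000000).toDF("id").persist(StorageLevel.DISK_ONLY)

println(df.distinct().count())  // first action materializes the on-disk cache
println(df.count())             // later actions reuse the cached data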