Re: Incremental Value dependent on another column of Data frame Spark

2023-05-24 Thread Enrico Minack

Hi,

given your dataset:

val df = Seq(
  (1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"),
  (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"),
  (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"),
  (10, 20230523, "M02"), (11, 20230523, "M02"), (12, 20230523, "M01"),
  (13, 20230523, "M02")
).toDF("Row", "Date", "Type")

the simplest you can get with column functions is this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val dateWindow = Window.partitionBy("Date").orderBy("Row")
val batchWindow = Window.partitionBy("Date", "batch").orderBy("Row")

df.withColumn("inc", when($"Type" =!= lag($"Type", 1).over(dateWindow), lit(1)).otherwise(lit(0)))
  .withColumn("batch", sum($"inc").over(dateWindow))
  .withColumn("count", when($"Type" === "M01", lit(0)).otherwise(row_number().over(batchWindow)))
  .show

This creates:

+---+--------+----+---+-----+-----+
|Row|    Date|Type|inc|batch|count|
+---+--------+----+---+-----+-----+
|  1|20230523| M01|  0|    0|    0|
|  2|20230523| M01|  0|    0|    0|
|  3|20230523| M01|  0|    0|    0|
|  4|20230523| M02|  1|    1|    1|
|  5|20230523| M02|  0|    1|    2|
|  6|20230523| M02|  0|    1|    3|
|  7|20230523| M01|  1|    2|    0|
|  8|20230523| M01|  0|    2|    0|
|  9|20230523| M02|  1|    3|    1|
| 10|20230523| M02|  0|    3|    2|
| 11|20230523| M02|  0|    3|    3|
| 12|20230523| M01|  1|    4|    0|
| 13|20230523| M02|  1|    5|    1|
+---+--------+----+---+-----+-----+

Column "inc" is used to split the partition into batches of same 
consecutive types. Column "batch" gives rows of those batches a unique 
ids. For each of those batch, we can use row_number to create count, but 
for "M01" we set "count" to 0.


The query plan does not look too bad, no extra shuffle involved.
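You can check this yourself; a minimal sketch, assuming the chained 
expression above is bound to a val (here called `result`):

val result = df
  .withColumn("inc", when($"Type" =!= lag($"Type", 1).over(dateWindow), lit(1)).otherwise(lit(0)))
  .withColumn("batch", sum($"inc").over(dateWindow))
  .withColumn("count", when($"Type" === "M01", lit(0)).otherwise(row_number().over(batchWindow)))

// hashpartitioning(Date) already co-locates all rows with equal (Date, batch),
// so the second Window should only require an extra Sort, not another Exchange.
result.explain()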


Raghavendra proposed to iterate the partitions. I presume that you 
partition by "Date" and order within partitions by "Row", which can put 
multiple dates into one partition. Even if you have one date per 
partition, AQE might coalesce partitions into bigger ones. This can 
get you into trouble when a date starts with "M02".
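If you do iterate partitions, a minimal sketch of the preparation step 
(my illustration, not from Raghavendra's mail) would be:

// Co-locate all rows of a date in one partition and order them by "Row".
// A partition may still hold several dates (and AQE may coalesce partitions),
// so any per-partition counter must reset whenever the "Date" value changes.
val prepared = df
  .repartition($"Date")
  .sortWithinPartitions($"Date", $"Row")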


You could group your dataset by "Date" and process the individual sorted 
groups (requires Spark 3.4.0). This way, you still partition by "Date" 
but process only individual dates, as proposed by Raghavendra:


import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val groups = df.groupByKey(_.getInt(1))
groups.flatMapSortedGroups($"Row") { case (_: Int, rows: Iterator[Row]) =>
  var count = 0
  rows.map { row =>
    if (row.getString(2) == "M01") {
      count = 0
    } else {
      count += 1
    }
    Row.fromSeq(row.toSeq :+ count)
  }
}(RowEncoder(df.schema.add("count", IntegerType))).show

Cheers,
Enrico


On 23.05.23 at 20:13, Nipuna Shantha wrote:

Hi all,

This is the sample set of data that I used for this task

[image: image.png]

My expected output is as below

[image: image.png]

My scenario: if Type is M01, the count should be 0; if Type is M02, it 
should be incremented from 1 or 0 until the sequence of M02 rows is 
finished. Imagine this as a partition, so row numbers cannot be jumbled. 
Can you suggest a method for this scenario? Also note that this dataset 
is really large; it has around 1 records, and I am using Spark 
with Scala.


Thank You,
Best Regards

 

Re: Incremental Value dependent on another column of Data frame Spark

2023-05-23 Thread Raghavendra Ganesh
Given that you already state the above can be imagined as a partition, I
can think of a mapPartitions iterator.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

val inputSchema = inputDf.schema
val outputRdd = inputDf.rdd.mapPartitions(rows => new SomeClass(rows))
val outputDf = sparkSession.createDataFrame(outputRdd, inputSchema.add("counter", IntegerType))

class SomeClass(rows: Iterator[Row]) extends Iterator[Row] {
  var counter: Int = 0

  override def hasNext: Boolean = rows.hasNext

  override def next(): Row = {
    val row = rows.next()
    val rowType: String = row.getAs[String]("Type")
    if (rowType == "M01")
      counter = 0
    else
      counter += 1
    Row.fromSeq(row.toSeq ++ Seq(counter))
  }
}
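One caveat, picked up in Enrico's reply above: a partition may contain 
more than one date, so the counter should reset when the date changes. 
A sketch of such a variant (my illustration, not from the original mail; 
`DateAwareCounter` is a hypothetical name):

import org.apache.spark.sql.Row

// Like SomeClass, but resets the counter whenever the "Date" value changes,
// so one partition can safely hold several dates.
class DateAwareCounter(rows: Iterator[Row]) extends Iterator[Row] {
  private var counter: Int = 0
  private var lastDate: Option[Int] = None

  override def hasNext: Boolean = rows.hasNext

  override def next(): Row = {
    val row = rows.next()
    val date = row.getAs[Int]("Date")
    if (!lastDate.contains(date)) {  // first row of a new date
      counter = 0
      lastDate = Some(date)
    }
    if (row.getAs[String]("Type") == "M01") counter = 0 else counter += 1
    Row.fromSeq(row.toSeq ++ Seq(counter))
  }
}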

--
Raghavendra


On Tue, May 23, 2023 at 11:44 PM Nipuna Shantha wrote:

> Hi all,
>
> This is the sample set of data that I used for this task
>
> [image: image.png]
>
> My expected output is as below
>
> [image: image.png]
>
> My scenario is if Type is M01 the count should be 0 and if Type is M02 it
> should be incremented from 1 or 0 until the sequence of M02 is finished.
> Imagine this as a partition so row numbers cannot jumble. So can you guys
> suggest a method to this scenario. Also for your concern this dataset is
> really large; it has around 1 records and I am using spark with
> scala
>
> Thank You,
> Best Regards
>
>
> 
> Virus-free.www.avast.com
> 
> <#m_4627475067266622656_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>
>


Incremental Value dependent on another column of Data frame Spark

2023-05-23 Thread Nipuna Shantha
Hi all,

This is the sample set of data that I used for this task

[image: image.png]

My expected output is as below

[image: image.png]

My scenario: if Type is M01, the count should be 0; if Type is M02, it
should be incremented from 1 or 0 until the sequence of M02 rows is
finished. Imagine this as a partition, so row numbers cannot be jumbled.
Can you suggest a method for this scenario? Also note that this dataset
is really large; it has around 1 records, and I am using Spark with
Scala.

Thank You,
Best Regards

