Hi,

given your dataset:

val df = Seq(
  (1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"),
  (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"),
  (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"),
  (10, 20230523, "M02"), (11, 20230523, "M02"), (12, 20230523, "M01"),
  (13, 20230523, "M02")
).toDF("Row", "Date", "Type")

the simplest you can get with column functions is this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val dateWindow = Window.partitionBy("Date").orderBy("Row")
val batchWindow = Window.partitionBy("Date", "batch").orderBy("Row")

df.withColumn("inc", when($"Type" =!= lag($"Type", 1).over(dateWindow), lit(1)).otherwise(lit(0)))  // 1 when the type changes vs. the previous row
  .withColumn("batch", sum($"inc").over(dateWindow))                                                // running sum gives each run of equal types an id
  .withColumn("count", when($"Type" === "M01", lit(0)).otherwise(row_number().over(batchWindow)))   // number rows within each batch, 0 for "M01"
  .show

This creates:

+---+--------+----+---+-----+-----+
|Row|    Date|Type|inc|batch|count|
+---+--------+----+---+-----+-----+
|  1|20230523| M01|  0|    0|    0|
|  2|20230523| M01|  0|    0|    0|
|  3|20230523| M01|  0|    0|    0|
|  4|20230523| M02|  1|    1|    1|
|  5|20230523| M02|  0|    1|    2|
|  6|20230523| M02|  0|    1|    3|
|  7|20230523| M01|  1|    2|    0|
|  8|20230523| M01|  0|    2|    0|
|  9|20230523| M02|  1|    3|    1|
| 10|20230523| M02|  0|    3|    2|
| 11|20230523| M02|  0|    3|    3|
| 12|20230523| M01|  1|    4|    0|
| 13|20230523| M02|  1|    5|    1|
+---+--------+----+---+-----+-----+

Column "inc" is used to split the partition into batches of same consecutive types. Column "batch" gives rows of those batches a unique ids. For each of those batch, we can use row_number to create count, but for "M01" we set "count" to 0.

The query plan does not look too bad; there is no extra shuffle involved.
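You can verify this yourself by asking for the plan instead of the rows; this is just the same chain as above with .explain() at the end. The hash partitioning by "Date" should show up as a single Exchange feeding both Window operators:

df.withColumn("inc", when($"Type" =!= lag($"Type", 1).over(dateWindow), lit(1)).otherwise(lit(0)))
  .withColumn("batch", sum($"inc").over(dateWindow))
  .withColumn("count", when($"Type" === "M01", lit(0)).otherwise(row_number().over(batchWindow)))
  .explain()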


Raghavendra proposed iterating over the partitions. I presume that means you repartition by "Date" and sort within partitions by "Row", which can put multiple dates into one partition. Even if you start with one date per partition, AQE might coalesce partitions into bigger ones. This can get you into trouble when a date starts with "M02", because the counter from the previous date's trailing "M02" run would simply carry on.
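If you do go down that route, you could at least keep AQE from merging small partitions, but note that this is only a workaround and still does not guarantee one date per partition:

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")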

You could instead group your dataset by "Date" and process the individual sorted groups (requires Spark 3.4.0 or later). This way you still partition by "Date", but each group you process contains only a single date, as proposed by Raghavendra:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val groups = df.groupByKey(_.getInt(1))
groups.flatMapSortedGroups($"Row") { case (_: Int, rows: Iterator[Row]) =>
  // running counter, reset on every "M01" row
  var count = 0
  rows.map { row =>
    if (row.getString(2) == "M01") {
      count = 0
    } else {
      count += 1
    }
    Row.fromSeq(row.toSeq :+ count)
  }
}(RowEncoder(df.schema.add("count", IntegerType))).show
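One caveat: the order across groups in the output is not guaranteed, since flatMapSortedGroups only sorts rows within each date group. If you need the original order back, sort at the end, for example by keeping the result of flatMapSortedGroups in a val (result is my naming, not from the snippet above) and calling:

result.orderBy($"Row").show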

Cheers,
Enrico


On 23.05.23 at 20:13, Nipuna Shantha wrote:
Hi all,

This is the sample set of data that I used for this task

[inline image "image.png": sample input data]

My expected output is as below

[inline image "image.png": expected output]

My scenario is this: if Type is M01 the count should be 0, and if Type is M02 it should be incremented from 1 until the sequence of M02 rows ends. Imagine this as a partition, so the rows cannot be jumbled. Can you suggest a method for this scenario? Also, for your information, this dataset is really large; it has around 100,000,000 records, and I am using Spark with Scala.

Thank You,
Best Regards

