Hi,
given your dataset:
val df = Seq(
  (1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"),
  (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"),
  (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"),
  (10, 20230523, "M02"), (11, 20230523, "M02"), (12, 20230523, "M01"),
  (13, 20230523, "M02")
).toDF("Row", "Date", "Type")
the simplest you can get with column functions is this:
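import org.apache.spark.sql.expressions.Window
// spark-shell imports the next two automatically
import org.apache.spark.sql.functions._  // when, lag, lit, sum, row_number
import spark.implicits._                 // toDF and $"..." (SparkSession named spark)

val dateWindow = Window.partitionBy("Date").orderBy("Row")
val batchWindow = Window.partitionBy("Date", "batch").orderBy("Row")

df
  // 1 where the type differs from the previous row of the same date, else 0
  .withColumn("inc", when($"Type" =!= lag($"Type", 1).over(dateWindow), lit(1)).otherwise(lit(0)))
  // running sum of "inc" gives every run of equal types its own batch id
  .withColumn("batch", sum($"inc").over(dateWindow))
  // number the rows inside each batch, but force 0 for "M01"
  .withColumn("count", when($"Type" === "M01", lit(0)).otherwise(row_number().over(batchWindow)))
  .show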
This creates:
+---+--------+----+---+-----+-----+
|Row|    Date|Type|inc|batch|count|
+---+--------+----+---+-----+-----+
|  1|20230523| M01|  0|    0|    0|
|  2|20230523| M01|  0|    0|    0|
|  3|20230523| M01|  0|    0|    0|
|  4|20230523| M02|  1|    1|    1|
|  5|20230523| M02|  0|    1|    2|
|  6|20230523| M02|  0|    1|    3|
|  7|20230523| M01|  1|    2|    0|
|  8|20230523| M01|  0|    2|    0|
|  9|20230523| M02|  1|    3|    1|
| 10|20230523| M02|  0|    3|    2|
| 11|20230523| M02|  0|    3|    3|
| 12|20230523| M01|  1|    4|    0|
| 13|20230523| M02|  1|    5|    1|
+---+--------+----+---+-----+-----+
Column "inc" marks the rows where the type changes, which splits each
date partition into batches of consecutive rows with the same type.
Column "batch" is the running sum of "inc" and gives the rows of each
batch a unique id. Within each batch, we can use row_number to create
"count", but for "M01" we set "count" to 0.
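The three steps can also be traced on plain Scala collections, without
Spark. This is only a sketch for a single date: the names
BatchCountSketch, Rec and annotate are made up for illustration and are
not Spark API.

```scala
object BatchCountSketch {
  // Hypothetical record type: one row of a single date.
  final case class Rec(row: Int, tpe: String)

  // Returns (row, type, inc, batch, count) for every record, ordered by row.
  def annotate(recs: Seq[Rec]): IndexedSeq[(Int, String, Int, Int, Int)] = {
    val sorted = recs.sortBy(_.row).toVector

    // inc: 1 where the type differs from the previous row, else 0 (first row is 0)
    val inc: Vector[Int] =
      0 +: sorted.sliding(2).collect { case Seq(a, b) => if (a.tpe != b.tpe) 1 else 0 }.toVector

    // batch: running sum of inc, so each run of equal types shares one id
    val batch: Vector[Int] = inc.scanLeft(0)(_ + _).tail

    // index of the first row of every batch
    val firstIndexOfBatch: Map[Int, Int] =
      batch.zipWithIndex.groupBy(_._1).map { case (b, xs) => b -> xs.map(_._2).min }

    // count: 1-based position inside the batch, forced to 0 for "M01"
    sorted.indices.map { i =>
      val count = if (sorted(i).tpe == "M01") 0 else i - firstIndexOfBatch(batch(i)) + 1
      (sorted(i).row, sorted(i).tpe, inc(i), batch(i), count)
    }
  }

  // Same sequence of types as in the dataset above.
  val sample: Seq[Rec] =
    Seq("M01", "M01", "M01", "M02", "M02", "M02", "M01", "M01", "M02", "M02", "M02", "M01", "M02")
      .zipWithIndex.map { case (t, i) => Rec(i + 1, t) }

  def main(args: Array[String]): Unit =
    annotate(sample).foreach(println)
}
```

For the sample rows this gives the same "inc", "batch" and "count"
values as the Spark output above.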
The query plan does not look too bad: no extra shuffle is involved.
Raghavendra proposed to iterate the partitions. I presume that you
partition by "Date" and order within partition by "Row", which puts
multiple dates into one partition. Even if you have one date per
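partition, AQE might coalesce partitions into bigger ones. This can
get you into trouble when a date starts with "M02", because a counter
that runs over the whole partition could carry over from trailing "M02"
rows of the previous date.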
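To illustrate the problem with plain Scala iterators (no Spark): the
sketch below assumes that the per-partition iteration keeps one running
counter that is reset only on "M01". The object, the Rec type and the
second date 20230524 are made up for illustration.

```scala
object PartitionIterationSketch {
  // Hypothetical record type: rows of several dates in one partition.
  final case class Rec(row: Int, date: Int, tpe: String)

  // Naive: one counter for the whole partition, reset only on "M01".
  def naiveCounts(partition: Iterator[Rec]): List[Int] = {
    var count = 0
    partition.map { r =>
      if (r.tpe == "M01") count = 0 else count += 1
      count
    }.toList
  }

  // Safer: additionally reset the counter whenever the date changes.
  def perDateCounts(partition: Iterator[Rec]): List[Int] = {
    var count = 0
    var currentDate = Option.empty[Int]
    partition.map { r =>
      if (!currentDate.contains(r.date)) {
        count = 0
        currentDate = Some(r.date)
      }
      if (r.tpe == "M01") count = 0 else count += 1
      count
    }.toList
  }

  // One partition holding two dates; the second date starts with "M02".
  val partition: List[Rec] = List(
    Rec(1, 20230523, "M01"), Rec(2, 20230523, "M02"), Rec(3, 20230523, "M02"),
    Rec(1, 20230524, "M02"), Rec(2, 20230524, "M01")
  )

  def main(args: Array[String]): Unit = {
    println(naiveCounts(partition.iterator))   // List(0, 1, 2, 3, 0)
    println(perDateCounts(partition.iterator)) // List(0, 1, 2, 1, 0)
  }
}
```

With the naive counter, the first row of 20230524 gets count 3 instead
of 1, because the two "M02" rows that end 20230523 are still counted.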
You could group your dataset by "Date" and process the individual sorted
groups (requires Spark 3.4.0). This way, you still partition by "Date"
but process only individual dates, as proposed by Raghavendra:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val groups = df.groupByKey(_.getInt(1))
groups.flatMapSortedGroups($"Row") { case (_: Int, rows: Iterator[Row]) =>
  var count = 0
  rows.map { row =>
    if (row.getString(2) == "M01") {
      count = 0
    } else {
      count += 1
    }
    Row.fromSeq(row.toSeq :+ count)
  }
}(RowEncoder(df.schema.add("count", IntegerType))).show
Cheers,
Enrico
On 23.05.23 at 20:13, Nipuna Shantha wrote:
Hi all,
This is the sample set of data that I used for this task
image.png
My expected output is as below
image.png
My scenario is: if Type is M01 the count should be 0, and if Type is M02
it should be incremented from 1 or 0 until the sequence of M02 is
finished. Imagine this as a partition, so row numbers cannot jumble. So
can you guys suggest a method for this scenario? Also, for your
information, this dataset is really large; it has around 100000000
records and I am using Spark with Scala.
Thank You,
Best Regards