Given that you already state the above can be imagined as a partition, you
can use mapPartitions with a custom iterator:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

val inputSchema = inputDf.schema
val outputRdd = inputDf.rdd.mapPartitions(rows => new SomeClass(rows))
val outputDf = sparkSession.createDataFrame(outputRdd,
  inputSchema.add("counter", IntegerType))
class SomeClass(rows: Iterator[Row]) extends Iterator[Row] {
  var counter: Int = 0

  override def hasNext: Boolean = rows.hasNext

  override def next(): Row = {
    val row = rows.next()
    val rowType: String = row.getAs[String]("Type")
    // Reset the counter on M01; otherwise keep incrementing
    // through the run of M02 rows.
    if (rowType == "M01")
      counter = 0
    else
      counter += 1
    // Append the counter as a new trailing column.
    Row.fromSeq(row.toSeq ++ Seq(counter))
  }
}
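For clarity, the counter logic the iterator applies within each partition behaves like this on plain Scala collections (the sample "Type" values here are made up, not taken from your data):

```scala
object CounterDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical sequence of Type values within one partition.
    val types = Seq("M01", "M02", "M02", "M02", "M01", "M02")

    // Same stateful rule as in SomeClass.next():
    // reset to 0 on M01, increment otherwise.
    var counter = 0
    val counters = types.map { t =>
      if (t == "M01") counter = 0 else counter += 1
      counter
    }

    println(counters.mkString(",")) // 0,1,2,3,0,1
  }
}
```

Note that the state lives only inside one partition's iterator, so ordering is preserved within a partition but not across partitions.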
--
Raghavendra
On Tue, May 23, 2023 at 11:44 PM Nipuna Shantha <[email protected]>
wrote:
> Hi all,
>
> This is the sample set of data that I used for this task
>
> [image: image.png]
>
> My expected output is as below
>
> [image: image.png]
>
> My scenario is: if Type is M01 the count should be 0, and if Type is M02 it
> should be incremented from 1 or 0 until the sequence of M02 rows is finished.
> Imagine this as a partition, so row numbers cannot be jumbled. Can you
> suggest a method for this scenario? Also, note that this dataset is really
> large; it has around 100,000,000 records, and I am using Spark with Scala.
>
> Thank You,
> Best Regards
>
>
>