I have data streaming into my Spark Scala application in this format:

```
id     mark1  mark2  mark3  time
uuid1  100    200    300    Tue Aug 8 14:06:02 PDT 2017
uuid1  100    200    300    Tue Aug 8 14:06:22 PDT 2017
uuid2  150    250    350    Tue Aug 8 14:06:32 PDT 2017
uuid2  150    250    350    Tue Aug 8 14:06:52 PDT 2017
uuid2  150    250    350    Tue Aug 8 14:06:58 PDT 2017
```
I read it into the columns id, mark1, mark2, mark3, and time, and the time column is converted to a timestamp type as well. I want to group by id and compute a lag over mark1 that gives the previous row's mark1 value, something like this:

```
id     mark1  mark2  mark3  prev_mark  time
uuid1  100    200    300    null       Tue Aug 8 14:06:02 PDT 2017
uuid1  100    200    300    100        Tue Aug 8 14:06:22 PDT 2017
uuid2  150    250    350    null       Tue Aug 8 14:06:32 PDT 2017
uuid2  150    250    350    150        Tue Aug 8 14:06:52 PDT 2017
uuid2  150    250    350    150        Tue Aug 8 14:06:58 PDT 2017
```

Consider the DataFrame to be markDF. I have tried:

```scala
val window = Window.partitionBy("id").orderBy("time")
val newerDF = markDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))
```

which fails saying that non-time-based windows cannot be applied on streaming/appending datasets/frames. I have also tried:

```scala
val window = Window.partitionBy("id").orderBy("time").rowsBetween(-10, 10)
val newerDF = markDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))
```

to get a window over a few rows, which did not work either. The streaming time window, i.e. window("time", "10 minutes"), cannot be used to compute the lag. I am super confused about how to do this. Any help would be awesome!!
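For reference, the window and lag logic itself does produce the output I want when I run it against a static (non-streaming) DataFrame built from the same rows; it is only on the streaming frame that it fails. A minimal, self-contained batch sketch (names like LagBatchCheck are just placeholders, and time is left as a string here for brevity):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

object LagBatchCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("lag-batch-check")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Same rows as above, but as a static DataFrame instead of a stream.
    val markDF = Seq(
      ("uuid1", 100, 200, 300, "Tue Aug 8 14:06:02 PDT 2017"),
      ("uuid1", 100, 200, 300, "Tue Aug 8 14:06:22 PDT 2017"),
      ("uuid2", 150, 250, 350, "Tue Aug 8 14:06:32 PDT 2017"),
      ("uuid2", 150, 250, 350, "Tue Aug 8 14:06:52 PDT 2017"),
      ("uuid2", 150, 250, 350, "Tue Aug 8 14:06:58 PDT 2017")
    ).toDF("id", "mark1", "mark2", "mark3", "time")

    // Exactly the window + lag that throws on the streaming version.
    val window = Window.partitionBy("id").orderBy("time")
    val newerDF = markDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

    // In batch mode this prints the desired table, with null for the
    // first row of each id and the previous mark1 for the rest.
    newerDF.show(false)

    spark.stop()
  }
}
```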