Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
That makes a lot of sense now! I am looking for a tumbling window, so my window interval and batch interval are both 24 hours. Every day I want to start with a fresh state. Finally, since you said I need to do bookkeeping of the "last 24 hours", do you mean I need to do this in some external store and then

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread Tathagata Das
The per-key state S is kept in memory. It has to be of a type that can be encoded by Datasets. All you have to do is update S every time the function is called; the engine takes care of serializing/checkpointing the state value, and retrieving the correct version of the value when
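The update pattern described above can be sketched outside Spark with a minimal stand-in for the state handle (the real API is `org.apache.spark.sql.streaming.GroupState`; `MockState` and `maxTime` here are hypothetical names for illustration):

```scala
// A minimal stand-in for Spark's GroupState, just to illustrate the update pattern;
// the real API is org.apache.spark.sql.streaming.GroupState.
class MockState[S](var value: Option[S]) {
  def update(s: S): Unit = value = Some(s)
}

// The function mapGroupsWithState invokes per key and trigger: fold the new
// events into S and write it back; the engine checkpoints S between triggers.
def maxTime(key: Int, times: Iterator[Long], state: MockState[Long]): Long = {
  val newMax = (state.value.iterator ++ times).max
  state.update(newMax)
  newMax
}
```

Across triggers, the state value survives, so each call only has to merge the new batch of events into it.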

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
I think I understand *groupByKey*/*mapGroupsWithState*, and I am still trying to wrap my head around *GroupState*. So I believe I have a naive question to ask about *GroupState*: if I were to represent a state that has a history of events (say 24 hours), and say the number of events can be big for a

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
Hi TD, Thanks for the explanation and for the clear pseudocode and example! mapGroupsWithState is cool and looks very flexible; however, I have a few concerns and questions. For example, say I store TrainHistory as a max heap from the Java Collections library and I keep adding to this heap for
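The max-heap-as-state idea above can be sketched in plain Scala with `scala.collection.mutable.PriorityQueue` (analogous to a `java.util.PriorityQueue` with a reversed comparator; the `Ev` type and 24-hour eviction policy are hypothetical assumptions for illustration):

```scala
import scala.collection.mutable

case class Ev(time: Long) // hypothetical event carrying an epoch-millis timestamp

val dayMs = 24L * 60 * 60 * 1000

// Max-heap ordered by time: the head is always the newest event.
val heap = mutable.PriorityQueue.empty[Ev](Ordering.by(_.time))

def add(e: Ev, now: Long): Unit = {
  heap.enqueue(e)
  // A max-heap exposes the newest event cheaply, but evicting events older than
  // 24 hours requires scanning the whole heap -- one reason this per-key state
  // can become expensive when the number of events per key is large.
  val kept = heap.toList.filter(_.time >= now - dayMs)
  heap.clear()
  heap ++= kept
}
```

This also illustrates the concern in the message: the heap gives cheap access to the max, but pruning old history is O(n) per update.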

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Aah, I might have misinterpreted. The groupBy + window solution would give the max time for each train over 24-hour (non-overlapping) windows of event data (timestamped by activity_timestamp). So the output would be like: Train Dest Window(activity_timestamp) max(Time) 1 HK

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Can I do subqueries using the Dataset or DataFrame APIs rather than raw SQL, unless it is a last resort? On Tue, Aug 29, 2017 at 8:47 PM, ayan guha wrote: > This is not correct. In SQL Land, your query should be like below: > > select * from ( > select Train,DEST,TIME,

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread ayan guha
This is not correct. In SQL land, your query should be like below: select * from (select Train, DEST, TIME, row_number() over (partition by train order by time desc) r from TrainTable) x where r = 1. All these constructs are supported in the DataFrame functions. On Wed, Aug 30, 2017 at 1:08 PM, kant kodali
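The `row_number() ... where r = 1` pattern above amounts to picking, per train, the single row with the latest time. A plain-Scala sketch of that semantics (the `Train` case class and sample data are hypothetical):

```scala
case class Train(train: Int, dest: String, time: Long) // hypothetical row type

val table = Seq(Train(1, "HK", 10L), Train(1, "SH", 30L), Train(2, "TP", 7L))

// row_number() over (partition by train order by time desc) ... where r = 1
// keeps exactly one row per train: the one with the maximum time.
val latest: Map[Int, Train] =
  table.groupBy(_.train).map { case (t, rs) => t -> rs.maxBy(_.time) }
```

Note this keeps the whole row (including dest), which is what the original question asked for.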

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
yes, in a relational DB one could just do this: SELECT t.Train, t.Dest, r.MaxTime FROM (SELECT Train, MAX(Time) AS MaxTime FROM TrainTable GROUP BY Train) r INNER JOIN TrainTable t ON t.Train = r.Train AND t.Time = r.MaxTime (copied answer from a Stack Overflow question someone
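The self-join above (aggregate the max per train, then join back on train and time) can be mimicked with plain Scala collections (a sketch; the `T` case class and rows are hypothetical stand-ins for TrainTable):

```scala
case class T(train: Int, dest: String, time: Long) // hypothetical rows of TrainTable

val trainTable = Seq(T(1, "HK", 10L), T(1, "SH", 30L), T(2, "TP", 7L))

// Subquery: MAX(Time) per Train
val maxPerTrain: Map[Int, Long] =
  trainTable.groupBy(_.train).map { case (k, rs) => k -> rs.map(_.time).max }

// INNER JOIN back on (Train, Time): keep rows whose time equals the per-train max
val result = trainTable.filter(r => maxPerTrain(r.train) == r.time)
```

Note this is the batch formulation; as discussed later in the thread, self-joins like this are hard to express in a streaming setting.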

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
That just gives you the max time for each train. If I understood the question correctly, the OP wants the whole row with the max time. That's generally solved through joins or subqueries, which would be hard to do in a streaming setting. On Aug 29, 2017 7:29 PM, "ayan guha"

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread ayan guha
Why wouldn't removing the destination from the grouping work? Like this: trainTimesDataset.withWatermark("activity_timestamp", "5 days").groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train").max("time") On Wed, Aug 30, 2017 at 10:38 AM, kant kodali

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
@Burak, what would the transformation or query look like for the above example? I don't see flatMapGroupsWithState in the Dataset API in Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier. On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz wrote: > Hey

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
Hey TD, If I understood the question correctly, your solution wouldn't return the exact answer, since it also groups by destination. I would say the easiest solution would be to use flatMapGroupsWithState, where you .groupByKey(_.train) and keep in state the row with the maximum time. On
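The state suggested above — per train, the single row with the maximum time — reduces to a small pure update function. A sketch of that per-key update, outside Spark (the `Row` case class is a hypothetical stand-in for the stream's schema):

```scala
case class Row(train: Int, dest: String, time: Long) // hypothetical schema

// Per-key state: the row with the maximum time seen so far. This is the update
// a flatMapGroupsWithState function would perform for one key on each trigger.
def updateState(state: Option[Row], newRows: Iterator[Row]): Option[Row] =
  (state.toList ++ newRows).reduceOption((a, b) => if (a.time >= b.time) a else b)
```

Because the state is a single row per key, it stays small regardless of how many events arrive, which is what makes this approach attractive for the "whole row with max time" question.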

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Yes. And in that case, if you only care about the max over the last few days, then you should set a watermark on the timestamp column: trainTimesDataset.withWatermark("activity_timestamp", "5 days").groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")
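The tumbling-window grouping above can be illustrated without Spark: a 24-hour tumbling window simply assigns each timestamp to the bucket `[start, start + 24h)`. A plain-Scala sketch (the `Event` type, epoch-millis timestamps, and sample data are hypothetical):

```scala
case class Event(train: Int, dest: String, time: Long) // hypothetical, epoch millis

val dayMs = 24L * 60 * 60 * 1000

// A 24-hour tumbling window assigns each timestamp to [start, start + 24h)
def windowStart(ts: Long): Long = (ts / dayMs) * dayMs

val events = Seq(
  Event(1, "HK", 3 * dayMs + 100),
  Event(1, "HK", 3 * dayMs + 500),
  Event(1, "HK", 4 * dayMs + 9)
)

// Analogue of groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest").max("time")
val perWindowMax = events
  .groupBy(e => (windowStart(e.time), e.train, e.dest))
  .map { case (k, es) => k -> es.map(_.time).max }
```

Events on different days land in different buckets, so each day's max is computed independently — the "fresh state every day" behavior asked about later in the thread.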

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Hi, Thanks for the response. Since this is a streaming query, in my case I need to hold state for 24 hours, which I forgot to mention in my previous email. Can I do: trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest").max("time") On Tue,

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Say *trainTimesDataset* is the streaming Dataset of schema [train: Int, dest: String, time: Timestamp]. Scala: trainTimesDataset.groupBy("train", "dest").max("time") SQL: "select train, dest, max(time) from trainTimesView group by train, dest" // after calling
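The groupBy-max aggregation above can be mirrored with plain Scala collections to show exactly what it computes (a sketch; the `TrainTime` case class and sample rows are hypothetical, with `Long` standing in for the Timestamp column):

```scala
// Hypothetical row type mirroring the schema [train: Int, dest: String, time: Long]
case class TrainTime(train: Int, dest: String, time: Long)

val rows = Seq(
  TrainTime(1, "HK", 10L), TrainTime(1, "HK", 25L),
  TrainTime(2, "SH", 5L), TrainTime(2, "SH", 12L)
)

// Plain-collections analogue of groupBy("train", "dest").max("time")
val maxTimes: Map[(Int, String), Long] =
  rows.groupBy(r => (r.train, r.dest))
      .map { case (k, rs) => k -> rs.map(_.time).max }
```

As the rest of the thread points out, this yields only the max time per key, not the entire row that carries it.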

How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Hi All, I am wondering what is the easiest and most concise way to express the computation below in Spark Structured Streaming, given that it supports both imperative and declarative styles? I am just trying to select the row that has the max timestamp for each train. Instead of doing some sort of nested