I guess sorting would make sense only when you have the complete data set. In 
streaming you don’t know what record is coming next, so it doesn’t make sense 
to sort (except in the aggregated complete output mode, where the entire result 
table is emitted each time and the results can be sorted).
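
To make the complete-mode exception concrete, here is a minimal plain-Python 
sketch. It is not Spark code: the function name complete_mode_outputs and the 
sample batches are made up for illustration. It simulates a running count that 
re-emits the whole result table after every micro-batch, which is what makes a 
total order well defined at each trigger.

```python
from collections import Counter


def complete_mode_outputs(batches):
    """Yield the full result table, sorted, after each micro-batch.

    Hypothetical helper for illustration only; it mimics the idea of
    complete output mode rather than calling any Spark API.
    """
    counts = Counter()
    for batch in batches:
        counts.update(batch)
        # The entire table is emitted again, so it can be sorted as a whole:
        # highest count first, ties broken by key.
        yield sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))


batches = [["b", "a", "b"], ["c", "a", "a"]]
outputs = list(complete_mode_outputs(batches))
```

In an append-style output, by contrast, rows already written by an earlier 
trigger cannot be reordered when a later record arrives, so there is no point 
at which a global sort could be applied.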


From:  Hemant Bhanawat <hemant9...@gmail.com>
Date:  Tuesday, April 24, 2018 at 12:18 AM
To:  "Bowden, Chris" <chris.bow...@microfocus.com>
Cc:  Reynold Xin <r...@databricks.com>, dev <dev@spark.apache.org>
Subject:  Re: Sorting on a streaming dataframe

Thanks Chris. There are many ways in which I can solve this problem, but they 
are cumbersome. The easiest way would have been to sort the streaming 
dataframe. I asked this question because I could not find a reason why sorting 
a streaming dataframe is disallowed. 


On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <chris.bow...@microfocus.com> wrote:
You can happily sort the underlying RDD of InternalRow(s) inside a sink, 
assuming you are willing to implement and maintain your own sink(s). That is, 
just grabbing the parquet sink, etc. isn’t going to work out of the box. 

Alternatively, map/flatMapGroupsWithState is probably sufficient and requires 
less working knowledge to make effective reuse of internals. Just group by foo 
and then sort accordingly and assign ids. The id counter can be stateful per 
group. 

Sometimes this problem may not need to be solved at all. For example, if 
you are using Kafka, a proper partitioning scheme and message offsets may be 
“good enough”.

From: Hemant Bhanawat <hemant9...@gmail.com>
Sent: Thursday, April 12, 2018 11:42:59 PM
To: Reynold Xin
Cc: dev
Subject: Re: Sorting on a streaming dataframe

Well, we want to assign snapshot ids (incrementing counters) to the incoming 
records. For that, we are zipping the streaming rdds with that counter using a 
modified version of ZippedWithIndexRDD. We are ok if the records in the 
streaming dataframe get counters in random order, but the counter should always 
be incrementing. 

This works fine until we have a failure. When we have a failure, we 
re-assign snapshot ids to the records, and this time the same snapshot id can 
get assigned to a different record. This is a problem because the primary key in 
our storage engine is <recordid, snapshotid>. So we want to sort the dataframe 
so that the records always get the same snapshot id. 
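
The failure scenario described above can be sketched in plain Python. This is 
only an illustration under stated assumptions, not the modified 
ZippedWithIndexRDD itself: assign_snapshot_ids, the record ids, and the 
starting counter of 100 are all hypothetical. It shows that index-based 
assignment depends on arrival order, and that sorting the batch by a stable 
key first makes a replay produce the same <recordid, snapshotid> pairs.

```python
def assign_snapshot_ids(records, first_id, sort_first):
    """Pair each record id with an incrementing snapshot id.

    Hypothetical helper: when sort_first is False the ids follow arrival
    order (like zipping with an index); when True the batch is sorted by
    record id before ids are assigned.
    """
    ordered = sorted(records) if sort_first else list(records)
    return {rec: first_id + i for i, rec in enumerate(ordered)}


batch = ["r3", "r1", "r2"]
# After a failure, the same batch may be recomputed in a different order.
replayed = ["r2", "r3", "r1"]

unsorted_first = assign_snapshot_ids(batch, 100, sort_first=False)
unsorted_retry = assign_snapshot_ids(replayed, 100, sort_first=False)

sorted_first = assign_snapshot_ids(batch, 100, sort_first=True)
sorted_retry = assign_snapshot_ids(replayed, 100, sort_first=True)
```

Here unsorted_first and unsorted_retry disagree (r3 gets 100 in one run and 
101 in the other), while sorted_first and sorted_retry are identical. The 
sketch assumes the replayed batch contains exactly the same records, and that 
the sort key is unique within a batch.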

On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <r...@databricks.com> wrote:
Can you describe your use case more?

On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9...@gmail.com> wrote:
Hi Guys, 

Why is sorting on streaming dataframes not supported (unless it is complete 
mode)? My downstream needs me to sort the streaming dataframe.

