Opened an issue. https://issues.apache.org/jira/browse/SPARK-24144
Since it is a major issue for us, I have marked it as Major. Feel free to change that if it is not the case from Spark's perspective.

On Tue, May 1, 2018 at 4:34 AM, Michael Armbrust <mich...@databricks.com> wrote:

> Please open a JIRA then!
>
> On Fri, Apr 27, 2018 at 3:59 AM Hemant Bhanawat <hemant9...@gmail.com> wrote:
>
>> I see.
>>
>> monotonically_increasing_id on streaming DataFrames will be really helpful to me, and I believe to many more users. Adding this functionality in Spark would be more efficient in terms of performance than implementing it inside each application.
>>
>> Hemant
>>
>> On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> The basic tenet of structured streaming is that a query should return the same answer in streaming or batch mode. We support sorting in complete mode because we have all the data and can sort it correctly and return the full answer. In update or append mode, sorting would only return a correct answer if we could promise that records arrive in sort order (and we can't). Therefore, it is disallowed.
>>>
>>> If you are just looking for a unique, stable id and you are already using Kafka as the source, you could just combine the partition id and the offset. The structured streaming connector to Kafka <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html> exposes both of these in the schema of the streaming DataFrame. (Similarly, for Kinesis you can use the shard id and sequence number.)
>>>
>>> If you need the IDs to be contiguous, then this is a fundamentally harder problem. I think the best we could do is add support for monotonically_increasing_id() in streaming dataframes.
>>>
>>> On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha <chaya...@gmail.com> wrote:
>>>
>>>> Perhaps your use case fits Apache Kafka better.
>>>>
>>>> More info at:
>>>> https://kafka.apache.org/documentation/streams/
>>>>
>>>> Everything really comes down to the architecture design and algorithm spec. However, from my experience with Spark, there are many good reasons why this requirement is not supported ;)
>>>>
>>>> Best,
>>>>
>>>> Chayapan (A)
>>>>
>>>> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat <hemant9...@gmail.com> wrote:
>>>>
>>>> Thanks Chris. There are many ways in which I can solve this problem, but they are cumbersome. The easiest way would have been to sort the streaming dataframe. The reason I asked this question is that I could not find a reason why sorting on a streaming dataframe is disallowed.
>>>>
>>>> Hemant
>>>>
>>>> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <chris.bow...@microfocus.com> wrote:
>>>>
>>>>> You can happily sort the underlying RDD of InternalRow(s) inside a sink, assuming you are willing to implement and maintain your own sink(s). That is, just grabbing the parquet sink, etc. isn't going to work out of the box. Alternatively, map/flatMapGroupsWithState is probably sufficient and requires less working knowledge of internals to use effectively. Just group by foo, then sort accordingly and assign ids; the id counter can be stateful per group (see the sketches after this message). Sometimes this problem may not need to be solved at all. For example, if you are using Kafka, a proper partitioning scheme and message offsets may be "good enough".
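A minimal sketch of the per-group stateful counter Chris describes, assuming Spark 2.2+; the Event/IndexedEvent classes, the rate source, and the checkpoint path are illustrative placeholders, not anything from this thread:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, payload: String)
case class IndexedEvent(key: String, payload: String, id: Long)

object StatefulIds {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("stateful-ids").getOrCreate()
    import spark.implicits._

    // Any streaming Dataset[Event] works here; rate is just a built-in toy source.
    val events = spark.readStream.format("rate").load()
      .selectExpr("cast(value % 10 as string) as key", "cast(value as string) as payload")
      .as[Event]

    // Keep the highest id handed out so far as per-group state; it survives
    // across micro-batches and, via checkpointing, across restarts.
    val indexed = events
      .groupByKey(_.key)
      .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.NoTimeout()) {
        (key: String, rows: Iterator[Event], state: GroupState[Long]) =>
          var next = state.getOption.getOrElse(0L)
          val out = rows.map { e =>
            next += 1
            IndexedEvent(e.key, e.payload, next)
          }.toList // force the iterator before updating the state
          state.update(next)
          out.iterator
      }

    indexed.writeStream
      .format("console")
      .outputMode("append")
      .option("checkpointLocation", "/tmp/stateful-ids-ckpt") // placeholder
      .start()
      .awaitTermination()
  }
}

Note this yields ids that are contiguous per key, not globally; as Michael says above, globally contiguous ids for a stream remain a fundamentally hard problem.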
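And a sketch of the partition-plus-offset idea from Michael's and Chris's replies, assuming the standard spark-sql-kafka-0-10 source is on the classpath; the broker address, topic name, and the 2^40 offset bound are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object KafkaStableIds {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-stable-ids").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "events")                    // placeholder topic
      .load()

    // The Kafka source schema already exposes `partition` (int) and `offset`
    // (long). Packing the partition into the high bits gives a single id that
    // is unique and stable across replays -- assuming offsets stay below 2^40.
    val withId = df.select(
      (shiftLeft(col("partition").cast("long"), 40) + col("offset")).as("id"),
      col("key"),
      col("value"))

    withId.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/kafka-ids-ckpt") // placeholder
      .start()
      .awaitTermination()
  }
}

If you never need arithmetic on the id, concat_ws("-", col("partition"), col("offset")) sidesteps the offset-width assumption entirely.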
>>>>> ------------------------------
>>>>> *From:* Hemant Bhanawat <hemant9...@gmail.com>
>>>>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>>>>> *To:* Reynold Xin
>>>>> *Cc:* dev
>>>>> *Subject:* Re: Sorting on a streaming dataframe
>>>>>
>>>>> Well, we want to assign snapshot ids (incrementing counters) to the incoming records. For that, we are zipping the streaming RDDs with that counter using a modified version of ZippedWithIndexRDD. We are OK if the records in the streaming dataframe get counters in random order, but the counter should always be incrementing.
>>>>>
>>>>> This works fine until we have a failure. When we have a failure, we re-assign the records to snapshot ids, and this time the same snapshot id can get assigned to a different record. This is a problem because the primary key in our storage engine is <recordid, snapshotid>. So we want to sort the dataframe so that the records always get the same snapshot id.
>>>>>
>>>>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <r...@databricks.com> wrote:
>>>>>
>>>>> Can you describe your use case more?
>>>>>
>>>>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9...@gmail.com> wrote:
>>>>>
>>>>> Hi Guys,
>>>>>
>>>>> Why is sorting on streaming dataframes not supported (unless it is in complete mode)? My downstream needs me to sort the streaming dataframe.
>>>>>
>>>>> Hemant
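For completeness, the one supported case Michael mentions above: sorting is accepted when the query is an aggregation running in complete mode, because every trigger re-emits the full result. A minimal sketch, where the socket source and word-count query are illustrative:

import org.apache.spark.sql.SparkSession

object CompleteModeSort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("complete-mode-sort").getOrCreate()
    import spark.implicits._

    val words = spark.readStream
      .format("socket") // illustrative source
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .flatMap(_.split("\\s+"))

    // orderBy is allowed here only because the query aggregates and runs in
    // complete mode; the same query in append or update mode is rejected by
    // the analyzer as unsupported.
    val counts = words.groupBy($"value".as("word")).count().orderBy($"count".desc)

    counts.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}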