Created SPARK-16963 to cover this issue.

Fred
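For concreteness, the kind of cleanup hook discussed below might look roughly like the sketch that follows. This is purely illustrative and not part of the Spark 2.0 API: `CleanableSource`, `commit`, `InMemorySource`, and the toy `LongOffset` are hypothetical names, and the toy source returns `Seq[String]` rather than a `DataFrame` so the example is self-contained.

```scala
import scala.collection.mutable

// Toy offset type, standing in for Spark's Offset (illustrative only).
final case class LongOffset(value: Long)

// Hypothetical extension of the Source contract: the scheduler reports the
// committed high-water mark so the source can discard data at or below it.
trait CleanableSource {
  // Same contract as Source.getBatch: return the data in (start, end].
  def getBatch(start: Option[LongOffset], end: LongOffset): Seq[String]
  // Hypothetical callback: all offsets <= end are durably processed.
  def commit(end: LongOffset): Unit
}

final class InMemorySource extends CleanableSource {
  private val buffer = mutable.TreeMap.empty[Long, String]

  def add(offset: Long, record: String): Unit = buffer(offset) = record

  def getBatch(start: Option[LongOffset], end: LongOffset): Seq[String] = {
    // When start is None, begin with the first available record.
    val lo = start.map(_.value).getOrElse(Long.MinValue)
    buffer.range(lo + 1, end.value + 1).values.toSeq
  }

  def commit(end: LongOffset): Unit = {
    // Everything at or below the committed offset can now be reclaimed,
    // bounding the state the source must retain.
    val stale = buffer.keys.takeWhile(_ <= end.value).toList
    stale.foreach(buffer.remove)
  }

  def retained: Int = buffer.size
}
```

With such a hook, the scheduler could call `commit` each time `StreamExecution.committedOffsets` advances, and a source like `FileStreamSource` would have a well-defined point at which old files become deletable.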
On Thu, Aug 4, 2016 at 4:52 PM, Michael Armbrust <mich...@databricks.com> wrote:

> Yeah, this API is in the private execution package because we are planning
> to continue to iterate on it. Today, we will only ever go back one batch,
> though that might change in the future if we do async checkpointing of
> internal state.
>
> You are totally right that we should relay this info back to the source.
> Opening a JIRA sounds like a good first step.
>
> On Thu, Aug 4, 2016 at 4:38 PM, Fred Reiss <freiss....@gmail.com> wrote:
>
>> Hi,
>>
>> I've been looking over the Source API in
>> org.apache.spark.sql.execution.streaming, and I'm at a loss for how the
>> current API can be implemented in a practical way. The API defines a
>> single getBatch() method for fetching records from the source, with the
>> following Scaladoc comments defining the semantics:
>>
>> /**
>>  * Returns the data that is between the offsets (`start`, `end`]. When
>>  * `start` is `None` then the batch should begin with the first available
>>  * record. This method must always return the same data for a particular
>>  * `start` and `end` pair.
>>  */
>> def getBatch(start: Option[Offset], end: Offset): DataFrame
>>
>> If I read the semantics described here correctly, a Source is required to
>> retain all past history for the stream that it backs. Further, a Source
>> is also required to retain this data across restarts of the process where
>> the Source is instantiated, even when the Source is restarted on a
>> different machine.
>>
>> The current implementation of FileStreamSource follows my reading of the
>> requirements above. FileStreamSource never deletes a file.
>>
>> I feel like this requirement for unbounded state retention must be a
>> mistake or misunderstanding of some kind. The scheduler internally
>> maintains a high-water mark (StreamExecution.committedOffsets in
>> StreamExecution.scala) of data that has been successfully processed.
>> There must have been an intent to communicate that high-water mark back
>> to the Source so that the Source can clean up its state. Indeed, the
>> Databricks blog post from last week
>> (https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html)
>> says that "Only a few minutes’ worth of data needs to be retained;
>> Structured Streaming will maintain its own internal state after that."
>>
>> But the code checked into git and shipped with Spark 2.0 does not have an
>> API call for the scheduler to tell a Source where the boundary of "only a
>> few minutes' worth of data" lies.
>>
>> Is there a JIRA that I'm not aware of to change the Source API? If not,
>> should we maybe open one?
>>
>> Fred