[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust updated SPARK-17937:
-------------------------------------
    Issue Type: Improvement  (was: Sub-task)
        Parent:     (was: SPARK-15406)

> Clarify Kafka offset semantics for Structured Streaming
> -------------------------------------------------------
>
>                 Key: SPARK-17937
>                 URL: https://issues.apache.org/jira/browse/SPARK-17937
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>            Reporter: Cody Koeninger
>
> Possible events for which offsets are needed:
> # New partition is discovered
> # Offset out of range (i.e., data has been lost).  It's possible to separate 
> this into offset too small and offset too large, but I'm not sure it matters 
> for us.
> Possible sources of offsets:
> # *Earliest* position in log
> # *Latest* position in log
> # *Fail* and kill the query
> # *Checkpoint* position
> # *User specified* per topicpartition (see the sketch after this list)
> # *Kafka commit log*.  Currently unsupported.  This means users who want to 
> migrate from existing Kafka jobs need to jump through hoops.  Even if we 
> never want to support it, as soon as we take on SPARK-17815 we need to make 
> sure Kafka commit log state is clearly documented and handled.
> # *Timestamp*.  Currently unsupported.  This could be supported with the old, 
> inaccurate Kafka time API, or with the upcoming time index.
> # *X offsets* before or after the latest / earliest position.  Currently 
> unsupported.  I think the semantics of this are far less clear than timestamp, 
> given that Kafka doesn't have a single range of offsets (each partition has its own).
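> For the *User specified* case, a minimal sketch of the JSON shape the Kafka 
> source accepts for startingOffsets (topic names and offset values here are 
> made up; -2 is the JSON convention for earliest):
> {code:scala}
> // Hypothetical topics and offsets, only to illustrate the per-topicpartition JSON.
> // -2 means "earliest" for that partition; concrete numbers are absolute offsets.
> val startingOffsetsJson = """{"topicA":{"0":23,"1":-2},"topicB":{"0":0}}"""
> {code}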
> Currently allowed pre-query configuration (all "OR"s below are exclusive):
> # startingOffsets: *earliest* OR *latest* OR *User specified* JSON per 
> topicpartition  (SPARK-17812)
> # failOnDataLoss: true (which implies *Fail* above) OR false (which implies 
> *Earliest* above).  In general, I see no reason this couldn't specify Latest 
> as an option.  A sketch of this configuration follows the list.
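> To make the current pre-query configuration concrete, a non-authoritative 
> sketch (broker address and topic name are placeholders):
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder().appName("kafka-offset-semantics").getOrCreate()
>
> // startingOffsets: "earliest" OR "latest" OR the per-topicpartition JSON above (SPARK-17812)
> // failOnDataLoss: "true" => Fail on lost data, "false" => fall back to earliest
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker:9092")
>   .option("subscribe", "topicA")
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .load()
> {code}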
> Possible lifecycle times in which an offset-related event may happen:
> # At initial query start
> #* New partition: if startingOffsets is *Earliest* or *Latest*, use that.  If 
> startingOffsets is *User specified* per topicpartition, and the new partition 
> isn't in the map, *Fail*.  Note that this is effectively indistinguishable 
> from a new partition during the query, because partitions may have changed 
> between pre-query configuration and query start, but we treat it differently, 
> and users in this case are out of luck.
> #* Offset out of range on driver: we don't technically have behavior for this 
> case yet.  Could use the value of failOnDataLoss, but it's possible people 
> may want to know at startup that something was wrong, even if they're OK with 
> earliest for an out-of-range condition during the query.
> #* Offset out of range on executor: seems like it should be *Fail* or 
> *Earliest*, based on failOnDataLoss.  But it looks like this setting is 
> currently ignored, and the executor will just fail...
> # During query
> #* New partition:  *Earliest*, only.  This seems to be by fiat; I see no 
> reason it can't be configurable.
> #* Offset out of range on driver:  this _probably_ doesn't happen, because 
> we're doing explicit seeks to the latest position
> #* Offset out of range on executor:  ?
> # At query restart
> #* New partition: *Checkpoint*, falling back to *Earliest*.  Again, no reason 
> the fallback couldn't be configurable to *Latest* (see the sketch after this list).
> #* Offset out of range on driver:   this _probably_ doesn't happen, because 
> we're doing explicit seeks to the specified position
> #* Offset out of range on executor:  ?
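> For the restart case, the *Checkpoint* position comes from the query's 
> checkpointLocation; a rough sketch (sink and path are placeholders):
> {code:scala}
> // Restarting with the same checkpointLocation resumes from checkpointed offsets;
> // startingOffsets only applies to a brand-new query, and partitions discovered
> // at restart currently begin at earliest.
> val query = df.selectExpr("CAST(value AS STRING)")
>   .writeStream
>   .format("console")
>   .option("checkpointLocation", "/tmp/checkpoints/kafka-offset-semantics")
>   .start()
> {code}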
> I've probably missed something; chime in.


