[ https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-17937:
-------------------------------------
    Issue Type: Improvement  (was: Sub-task)
        Parent:     (was: SPARK-15406)

> Clarify Kafka offset semantics for Structured Streaming
> -------------------------------------------------------
>
>                 Key: SPARK-17937
>                 URL: https://issues.apache.org/jira/browse/SPARK-17937
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>            Reporter: Cody Koeninger
>
> Possible events for which offsets are needed:
> # New partition is discovered
> # Offset out of range (aka, data has been lost). It's possible to separate this into offset too small and offset too large, but I'm not sure it matters for us.
>
> Possible sources of offsets:
> # *Earliest* position in log
> # *Latest* position in log
> # *Fail* and kill the query
> # *Checkpoint* position
> # *User specified* per topicpartition
> # *Kafka commit log*. Currently unsupported. This means users who want to migrate from existing Kafka jobs need to jump through hoops. Even if we never want to support it, as soon as we take on SPARK-17815 we need to make sure Kafka commit log state is clearly documented and handled.
> # *Timestamp*. Currently unsupported. This could be supported with the old, inaccurate Kafka time API, or the upcoming time index.
> # *X offsets* before or after the latest / earliest position. Currently unsupported. I think the semantics of this are super unclear by comparison with timestamp, given that Kafka doesn't have a single contiguous range of offsets.
>
> Currently allowed pre-query configuration, all "ORs" exclusive:
> # startingOffsets: *earliest* OR *latest* OR *User specified* json per topicpartition (SPARK-17812)
> # failOnDataLoss: true (which implies *Fail* above) OR false (which implies *Earliest* above). In general, I see no reason this couldn't also offer *Latest* as an option.
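To make the "user specified json per topicpartition" option concrete, here is a small Python sketch that builds that JSON. The helper name `starting_offsets_json` is made up for illustration; the JSON shape and the sentinel values (-2 for earliest, -1 for latest) follow the Kafka source's documented startingOffsets format.

```python
import json

# Sentinel offsets used by the Kafka source: -2 means earliest, -1 means latest.
EARLIEST, LATEST = -2, -1

def starting_offsets_json(assignments):
    """Build a startingOffsets JSON string from a {topic: {partition: offset}} map.

    Note that every partition the query will read must appear in the map
    (or use a sentinel); a partition missing from the map fails the query
    at start, per the discussion above.
    """
    for topic, partitions in assignments.items():
        for partition, offset in partitions.items():
            if offset < EARLIEST:
                raise ValueError(f"invalid offset {offset} for {topic}-{partition}")
    # JSON object keys must be strings, so stringify the partition numbers.
    return json.dumps(
        {t: {str(p): o for p, o in ps.items()} for t, ps in assignments.items()})

opts = starting_offsets_json({
    "topicA": {0: 23, 1: LATEST},  # partition 0 from offset 23, partition 1 from latest
    "topicB": {0: EARLIEST},       # topicB partition 0 from earliest
})
print(opts)  # → {"topicA": {"0": 23, "1": -1}, "topicB": {"0": -2}}
```

The resulting string is what would be passed as the `startingOffsets` option of the Kafka source.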
> Possible lifecycle times at which an offset-related event may happen:
> # At initial query start
> #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If startingOffsets is *User specified* per topicpartition and the new partition isn't in the map, *Fail*. Note that this is effectively indistinguishable from a new partition during the query, because partitions may have changed between pre-query configuration and query start, but we treat it differently, and users in this case are SOL.
> #* Offset out of range on driver: we don't technically have behavior for this case yet. We could use the value of failOnDataLoss, but it's possible people may want to know at startup that something was wrong, even if they're OK with earliest for a during-query out-of-range.
> #* Offset out of range on executor: seems like it should be *Fail* or *Earliest*, based on failOnDataLoss, but it looks like this setting is currently ignored, and the executor will just fail...
> # During query
> #* New partition: *Earliest*, only. This seems to be by fiat; I see no reason this can't be configurable.
> #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the latest position.
> #* Offset out of range on executor: ?
> # At query restart
> #* New partition: *Checkpoint*, falling back to *Earliest*. Again, no reason this couldn't configurably fall back to *Latest*.
> #* Offset out of range on driver: this _probably_ doesn't happen, because we're doing explicit seeks to the specified position.
> #* Offset out of range on executor: ?
>
> I've probably missed something; chime in.
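The lifecycle table above can be condensed into a toy decision function. Everything here (the function, the phase/event strings) is an illustrative model of the semantics being discussed, not Spark code, and the out-of-range branch shows the *intended* failOnDataLoss behavior rather than the current executor path, which just fails.

```python
# A toy model of the offset-resolution table: given the lifecycle phase,
# the event, and the pre-query configuration, which offset source applies?

START, RUNNING, RESTART = "initial start", "during query", "restart"

def resolve(phase, event, starting_offsets="latest", fail_on_data_loss=True):
    if event == "new partition":
        if phase == START:
            # A user-specified map must cover every partition, else Fail.
            if isinstance(starting_offsets, dict):
                return "fail"
            return starting_offsets  # "earliest" or "latest"
        # During the query, and for partitions absent from the checkpoint at
        # restart, the source falls back to earliest by fiat (not configurable).
        return "earliest"
    if event == "offset out of range":
        # Intended semantics, applied uniformly on driver and executor.
        return "fail" if fail_on_data_loss else "earliest"
    raise ValueError((phase, event))

print(resolve(START, "new partition", {"topicA": {0: 23}}))  # → fail
print(resolve(RUNNING, "new partition"))                     # → earliest
```

Writing the table out this way makes the asymmetries visible: the Fail-on-unmapped-partition rule only exists at initial start, and the earliest-only fallback is hard-coded everywhere else.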