[ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525137#comment-16525137
 ] 

Cody Koeninger commented on SPARK-18258:
----------------------------------------

[~Yohan123] This ticket is about giving implementors of addBatch access to 
actual offsets, not about what any particular implementation ends up doing with 
them.  A database sink is the use case I had in mind.  For a Kafka sink, it 
might be that putting additional information in the value was the only 
realistic option, but that's a use case specific concern.

If I misunderstood your question let me know.

> Sinks need access to offset representation
> ------------------------------------------
>
>                 Key: SPARK-18258
>                 URL: https://issues.apache.org/jira/browse/SPARK-18258
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>            Reporter: Cody Koeninger
>            Priority: Major
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for 
> this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: 
> OffsetSeq): Unit
> {code}
> where start and end were provided by StreamExecution.runBatch using 
> committedOffsets and availableOffsets.  
> I'm not 100% certain that the offsets in the seq could always be mapped back 
> to the correct source when restarting complicated multi-source jobs, but I 
> think it'd be sufficient.  Passing the string/json representation of the seq 
> instead of the seq itself would probably be sufficient as well, but the 
> convention of rendering a None as "-" in the json is maybe a little 
> idiosyncratic to parse, and the constant defining that is private.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to