[ https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524921#comment-16524921 ]
Richard Yu commented on SPARK-18258: ------------------------------------ Just a question, I noticed that in {{KafkaSink}}'s particular implementation of {{addBatch}}, there is a preexisting schema which needs to be followed. More specifically, a {{ProducerRecord}} (provided by Kafka) will be sent to the producer with room only for the topic name, a key, and a value. However, there does not appear to be anyway that exists where we can also export the data involving the {{start}} and {{end}} {{OffsetSeq}}s to Kafka as well. So I am right to assume that the new data included is to be used for checkpointing purposes only? > Sinks need access to offset representation > ------------------------------------------ > > Key: SPARK-18258 > URL: https://issues.apache.org/jira/browse/SPARK-18258 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming > Reporter: Cody Koeninger > Priority: Major > > Transactional "exactly-once" semantics for output require storing an offset > identifier in the same transaction as results. > The Sink.addBatch method currently only has access to batchId and data, not > the actual offset representation. > I want to store the actual offsets, so that they are recoverable as long as > the results are and I'm not locked in to a particular streaming engine. > I could see this being accomplished by adding parameters to Sink.addBatch for > the starting and ending offsets (either the offsets themselves, or the > SPARK-17829 string/json representation). That would be an API change, but if > there's another way to map batch ids to offset representations without > changing the Sink api that would work as well. > I'm assuming we don't need the same level of access to offsets throughout a > job as e.g. the Kafka dstream gives, because Sinks are the main place that > should need them. > After SPARK-17829 is complete and offsets have a .json method, an api for > this ticket might look like > {code} > trait Sink { > def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: > OffsetSeq): Unit > {code} > where start and end were provided by StreamExecution.runBatch using > committedOffsets and availableOffsets. > I'm not 100% certain that the offsets in the seq could always be mapped back > to the correct source when restarting complicated multi-source jobs, but I > think it'd be sufficient. Passing the string/json representation of the seq > instead of the seq itself would probably be sufficient as well, but the > convention of rendering a None as "-" in the json is maybe a little > idiosyncratic to parse, and the constant defining that is private. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org