[ 
https://issues.apache.org/jira/browse/SPARK-24647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-24647:
---------------------------------
    Fix Version/s:     (was: 2.4.0)

> Sink Should Return OffsetSeqs For ProgressReporting
> ---------------------------------------------------
>
>                 Key: SPARK-24647
>                 URL: https://issues.apache.org/jira/browse/SPARK-24647
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Vaclav Kosar
>            Priority: Major
>
> To be able to track data lineage for Structured Streaming (I intend to 
> implement this to Open Source Project Spline), the monitoring needs to be 
> able to not only to track where the data was read from but also where results 
> were written to. This could be to my knowledge best implemented using 
> monitoring {{StreamingQueryProgress}}. However currently batch data offsets 
> are not available on {{Sink}} interface. Implementing as proposed would also 
> bring symmetry to {{StreamingQueryProgress}} fields sources and sink.
>  
> *Similar Proposals*
> Made in following jiras. These would not be sufficient for lineage tracking.
>  * https://issues.apache.org/jira/browse/SPARK-18258
>  * https://issues.apache.org/jira/browse/SPARK-21313
>  
> *Current State*
>  * Method {{Sink#addBatch}} returns {{Unit}}.
>  * {{StreamingQueryProgress}} reports {{offsetSeq}} start and end using 
> {{sourceProgress}} value but {{sinkProgress}} only calls {{toString}} method.
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : 
> "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f"
>   }
> {code}
>  
>  
> *Proposed State*
>  * {{Sink#addBatch}} to return {{OffsetSeq}} or {{StreamProgress}} specifying 
> offsets of the written batch, e.g. Kafka does it by returning 
> {{RecordMetadata}} object from {{send}} method.
>  * {{StreamingQueryProgress}} incorporate {{sinkProgress}} in similar fashion 
> as {{sourceProgress}}.
>  
>  
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : 
> "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f",
>    "startOffset" : null,
>     "endOffset" { "sinkTopic": { "0": 333 }}
>   }
> {code}
>  
> *Implementation*
> * PR submitters: Likely will be me and [~wajda] as soon as the discussion 
> ends positively. 
>  * {{Sinks}}: Modify all sinks to conform a new interface or return dummy 
> values.
>  * {{ProgressReporter}}: Merge offsets from different batches properly, 
> similarly to how it is done for sources.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to