[ 
https://issues.apache.org/jira/browse/SPARK-24647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585230#comment-16585230
 ] 

Apache Spark commented on SPARK-24647:
--------------------------------------

User 'vackosar' has created a pull request for this issue:
https://github.com/apache/spark/pull/22143

> Sink Should Return Written Offsets For ProgressReporting
> --------------------------------------------------------
>
>                 Key: SPARK-24647
>                 URL: https://issues.apache.org/jira/browse/SPARK-24647
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Vaclav Kosar
>            Priority: Major
>
> To be able to track data lineage for Structured Streaming (I intend to 
> implement this in the open source project Spline), the monitoring needs to 
> track not only where the data was read from but also where the results were 
> written to. To my knowledge, this could best be implemented by monitoring 
> {{StreamingQueryProgress}}. However, written data offsets are currently not 
> available on the {{Sink}} or {{StreamWriter}} interfaces. Implementing this 
> as proposed would also bring symmetry to the {{sources}} and {{sink}} fields 
> of {{StreamingQueryProgress}}.
>  
> *Similar Proposals*
> The following JIRA issues make similar proposals, but these would not be 
> sufficient for lineage tracking:
>  * https://issues.apache.org/jira/browse/SPARK-18258
>  * https://issues.apache.org/jira/browse/SPARK-21313
>  
> *Current State*
>  * Method {{Sink#addBatch}} returns {{Unit}}.
>  * Object {{WriterCommitMessage}} does not carry any progress information 
> about committed rows.
>  * {{StreamingQueryProgress}} reports the {{offsetSeq}} start and end using 
> the {{sourceProgress}} value, but {{sinkProgress}} only calls the 
> {{toString}} method.
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f"
>   }
> {code}
>  
>  
> *Proposed State*
>  * Implement support only for v2 sinks, as those are the ones to be used in 
> the future.
>  * {{WriterCommitMessage}} to hold optional min and max offset information 
> for committed rows, e.g. Kafka does this by returning a {{RecordMetadata}} 
> object from the {{send}} method.
>  * {{StreamingQueryProgress}} to incorporate {{sinkProgress}} in a similar 
> fashion to {{sourceProgress}}.
>  
>  
> {code:java}
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test-topic]]",
>     "startOffset" : null,
>     "endOffset" : { "test-topic" : { "0" : 5000 }},
>     "numInputRows" : 5000,
>     "processedRowsPerSecond" : 645.3278265358803
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f",
>     "startOffset" : null,
>     "endOffset" : { "sinkTopic" : { "0" : 333 }}
>   }
> {code}
>  
> *Implementation*
>  * PR submitters: Me and [~wajda], as soon as the prerequisite JIRA issue is 
> merged.
>  * {{Sinks}}: Modify all v2 sinks to conform to a new interface or return 
> dummy values.
>  * {{ProgressReporter}}: Merge offsets from different batches properly, 
> similarly to how it is done for sources.
>  
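
The proposed state and the ProgressReporter merge step described above could
look roughly like the following sketch. It is only an illustration under
stated assumptions: SinkOffsetSketch, OffsetRange, OffsetCommitMessage and
merge are hypothetical names that do not exist in Spark, and the
"topic-partition" string key is an arbitrary choice made for this example.
Each writer task reports an optional offset range per sink partition, and the
ranges from all commit messages are merged into one min/max per partition.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; none of these types are part of Spark. */
final class SinkOffsetSketch {

    /** Inclusive range of offsets written to one sink partition. */
    static final class OffsetRange {
        final long min;
        final long max;

        OffsetRange(long min, long max) {
            if (min > max) {
                throw new IllegalArgumentException("min must not exceed max");
            }
            this.min = min;
            this.max = max;
        }

        /** Smallest range that covers both this range and the other one. */
        OffsetRange union(OffsetRange other) {
            return new OffsetRange(Math.min(min, other.min), Math.max(max, other.max));
        }

        @Override
        public String toString() {
            return "[" + min + ", " + max + "]";
        }
    }

    /** Stand-in for a commit message that reports what one writer task wrote. */
    static final class OffsetCommitMessage {
        /** Keyed by an assumed "topic-partition" string; empty if nothing was reported. */
        final Map<String, OffsetRange> ranges;

        private OffsetCommitMessage(Map<String, OffsetRange> ranges) {
            this.ranges = ranges;
        }

        static OffsetCommitMessage of(String partition, long min, long max) {
            return new OffsetCommitMessage(
                Collections.singletonMap(partition, new OffsetRange(min, max)));
        }

        /** A sink that cannot report offsets would return this "dummy" message. */
        static OffsetCommitMessage empty() {
            return new OffsetCommitMessage(Collections.<String, OffsetRange>emptyMap());
        }
    }

    /** Merges the ranges of all commit messages into one range per partition. */
    static Map<String, OffsetRange> merge(List<OffsetCommitMessage> messages) {
        Map<String, OffsetRange> merged = new TreeMap<>();
        for (OffsetCommitMessage message : messages) {
            for (Map.Entry<String, OffsetRange> entry : message.ranges.entrySet()) {
                merged.merge(entry.getKey(), entry.getValue(), OffsetRange::union);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<OffsetCommitMessage> messages = Arrays.asList(
            OffsetCommitMessage.of("sinkTopic-0", 0L, 120L),
            OffsetCommitMessage.of("sinkTopic-0", 121L, 333L),
            OffsetCommitMessage.of("sinkTopic-1", 0L, 99L),
            OffsetCommitMessage.empty());
        // The min of each merged range would feed "startOffset", the max "endOffset".
        System.out.println(merge(messages));
    }
}
```

Representing "no offset information" as an empty map keeps the merge free of
null checks, which is one possible way to support sinks that can only return
dummy values.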



