[ 
https://issues.apache.org/jira/browse/SPARK-28738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16907642#comment-16907642
 ] 

Joseph Cooper edited comment on SPARK-28738 at 8/14/19 9:42 PM:
----------------------------------------------------------------

I think I see why commitAsync might not support this. During the polling loop 
for offset commits, if a higher offset is encountered, a lower one might get 
skipped, and the metadata for that one won't get committed.

At least I think that is the spirit of the commitAll function, but it doesn't 
seem to make sense. Once that map is full, won't polling of the queue stop?

[https://github.com/apache/spark/blob/017919b636fd3ce43ccf5ec57f1c1842aa2130db/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala#L303]
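The drain-and-keep-max pattern described above can be sketched in isolation. This is a simplified model, not the Spark source: QueuedCommit and drainKeepingMax are hypothetical names, and plain tuples stand in for Kafka's TopicPartition and OffsetAndMetadata. In this model the loop ends only when poll() returns null (the queue is empty), and a lower offset queued for the same partition is dropped together with its metadata.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

object CommitDrainSketch {
  // Hypothetical stand-in for an OffsetRange queued together with metadata.
  final case class QueuedCommit(
      topic: String, partition: Int, untilOffset: Long, metadata: String)

  // Drain the queue until it is empty, keeping only the highest offset
  // (and the metadata queued with it) for each topic-partition.
  def drainKeepingMax(
      queue: ConcurrentLinkedQueue[QueuedCommit]): Map[(String, Int), (Long, String)] = {
    val latest = mutable.Map.empty[(String, Int), (Long, String)]
    var next = queue.poll()
    while (next != null) {
      val key = (next.topic, next.partition)
      val current = next
      // Keep this entry if the partition is new or its offset is higher.
      val keep = latest.get(key).forall { case (offset, _) => offset < current.untilOffset }
      if (keep) latest(key) = (current.untilOffset, current.metadata)
      next = queue.poll()
    }
    latest.toMap
  }
}
```

If "batch-1" is queued at offset 10 and "batch-2" at offset 20 for the same partition, only offset 20 with "batch-2" survives the drain, so the metadata attached to offset 10 would never reach Kafka.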

Maybe I'm missing something.


was (Author: jrciii):
I think I see why commitAsync might not support this. During the polling loop 
for offset commits, if a higher offset is encountered, a lower one might get 
skipped, and the metadata for that one won't get committed.

> Add ability to include metadata in CanCommitOffsets API
> -------------------------------------------------------
>
>                 Key: SPARK-28738
>                 URL: https://issues.apache.org/jira/browse/SPARK-28738
>             Project: Spark
>          Issue Type: New Feature
>          Components: DStreams
>    Affects Versions: 2.4.4
>            Reporter: Joseph Cooper
>            Priority: Major
>
> It is possible to commit metadata with an offset to Kafka. Currently, the 
> CanCommitOffsets API does not expose this functionality. See 
> [https://github.com/apache/spark/blob/017919b636fd3ce43ccf5ec57f1c1842aa2130db/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala#L300]
>  
> We could make the commit queue take (OffsetRange, String) instead of just 
> OffsetRange and copy the two existing commitAsync functions and make them 
> take Array[(OffsetRange, String)].
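One possible shape for the proposal in the issue description, as a rough sketch only. OffsetRange and OffsetCommitCallback below are stand-ins so the example compiles without Spark or Kafka on the classpath, QueueingCommitter is a hypothetical implementation, and using an empty string for "no metadata" is an assumption of this sketch.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object CommitWithMetadataSketch {
  // Stand-in types; the real ones live in Spark and Kafka.
  final case class OffsetRange(
      topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
  trait OffsetCommitCallback

  trait CanCommitOffsets {
    // Shape of the two existing methods.
    def commitAsync(offsetRanges: Array[OffsetRange]): Unit
    def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
    // Proposed copies that carry a metadata string with each range.
    def commitAsync(offsetRanges: Array[(OffsetRange, String)]): Unit
    def commitAsync(
        offsetRanges: Array[(OffsetRange, String)], callback: OffsetCommitCallback): Unit
  }

  // Hypothetical implementation: the queue holds (OffsetRange, String) pairs.
  class QueueingCommitter extends CanCommitOffsets {
    val commitQueue = new ConcurrentLinkedQueue[(OffsetRange, String)]()
    @volatile var commitCallback: Option[OffsetCommitCallback] = None

    private def withEmptyMetadata(ranges: Array[OffsetRange]): Array[(OffsetRange, String)] =
      ranges.map(r => (r, "")) // assumption: "" means no metadata

    def commitAsync(offsetRanges: Array[OffsetRange]): Unit = {
      val pairs = withEmptyMetadata(offsetRanges)
      commitAsync(pairs)
    }

    def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
      val pairs = withEmptyMetadata(offsetRanges)
      commitAsync(pairs, callback)
    }

    def commitAsync(offsetRanges: Array[(OffsetRange, String)]): Unit =
      offsetRanges.foreach(pair => commitQueue.add(pair))

    def commitAsync(
        offsetRanges: Array[(OffsetRange, String)], callback: OffsetCommitCallback): Unit = {
      commitCallback = Option(callback)
      commitAsync(offsetRanges)
    }
  }
}
```

Because JVM arrays keep their element type at runtime, the Array[OffsetRange] and Array[(OffsetRange, String)] overloads can coexist, so existing callers would not need to change.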


