[ https://issues.apache.org/jira/browse/SPARK-28641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gabor Somogyi closed SPARK-28641.
---------------------------------

> MicroBatchExecution committed offsets greater than available offsets
> --------------------------------------------------------------------
>
>                 Key: SPARK-28641
>                 URL: https://issues.apache.org/jira/browse/SPARK-28641
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>         Environment: HDP 3.0.0
>                      Spark 2.3.1
>                      Kafka 2.1.1
>            Reporter: MariaCarrie
>            Priority: Major
>              Labels: MicroBatchExecution, dataAvailable
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> I use Structured Streaming to consume Kafka data, with the default trigger type and checkpointing enabled. Looking at the log, I found that Structured Streaming reprocessed data it had already committed. The application log is as follows:
>
> {code}
> 19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start = Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}), end = {"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
> 19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
> 19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's offset was changed from 13978260 to 9053058, some data may have been missed.
> Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".
> {code}
>
> I see that when the {{latestOffsets}} are fetched, they are compared with the {{committedOffsets}} to decide whether there is {{newData}}.
>
> {code}
> private def dataAvailable: Boolean = {
>   availableOffsets.exists {
>     case (source, available) =>
>       committedOffsets.get(source).map(committed => committed != available).getOrElse(true)
>   }
> }
> {code}
>
> I suspect that some problem on the Kafka side caused the {{fetchLatestOffsets}} method to return the {{earliestOffsets}}. However, that data had already been processed and committed successfully. Could the {{dataAvailable}} method check whether {{availableOffsets}} has already been committed, so that such a batch is no longer marked as {{newData}}?
> I don't know whether my reasoning is correct; if the query keeps processing from the {{earliestOffsets}}, Structured Streaming cannot respond in a timely manner. I'm glad to receive any suggestion!

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
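The reporter's proposal can be sketched as a stricter version of the {{dataAvailable}} check. The snippet below is a simplified, hypothetical illustration, not Spark's actual implementation: offsets are modeled as plain {{Long}} values per source instead of Spark's {{Offset}} types, and a batch only counts as new data when the available offset is strictly ahead of the committed one, so a regressed offset (as in the log above, where partition 4 moved from 13978260 back to 9053058) would no longer trigger reprocessing.

```scala
// Hypothetical sketch of the reporter's proposed dataAvailable guard.
// Offsets are simplified to Map[String, Long] (source name -> offset);
// the real MicroBatchExecution uses richer OffsetSeq / Offset types.
object DataAvailableSketch {
  def dataAvailable(availableOffsets: Map[String, Long],
                    committedOffsets: Map[String, Long]): Boolean =
    availableOffsets.exists { case (source, available) =>
      committedOffsets.get(source) match {
        // Only strictly newer offsets count as new data; an equal or
        // regressed offset (already committed) is ignored.
        case Some(committed) => available > committed
        // No committed offset yet for this source: treat as new data.
        case None => true
      }
    }

  def main(args: Array[String]): Unit = {
    val committed = Map("dop_dvi_formatted-send_pus-4" -> 13978260L)
    // Regressed offset, as in the reported log: not new data.
    assert(!dataAvailable(Map("dop_dvi_formatted-send_pus-4" -> 9053058L), committed))
    // Offset moved forward: new data.
    assert(dataAvailable(Map("dop_dvi_formatted-send_pus-4" -> 13978300L), committed))
    // Unchanged offset: not new data.
    assert(!dataAvailable(Map("dop_dvi_formatted-send_pus-4" -> 13978260L), committed))
    println("ok")
  }
}
```

Note the trade-off the reporter hints at: silently skipping a regressed offset hides the data-loss signal that the existing {{failOnDataLoss}} option is meant to surface, which may be why the issue was ultimately closed rather than changed.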