[ https://issues.apache.org/jira/browse/SPARK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Abhinav Choudhury resolved SPARK-29639.
---------------------------------------
    Resolution: Abandoned

Resolved for lack of sufficient information and steps to reproduce.

Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch
-----------------------------------------------------------------------------

                 Key: SPARK-29639
                 URL: https://issues.apache.org/jira/browse/SPARK-29639
             Project: Spark
          Issue Type: Bug
          Components: Input/Output, Structured Streaming
    Affects Versions: 2.4.0
            Reporter: Abhinav Choudhury
            Priority: Major

We have been running a Spark Structured Streaming job in production for more than a week now. Put simply, it reads data from a source Kafka topic (with 4 partitions) and writes to another Kafka topic. Everything had been running fine until the job started failing with the following error:

{noformat}
Driver stacktrace:
=== Streaming Query ===
Identifier: MetricComputer [id = af26bab1-0d89-4766-934e-ad5752d6bb08, runId = 613a21ad-86e3-4781-891b-17d92c18954a]
Current Committed Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: {"kafka-topic-name":{"2":10458347,"1":10460151,"3":10475678,"0":9809564}}}
Current Available Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: {"kafka-topic-name":{"2":10458347,"1":10460151,"3":10475678,"0":10509527}}}
Current State: ACTIVE
Thread State: RUNNABLE
<-- Removed Logical plan -->
Some data may have been lost because they are not available in Kafka any
more; either the data was aged out by Kafka or the topic may have been
deleted before all the data in the topic was processed. If you don't want
your streaming query to fail on such cases, set the source option
"failOnDataLoss" to "false".
{noformat}

Configuration:
{noformat}
Spark 2.4.0
spark-sql-kafka-0-10
{noformat}
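For reference, this is roughly how the option named in the error message is applied on the Kafka source (a minimal sketch; the broker address is a placeholder, and note that failOnDataLoss=false only suppresses the failure, it does not explain why offsets went backwards):

{noformat}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("MetricComputer").getOrCreate()

// failOnDataLoss=false turns the "Some data may have been lost" failure
// into a warning; use with care, since it can silently skip offsets the
// source believes are gone.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092") // placeholder address
  .option("subscribe", "kafka-topic-name")
  .option("failOnDataLoss", "false")
  .load()
{noformat}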
Looking at the Structured Streaming query progress logs, it seems that the end offsets computed for the next batch were actually smaller than the starting offsets:

*Microbatch Trigger 1:*
{noformat}
2019/10/26 23:53:51 INFO utils.Logging[26]: 2019-10-26 23:53:51.767 : ( : Query {
  "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
  "runId" : "2d20d633-2768-446c-845b-893243361422",
  "name" : "StreamingProcessorName",
  "timestamp" : "2019-10-26T23:53:51.741Z",
  "batchId" : 2145898,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getEndOffset" : 0,
    "setOffsetRange" : 9,
    "triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
    "startOffset" : {
      "kafka-topic-name" : {
        "2" : 10452513,
        "1" : 10454326,
        "3" : 10469196,
        "0" : 10503762
      }
    },
    "endOffset" : {
      "kafka-topic-name" : {
        "2" : 10452513,
        "1" : 10454326,
        "3" : 10469196,
        "0" : 10503762
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "ForeachBatchSink"
  }
} in progress
{noformat}

*Next microbatch trigger:*
{noformat}
2019/10/26 23:53:53 INFO utils.Logging[26]: 2019-10-26 23:53:53.951 : ( : Query {
  "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
  "runId" : "2d20d633-2768-446c-845b-893243361422",
  "name" : "StreamingProcessorName",
  "timestamp" : "2019-10-26T23:53:52.907Z",
  "batchId" : 2145898,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 350,
    "getBatch" : 4,
    "getEndOffset" : 0,
    "queryPlanning" : 102,
    "setOffsetRange" : 24,
    "triggerExecution" : 1043,
    "walCommit" : 349
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
    "startOffset" : {
      "kafka-topic-name" : {
        "2" : 10452513,
        "1" : 10454326,
        "3" : 10469196,
        "0" : 10503762
      }
    },
    "endOffset" : {
      "kafka-topic-name" : {
        "2" : 10452513,
        "1" : 10454326,
        "3" : 9773098,
        "0" : 10503762
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "ForeachBatchSink"
  }
} in progress
{noformat}

Notice that for partition 3 of the Kafka topic, the end offset (9773098) is actually smaller than the start offset (10469196)!

We checked the HDFS checkpoint directory, and the checkpointed offsets look fine: they point to the last committed offsets.

Why is the end offset for a partition being computed to a smaller value than its start offset?
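As a diagnostic aid while trying to reproduce this, one could watch for regressing offsets from a StreamingQueryListener. Below is a sketch; it assumes the offset JSON shape shown in the progress logs above ({"topic": {"partition": offset}}) and uses json4s, which ships with Spark, for parsing:

{noformat}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical diagnostic listener: compares the start/end offsets reported
// in each progress event and flags any partition whose end offset is behind
// its start offset, as seen in the logs above.
class OffsetRegressionListener extends StreamingQueryListener {
  implicit val formats: Formats = DefaultFormats

  // Assumed offset JSON shape: {"topic-name": {"0": 123, "1": 456, ...}}
  private def offsets(json: String): Map[String, Map[String, Long]] =
    parse(json).extract[Map[String, Map[String, Long]]]

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    for (source <- event.progress.sources
         if source.startOffset != null && source.endOffset != null) {
      val start = offsets(source.startOffset)
      val end   = offsets(source.endOffset)
      for {
        (topic, startParts)   <- start
        (partition, startOff) <- startParts
        endOff                <- end.getOrElse(topic, Map.empty).get(partition)
        if endOff < startOff
      } println(
        s"WARN batch ${event.progress.batchId}: $topic/$partition " +
        s"end offset $endOff < start offset $startOff")
    }
  }
}

// Registration, e.g. before starting the query:
// spark.streams.addListener(new OffsetRegressionListener)
{noformat}

This would at least pinpoint the exact batch and partition where the end offset regresses, which may help narrow down whether the bad value comes from the driver's offset fetch or from the checkpoint.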