[jira] [Updated] (SPARK-29639) Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch

2020-01-30 Thread Dongjoon Hyun (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-29639:
--
Target Version/s:   (was: 2.4.4, 2.4.5)

> Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch
> 
>
> Key: SPARK-29639
> URL: https://issues.apache.org/jira/browse/SPARK-29639
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Abhinav Choudhury
>Priority: Major
>
> We have been running a Spark Structured Streaming job in production for more
> than a week now. Put simply, it reads data from a source Kafka topic (with 4
> partitions) and writes to another Kafka topic. Everything had been running
> fine until the job started failing with the following error:
>  
> {noformat}
> Driver stacktrace:
>  === Streaming Query ===
>  Identifier: MetricComputer [id = af26bab1-0d89-4766-934e-ad5752d6bb08, runId 
> = 613a21ad-86e3-4781-891b-17d92c18954a]
>  Current Committed Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: 
> {"kafka-topic-name":
> {"2":10458347,"1":10460151,"3":10475678,"0":9809564}
> }}
>  Current Available Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: 
> {"kafka-topic-name":
> {"2":10458347,"1":10460151,"3":10475678,"0":10509527}
> }}
> Current State: ACTIVE
>  Thread State: RUNNABLE
> <-- Removed Logical plan -->
>  Some data may have been lost because they are not available in Kafka any 
> more; either the
>  data was aged out by Kafka or the topic may have been deleted before all the 
> data in the
>  topic was processed. If you don't want your streaming query to fail on such 
> cases, set the
>  source option "failOnDataLoss" to "false".{noformat}
> Configuration:
> {noformat}
> Spark 2.4.0
> Spark-sql-kafka 0.10{noformat}
> Looking at the Structured Streaming query progress logs, it seems the
> endOffset computed for the next batch was actually smaller than the
> starting offset:
> *Microbatch Trigger 1:*
> {noformat}
> 2019/10/26 23:53:51 INFO utils.Logging[26]: 2019-10-26 23:53:51.767 : ( : 
> Query {
>   "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
>   "runId" : "2d20d633-2768-446c-845b-893243361422",
>   "name" : "StreamingProcessorName",
>   "timestamp" : "2019-10-26T23:53:51.741Z",
>   "batchId" : 2145898,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
> "getEndOffset" : 0,
> "setOffsetRange" : 9,
> "triggerExecution" : 9
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
> "startOffset" : {
>   "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
>   }
> },
> "endOffset" : {
>   "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
>   }
> },
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
> "description" : "ForeachBatchSink"
>   }
> } in progress{noformat}
> *Next micro batch trigger:*
> {noformat}
> 2019/10/26 23:53:53 INFO utils.Logging[26]: 2019-10-26 23:53:53.951 : ( : 
> Query {
>   "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
>   "runId" : "2d20d633-2768-446c-845b-893243361422",
>   "name" : "StreamingProcessorName",
>   "timestamp" : "2019-10-26T23:53:52.907Z",
>   "batchId" : 2145898,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
> "addBatch" : 350,
> "getBatch" : 4,
> "getEndOffset" : 0,
> "queryPlanning" : 102,
> "setOffsetRange" : 24,
> "triggerExecution" : 1043,
> "walCommit" : 349
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
> "startOffset" : {
>   "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
>   }
> },
> "endOffset" : {
>   "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 9773098,
> "0" : 10503762
>   }
> },
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
> "description" : "ForeachBatchSink"
>   }
> } in progress{noformat}
> Notice that for partition 3 of the Kafka topic, the endOffset is actually
> smaller than the starting offset!
> We checked the HDFS checkpoint dir; the checkpointed offsets look fine and
> point to the last committed offsets.
> Why is the end offset for a partition being computed to a smaller value?

[jira] [Updated] (SPARK-29639) Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch

2019-10-29 Thread Abhinav Choudhury (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abhinav Choudhury updated SPARK-29639:
--
Description: 
We have been running a Spark Structured Streaming job in production for more
than a week now. Put simply, it reads data from a source Kafka topic (with 4
partitions) and writes to another Kafka topic. Everything had been running
fine until the job started failing with the following error:

 
{noformat}
Driver stacktrace:
 === Streaming Query ===
 Identifier: MetricComputer [id = af26bab1-0d89-4766-934e-ad5752d6bb08, runId = 
613a21ad-86e3-4781-891b-17d92c18954a]
 Current Committed Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: 
{"kafka-topic-name":
{"2":10458347,"1":10460151,"3":10475678,"0":9809564}
}}
 Current Available Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: 
{"kafka-topic-name":
{"2":10458347,"1":10460151,"3":10475678,"0":10509527}
}}
Current State: ACTIVE
 Thread State: RUNNABLE
<-- Removed Logical plan -->
 Some data may have been lost because they are not available in Kafka any more; 
either the
 data was aged out by Kafka or the topic may have been deleted before all the 
data in the
 topic was processed. If you don't want your streaming query to fail on such 
cases, set the
 source option "failOnDataLoss" to "false".{noformat}
Configuration:
{noformat}
Spark 2.4.0
Spark-sql-kafka 0.10{noformat}
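The error text suggests the Kafka source option below as a way to keep the query alive, but it is only a workaround: it suppresses the failure, so the regressed offset range would be skipped silently rather than explained:
{noformat}
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
  .option("subscribe", "kafka-topic-name")
  .option("failOnDataLoss", "false")  // masks the symptom; does not fix the offsets
  .load()
{noformat}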
Looking at the Structured Streaming query progress logs, it seems the
endOffset computed for the next batch was actually smaller than the
starting offset:

*Microbatch Trigger 1:*
{noformat}
2019/10/26 23:53:51 INFO utils.Logging[26]: 2019-10-26 23:53:51.767 : ( : Query 
{
  "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
  "runId" : "2d20d633-2768-446c-845b-893243361422",
  "name" : "StreamingProcessorName",
  "timestamp" : "2019-10-26T23:53:51.741Z",
  "batchId" : 2145898,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getEndOffset" : 0,
"setOffsetRange" : 9,
"triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaV2[Subscribe[kafka-topic-name]]",
"startOffset" : {
  "kafka-topic-name" : {
"2" : 10452513,
"1" : 10454326,
"3" : 10469196,
"0" : 10503762
  }
},
"endOffset" : {
  "kafka-topic-name" : {
"2" : 10452513,
"1" : 10454326,
"3" : 10469196,
"0" : 10503762
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "ForeachBatchSink"
  }
} in progress{noformat}
*Next micro batch trigger:*
{noformat}
2019/10/26 23:53:53 INFO utils.Logging[26]: 2019-10-26 23:53:53.951 : ( : Query 
{
  "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
  "runId" : "2d20d633-2768-446c-845b-893243361422",
  "name" : "StreamingProcessorName",
  "timestamp" : "2019-10-26T23:53:52.907Z",
  "batchId" : 2145898,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"addBatch" : 350,
"getBatch" : 4,
"getEndOffset" : 0,
"queryPlanning" : 102,
"setOffsetRange" : 24,
"triggerExecution" : 1043,
"walCommit" : 349
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaV2[Subscribe[kafka-topic-name]]",
"startOffset" : {
  "kafka-topic-name" : {
"2" : 10452513,
"1" : 10454326,
"3" : 10469196,
"0" : 10503762
  }
},
"endOffset" : {
  "kafka-topic-name" : {
"2" : 10452513,
"1" : 10454326,
"3" : 9773098,
"0" : 10503762
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "ForeachBatchSink"
  }
} in progress{noformat}
Notice that for partition 3 of the Kafka topic, the endOffset is actually
smaller than the starting offset!
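One way to catch a regression like this as it happens is a StreamingQueryListener that compares start and end offsets per partition. This is only a sketch: it assumes the SparkSession is in scope as spark and that the source offset JSON has the topic -> partition -> offset layout shown in the logs above (json4s ships with Spark):
{noformat}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

spark.streams.addListener(new StreamingQueryListener {
  implicit val formats: Formats = DefaultFormats

  // {"topic": {"0": 123, ...}}  becomes  Map("topic-0" -> 123, ...)
  private def offsets(json: String): Map[String, Long] =
    parse(json).extract[Map[String, Map[String, Long]]].flatMap {
      case (topic, parts) => parts.map { case (p, off) => s"$topic-$p" -> off }
    }

  override def onQueryProgress(e: QueryProgressEvent): Unit =
    e.progress.sources.foreach { s =>
      // startOffset can be null on the very first batch, so guard it.
      val start = Option(s.startOffset).map(offsets).getOrElse(Map.empty)
      val end   = Option(s.endOffset).map(offsets).getOrElse(Map.empty)
      for ((tp, so) <- start; eo <- end.get(tp) if eo < so)
        println(s"End offset regression on $tp: start=$so, end=$eo")
    }

  override def onQueryStarted(e: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(e: QueryTerminatedEvent): Unit = ()
})
{noformat}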

We checked the HDFS checkpoint dir; the checkpointed offsets look fine and
point to the last committed offsets.
Why is the end offset for a partition being computed to a smaller value?
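For reference, this is roughly how the checkpointed offsets can be inspected. Each file under <checkpoint>/offsets/<batchId> is plain text: a version header ("v1"), a metadata line, then one JSON offsets line per source. The path below is a placeholder:
{noformat}
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Print the offset log for one batch; the last line is the Kafka
// source's topic -> partition -> offset JSON.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val in = fs.open(new Path("hdfs:///checkpoints/MetricComputer/offsets/2145897"))  // placeholder
try Source.fromInputStream(in).getLines().foreach(println)
finally in.close()
{noformat}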

 

  was:
We have been running a Spark Structured Streaming job in production for more
than a week now. Put simply, it reads data from a source Kafka topic (with 4
partitions) and writes to another Kafka topic. Everything had been running
fine until the job started failing with the following error:

 
{noformat}
Driver stacktrace:
 === Streaming Query ===
 Identifier: MetricComputer [id = af26bab1-0d89-4766-934e-ad5752d6bb08, runId = 
613a21ad-86e3-4781-891b-17d92c18954a]
 Current Committed Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: 
{"kafka-topic-name":
{"2":10458347,"1":10460151,"3":10475678,"0":9809564}
}}
 Current Available Offsets: {KafkaV2[Subscribe[kf-adsk-inquirystep]]: 
{"kf-adsk-inquirystep":
{"2":10458347,"1":10460151,"3":10475678,"0":105

[jira] [Updated] (SPARK-29639) Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch

2019-10-29 Thread Abhinav Choudhury (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abhinav Choudhury updated SPARK-29639:
--
Target Version/s: 2.4.4, 2.4.5

> Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch
> 
>
> Key: SPARK-29639
> URL: https://issues.apache.org/jira/browse/SPARK-29639
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Abhinav Choudhury
>Priority: Major
>
> We have been running a Spark Structured Streaming job in production for more
> than a week now. Put simply, it reads data from a source Kafka topic (with 4
> partitions) and writes to another Kafka topic. Everything had been running
> fine until the job started failing with the following error:
>  
> {noformat}
> Driver stacktrace:
>  === Streaming Query ===
>  Identifier: MetricComputer [id = af26bab1-0d89-4766-934e-ad5752d6bb08, runId 
> = 613a21ad-86e3-4781-891b-17d92c18954a]
>  Current Committed Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: 
> {"kafka-topic-name":
> {"2":10458347,"1":10460151,"3":10475678,"0":9809564}
> }}
>  Current Available Offsets: {KafkaV2[Subscribe[kf-adsk-inquirystep]]: 
> {"kf-adsk-inquirystep":
> {"2":10458347,"1":10460151,"3":10475678,"0":10509527}
> }}
> Current State: ACTIVE
>  Thread State: RUNNABLE
> <-- Removed Logical plan -->
>  Some data may have been lost because they are not available in Kafka any 
> more; either the
>  data was aged out by Kafka or the topic may have been deleted before all the 
> data in the
>  topic was processed. If you don't want your streaming query to fail on such 
> cases, set the
>  source option "failOnDataLoss" to "false".{noformat}
> Configuration:
> {noformat}
> Spark 2.4.0
> Spark-sql-kafka 0.10{noformat}
> Looking at the Structured Streaming query progress logs, it seems the
> endOffset computed for the next batch was actually smaller than the
> starting offset:
> *Microbatch Trigger 1:*
> {noformat}
> 2019/10/26 23:53:51 INFO utils.Logging[26]: 2019-10-26 23:53:51.767 : ( : 
> Query {
>   "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
>   "runId" : "2d20d633-2768-446c-845b-893243361422",
>   "name" : "StreamingProcessorName",
>   "timestamp" : "2019-10-26T23:53:51.741Z",
>   "batchId" : 2145898,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
> "getEndOffset" : 0,
> "setOffsetRange" : 9,
> "triggerExecution" : 9
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
> "startOffset" : {
>   "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
>   }
> },
> "endOffset" : {
>   "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
>   }
> },
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
> "description" : "ForeachBatchSink"
>   }
> } in progress{noformat}
> *Next micro batch trigger:*
> {noformat}
> 2019/10/26 23:53:53 INFO utils.Logging[26]: 2019-10-26 23:53:53.951 : ( : 
> Query {
>   "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
>   "runId" : "2d20d633-2768-446c-845b-893243361422",
>   "name" : "StreamingProcessorName",
>   "timestamp" : "2019-10-26T23:53:52.907Z",
>   "batchId" : 2145898,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
> "addBatch" : 350,
> "getBatch" : 4,
> "getEndOffset" : 0,
> "queryPlanning" : 102,
> "setOffsetRange" : 24,
> "triggerExecution" : 1043,
> "walCommit" : 349
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
> "startOffset" : {
>   "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
>   }
> },
> "endOffset" : {
>   "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 9773098,
> "0" : 10503762
>   }
> },
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
> "description" : "ForeachBatchSink"
>   }
> } in progress{noformat}
> Notice that for partition 3 of the Kafka topic, the endOffset is actually
> smaller than the starting offset!
> We checked the HDFS checkpoint dir; the checkpointed offsets look fine and
> point to the last committed offsets.
> Why is the end offset for a partition being computed to a smaller value?