[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2019-08-19 Thread Ryne Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16910475#comment-16910475
 ] 

Ryne Yang commented on KAFKA-3410:
--

We are experiencing the same issue on Kafka 1.1.1.

It is causing a production broker to crash, and since we do not want to enable 
unclean leader election, the only way we have found to recover is to delete the 
follower's partition logs from the log dir and restart the follower.
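
Since that workaround is what this comment describes, here is a minimal sketch of it in Java, assuming the follower broker is already stopped; the log dir path and the <topic>-<partition> directory name are placeholders, not values from this report:
{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class DeleteFollowerPartitionLogs {
    // Run only while the follower broker is stopped. This throws away the
    // follower's local copy so it re-replicates from the current leader on restart.
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("/var/kafka-logs");   // assumption: one entry from the broker's log.dirs
        Path partitionDir = logDir.resolve("test-0"); // assumption: the <topic>-<partition> to discard
        if (!Files.exists(partitionDir)) {
            System.out.println("nothing to delete: " + partitionDir);
            return;
        }
        try (Stream<Path> files = Files.walk(partitionDir)) {
            // delete children before parents
            files.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        System.out.println("deleted " + partitionDir + "; restart the follower so it re-fetches from the leader");
    }
}
{code}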

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shut down, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?
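
For anyone reproducing this, steps 1-4 above can be scripted with the Java AdminClient. A minimal sketch; the bootstrap address is an assumption, and unclean.leader.election.enable is additionally pinned at the topic level here (the repro sets it broker-wide):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTestTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: the two-broker cluster from steps 1-2
        try (AdminClient admin = AdminClient.create(props)) {
            // single partition, replication factor 2, as in step 3
            NewTopic topic = new NewTopic("test", 1, (short) 2)
                    .configs(Collections.singletonMap("unclean.leader.election.enable", "false"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}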



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8625) intra broker data balance stuck

2019-07-03 Thread Ryne Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryne Yang updated KAFKA-8625:
-
Description: 
When we used Kafka Cruise Control to invoke Kafka's new intra-broker disk balance 
feature ([feature 
proposal|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-1)Howtomovereplicabetweenlogdirectoriesonthesamebroker]), 
it did a great job; however, the process appears to be stuck at the last mile.

We stopped seeing further movements, which suggests the rebalance is done, and our 
monitoring shows well-balanced results, but some replicas remain stuck mid-move in 
the log dirs, as in the example below:
{code:java}
{"partition":"LOGSTASH5-4","size":0,"offsetLag":123189442,"isFuture":true}
{code}
There are a handful of such partitions on each broker, and they appear to be random.

We have waited for days and they do not go away; we have not yet tried restarting 
the controller broker.

Does anyone know how to solve this and, more importantly, why it happened?

So far we have only tried version 1.1.1; we do not know whether this is fixed in a 
later version.

  was:
When we used Kafka Cruise Control to invoke Kafka's new intra-broker disk balance 
feature ([feature 
proposal|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-1)Howtomovereplicabetweenlogdirectoriesonthesamebroker]), 
it did a great job; however, the process appears to be stuck at the last mile.

We stopped seeing further movements, which suggests the rebalance is done, and our 
monitoring shows well-balanced results, but some replicas remain stuck mid-move in 
the log dirs, as in the example below:
{code:java}
{"partition":"LOGSTASH5-4","size":0,"offsetLag":123189442,"isFuture":true}
{code}
There are a handful of such partitions on each broker, and they appear to be random.

We have waited for days and they do not go away; we have not yet tried restarting 
the controller broker.

Does anyone know how to solve this and, more importantly, why it happened?

So far we have only tried version 1.1.1; we do not know whether this is fixed in a 
later version.


> intra broker data balance stuck
> ---
>
> Key: KAFKA-8625
> URL: https://issues.apache.org/jira/browse/KAFKA-8625
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1
> Environment: Linux 2.6.32-431.5.1.el6.x86_64 #1 SMP x86_64 x86_64 
> x86_64 GNU/Linux
>Reporter: Ryne Yang
>Priority: Major
>
> When we used Kafka Cruise Control to invoke Kafka's new intra-broker disk balance 
> feature ([feature 
> proposal|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-1)Howtomovereplicabetweenlogdirectoriesonthesamebroker]), 
> it did a great job; however, the process appears to be stuck at the last mile.
> We stopped seeing further movements, which suggests the rebalance is done, and 
> our monitoring shows well-balanced results, but some replicas remain stuck 
> mid-move in the log dirs, as in the example below:
> {code:java}
> {"partition":"LOGSTASH5-4","size":0,"offsetLag":123189442,"isFuture":true}
> {code}
> There are a handful of such partitions on each broker, and they appear to be random.
> We have waited for days and they do not go away; we have not yet tried 
> restarting the controller broker.
> Does anyone know how to solve this and, more importantly, why it happened?
> So far we have only tried version 1.1.1; we do not know whether this is fixed 
> in a later version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8625) intra broker data balance stuck

2019-07-03 Thread Ryne Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryne Yang updated KAFKA-8625:
-
Description: 
When we used Kafka Cruise Control to invoke Kafka's new intra-broker disk balance 
feature ([feature 
proposal|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-1)Howtomovereplicabetweenlogdirectoriesonthesamebroker]), 
it did a great job; however, the process appears to be stuck at the last mile.

We stopped seeing further movements, which suggests the rebalance is done, and our 
monitoring shows well-balanced results, but some replicas remain stuck mid-move in 
the log dirs, as in the example below:
{code:java}
{"partition":"LOGSTASH5-4","size":0,"offsetLag":123189442,"isFuture":true}
{code}
There are a handful of such partitions on each broker, and they appear to be random.

We have waited for days and they do not go away; we have not yet tried restarting 
the controller broker.

Does anyone know how to solve this and, more importantly, why it happened?

So far we have only tried version 1.1.1; we do not know whether this is fixed in a 
later version.

  was:
When we used Kafka Cruise Control to invoke Kafka's new intra-broker disk balance 
feature, it did a great job; however, the process appears to be stuck at the last 
mile.

We stopped seeing further movements, which suggests the rebalance is done, and our 
monitoring shows well-balanced results, but some replicas remain stuck mid-move in 
the log dirs, as in the example below:
{code:java}
{"partition":"LOGSTASH5-4","size":0,"offsetLag":123189442,"isFuture":true}
{code}
There are a handful of such partitions on each broker, and they appear to be random.

We have waited for days and they do not go away; we have not yet tried restarting 
the controller broker.

Does anyone know how to solve this and, more importantly, why it happened?

So far we have only tried version 1.1.1; we do not know whether this is fixed in a 
later version.


> intra broker data balance stuck
> ---
>
> Key: KAFKA-8625
> URL: https://issues.apache.org/jira/browse/KAFKA-8625
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1
> Environment: Linux 2.6.32-431.5.1.el6.x86_64 #1 SMP x86_64 x86_64 
> x86_64 GNU/Linux
>Reporter: Ryne Yang
>Priority: Major
>
> When we used Kafka Cruise Control to invoke Kafka's new intra-broker disk balance 
> feature ([feature 
> proposal|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-1)Howtomovereplicabetweenlogdirectoriesonthesamebroker]), 
> it did a great job; however, the process appears to be stuck at the last mile.
> We stopped seeing further movements, which suggests the rebalance is done, and 
> our monitoring shows well-balanced results, but some replicas remain stuck 
> mid-move in the log dirs, as in the example below:
> {code:java}
> {"partition":"LOGSTASH5-4","size":0,"offsetLag":123189442,"isFuture":true}
> {code}
> There are a handful of such partitions on each broker, and they appear to be random.
> We have waited for days and they do not go away; we have not yet tried 
> restarting the controller broker.
> Does anyone know how to solve this and, more importantly, why it happened?
> So far we have only tried version 1.1.1; we do not know whether this is fixed 
> in a later version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8625) intra broker data balance stuck

2019-07-03 Thread Ryne Yang (JIRA)
Ryne Yang created KAFKA-8625:


 Summary: intra broker data balance stuck
 Key: KAFKA-8625
 URL: https://issues.apache.org/jira/browse/KAFKA-8625
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.1
 Environment: Linux 2.6.32-431.5.1.el6.x86_64 #1 SMP x86_64 x86_64 
x86_64 GNU/Linux
Reporter: Ryne Yang


When we used Kafka Cruise Control to invoke Kafka's new intra-broker disk balance 
feature, it did a great job; however, the process appears to be stuck at the last 
mile.

We stopped seeing further movements, which suggests the rebalance is done, and our 
monitoring shows well-balanced results, but some replicas remain stuck mid-move in 
the log dirs, as in the example below:
{code:java}
{"partition":"LOGSTASH5-4","size":0,"offsetLag":123189442,"isFuture":true}
{code}
There are a handful of such partitions on each broker, and they appear to be random.

We have waited for days and they do not go away; we have not yet tried restarting 
the controller broker.

Does anyone know how to solve this and, more importantly, why it happened?

So far we have only tried version 1.1.1; we do not know whether this is fixed in a 
later version.
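
The stuck entry above looks like per-replica output of the AdminClient describeLogDirs call, where isFuture=true marks a replica still being moved between log dirs. A minimal sketch for listing such replicas, assuming the 1.1.x AdminClient API; the bootstrap address and broker IDs are assumptions:
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

public class FindStuckFutureReplicas {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: adjust to your cluster
        List<Integer> brokerIds = Arrays.asList(0, 1, 2);  // assumption: your broker IDs
        try (AdminClient admin = AdminClient.create(props)) {
            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> byBroker =
                    admin.describeLogDirs(brokerIds).all().get();
            byBroker.forEach((broker, logDirs) ->
                    logDirs.forEach((dir, info) ->
                            info.replicaInfos.forEach((tp, replica) -> {
                                // isFuture=true means the replica is still being moved into this log dir
                                if (replica.isFuture) {
                                    System.out.printf("broker=%d dir=%s %s size=%d offsetLag=%d%n",
                                            broker, dir, tp, replica.size, replica.offsetLag);
                                }
                            })));
        }
    }
}
{code}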



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7796) structured streaming fetched wrong current offset from kafka

2019-01-24 Thread Ryne Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryne Yang resolved KAFKA-7796.
--
Resolution: Invalid

Not related to Kafka; this is actually a Spark Structured Streaming integer 
overflow: https://issues.apache.org/jira/browse/SPARK-26718

> structured streaming fetched wrong current offset from kafka
> 
>
> Key: KAFKA-7796
> URL: https://issues.apache.org/jira/browse/KAFKA-7796
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
> Environment: Linux, Centos 7
>Reporter: Ryne Yang
>Priority: Major
>
> When running Spark Structured Streaming with the library `"org.apache.spark" %% 
> "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error during current 
> offset fetching:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): 
> java.lang.AssertionError: assertion failed: latest offset -9223372036854775808 does not equal -1
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.<init>(KafkaMicroBatchReader.scala:329)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for 
> one of the partitions. I checked the Structured Streaming checkpoint and that 
> was correct; it was the currentAvailableOffset that was set to Long.MIN_VALUE.
> Kafka broker version: 1.1.0.
> Library we used:
> {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % 
> "2.4.0"}}
> How to reproduce:
> We started a Structured Streaming query subscribed to a topic with 4 
> partitions, then produced some messages into the topic; the job crashed and 
> logged the stack trace above.
> The committed offsets also look fine, as we can see in the logs:
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
> 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> So Spark recorded the correct committed value for partition 0, but the 
> current available offset returned from Kafka shows Long.MIN_VALUE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (KAFKA-7796) structured streaming fetched wrong current offset from kafka

2019-01-24 Thread Ryne Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryne Yang closed KAFKA-7796.


> structured streaming fetched wrong current offset from kafka
> 
>
> Key: KAFKA-7796
> URL: https://issues.apache.org/jira/browse/KAFKA-7796
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
> Environment: Linux, Centos 7
>Reporter: Ryne Yang
>Priority: Major
>
> When running Spark Structured Streaming with the library `"org.apache.spark" %% 
> "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error during current 
> offset fetching:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): 
> java.lang.AssertionError: assertion failed: latest offset -9223372036854775808 does not equal -1
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.<init>(KafkaMicroBatchReader.scala:329)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for 
> one of the partitions. I checked the Structured Streaming checkpoint and that 
> was correct; it was the currentAvailableOffset that was set to Long.MIN_VALUE.
> Kafka broker version: 1.1.0.
> Library we used:
> {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % 
> "2.4.0"}}
> How to reproduce:
> We started a Structured Streaming query subscribed to a topic with 4 
> partitions, then produced some messages into the topic; the job crashed and 
> logged the stack trace above.
> The committed offsets also look fine, as we can see in the logs:
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
> 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> So Spark recorded the correct committed value for partition 0, but the 
> current available offset returned from Kafka shows Long.MIN_VALUE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7796) structured streaming fetched wrong current offset from kafka

2019-01-08 Thread Ryne Yang (JIRA)
Ryne Yang created KAFKA-7796:


 Summary: structured streaming fetched wrong current offset from 
kafka
 Key: KAFKA-7796
 URL: https://issues.apache.org/jira/browse/KAFKA-7796
 Project: Kafka
  Issue Type: Bug
  Components: consumer
 Environment: Linux, Centos 7
Reporter: Ryne Yang


When running Spark Structured Streaming with the library `"org.apache.spark" %% 
"spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error during current 
offset fetching:
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): java.lang.AssertionError: 
assertion failed: latest offset -9223372036854775808 does not equal -1
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
at 
org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.<init>(KafkaMicroBatchReader.scala:329)
at 
org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for one 
of the partitions. I checked the Structured Streaming checkpoint and that was 
correct; it was the currentAvailableOffset that was set to Long.MIN_VALUE.

Kafka broker version: 1.1.0.
Library we used:

{{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"}}

How to reproduce:
We started a Structured Streaming query subscribed to a topic with 4 partitions, 
then produced some messages into the topic; the job crashed and logged the stack 
trace above.

The committed offsets also look fine, as we can see in the logs:
{code:java}
=== Streaming Query ===
Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
31878627-d473-4ee8-955d-d4d3f3f45eb9]
Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
{"REVENUEEVENT":{"0":1}}}
Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
{"REVENUEEVENT":{"0":-9223372036854775808}}}
{code}
So Spark recorded the correct committed value for partition 0, but the current 
available offset returned from Kafka shows Long.MIN_VALUE.
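
One way to cross-check what the broker actually reports, independently of Spark, is to ask a plain Kafka consumer for the beginning and end offsets of each partition. A minimal sketch; the bootstrap address is an assumption, while the topic name and partition count are taken from this report:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CheckBrokerOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: adjust to your cluster
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (int p = 0; p < 4; p++) {                  // the topic in this report has 4 partitions
                partitions.add(new TopicPartition("REVENUEEVENT", p));
            }
            // earliest and latest offsets exactly as the broker reports them, bypassing Spark
            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                System.out.printf("%s earliest=%d latest=%d%n", tp, earliest.get(tp), latest.get(tp));
            }
        }
    }
}
{code}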

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)