[ https://issues.apache.org/jira/browse/BEAM-3259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía reassigned BEAM-3259:
----------------------------------

    Assignee:     (was: Reuven Lax)

> KafkaIO.read fails job upon broker death
> ----------------------------------------
>
>                 Key: BEAM-3259
>                 URL: https://issues.apache.org/jira/browse/BEAM-3259
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.1.0
>            Reporter: Nikoleta Verbeck
>            Priority: Major
>
> KafkaIO.read() causes the job/pipeline to fail when a broker falls out of 
> the Kafka cluster. I'd expect the job to keep running by sourcing the 
> failed broker's data from one of the partition replicas.
> Stack trace of the exception thrown:
> {code:java}
> 17/11/27 19:41:16 WARN TaskSetManager: Lost task 8.0 in stage 269649.0 (TID 4044336, 96.118.131.19): org.apache.beam.runners.spark.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: checkpointed partition PARTITION-38 and assigned partition PARTITION-39 don't match
>       at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
>       at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>       at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
>       at org.apache.beam.runners.spark.io.MicrobatchSource.getOrCreateReader(MicrobatchSource.java:131)
>       at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:154)
>       at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:105)
>       at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>       at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>       at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>       at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>       at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: checkpointed partition PARTITION-38 and assigned partition PARTITION-39 don't match
>       at shadded.com.google.common.base.Preconditions.checkState(Preconditions.java:737)
>       at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.<init>(KafkaIO.java:1000)
>       at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.createReader(KafkaIO.java:826)
>       at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.createReader(KafkaIO.java:1)
>       at org.apache.beam.runners.spark.io.MicrobatchSource$ReaderLoader.call(MicrobatchSource.java:312)
>       at org.apache.beam.runners.spark.io.MicrobatchSource$ReaderLoader.call(MicrobatchSource.java:299)
>       at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
>       at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
>       at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
>       at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
>       at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
>       ... 28 more
> {code}
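>
> For reference, a minimal sketch of the kind of pipeline that hits this. The 
> broker list "broker1:9092,broker2:9092" and the topic "events" are 
> placeholders, not taken from the original report:
> {code:java}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class KafkaReadRepro {
>   public static void main(String[] args) {
>     Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>     // Unbounded read from Kafka; broker and topic names here are hypothetical.
>     p.apply(KafkaIO.<String, String>read()
>         .withBootstrapServers("broker1:9092,broker2:9092") // placeholder brokers
>         .withTopic("events")                               // placeholder topic
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .withoutMetadata());
>     // Run on the Spark runner (per the trace above) and kill one broker
>     // mid-run; the expectation is failover to a replica, not a job failure.
>     p.run().waitUntilFinish();
>   }
> }
> {code}
> Whether the failure reproduces presumably also depends on the topic having a 
> replication factor greater than 1, so that a replica actually exists.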
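>
> For context, the "Caused by" frames point at a Preconditions.checkState in 
> the UnboundedKafkaReader constructor that compares each checkpointed 
> partition against the partition newly assigned to the reader. An 
> illustrative paraphrase of that kind of check (not the actual Beam source) is:
> {code:java}
> import java.util.List;
> import org.apache.kafka.common.TopicPartition;
>
> final class CheckpointAssignmentCheck {
>   // Illustrative only: pairs checkpointed and assigned partitions by index
>   // (assuming equal-length lists) and fails on any divergence, which is the
>   // IllegalStateException reported in the trace above.
>   static void verify(List<TopicPartition> checkpointed, List<TopicPartition> assigned) {
>     for (int i = 0; i < checkpointed.size(); i++) {
>       TopicPartition cp = checkpointed.get(i);
>       TopicPartition ap = assigned.get(i);
>       if (!cp.equals(ap)) {
>         throw new IllegalStateException(String.format(
>             "checkpointed partition %s and assigned partition %s don't match", cp, ap));
>       }
>     }
>   }
> }
> {code}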



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
