[ https://issues.apache.org/jira/browse/SPARK-19547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-19547.
-------------------------------
    Resolution: Invalid

> KafkaUtil throw 'No current assignment for partition' Exception
> ---------------------------------------------------------------
>
>                 Key: SPARK-19547
>                 URL: https://issues.apache.org/jira/browse/SPARK-19547
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 1.6.1
>            Reporter: wuchang
>
> Below is my Scala code to create a Spark Kafka stream:
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "server110:2181,server110:9092",
>       "zookeeper" -> "server110:2181",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> "example",
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>     val topics = Array("ABTest")
>     val stream = KafkaUtils.createDirectStream[String, String](
>       ssc,
>       PreferConsistent,
>       Subscribe[String, String](topics, kafkaParams)
>     )
> But after running for 10 hours, it throws exceptions:
> 2017-02-10 10:56:20,000 INFO [JobGenerator] internals.ConsumerCoordinator: Revoking previously assigned partitions [ABTest-0, ABTest-1] for group example
> 2017-02-10 10:56:20,000 INFO [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example
> 2017-02-10 10:56:20,011 INFO [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example
> 2017-02-10 10:56:40,057 INFO [JobGenerator] internals.AbstractCoordinator: Successfully joined group example with generation 5
> 2017-02-10 10:56:40,058 INFO [JobGenerator] internals.ConsumerCoordinator: Setting newly assigned partitions [ABTest-1] for group example
> 2017-02-10 10:56:40,080 ERROR [JobScheduler] scheduler.JobScheduler: Error generating jobs for time 1486695380000 ms
> java.lang.IllegalStateException: No current assignment for partition ABTest-0
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169)
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
>     at scala.util.Try$.apply(Try.scala:192)
>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Obviously, partition ABTest-0 has already been revoked from this consumer, but the Spark Streaming consumer seems to be unaware of that and continues to consume from the revoked topic-partition, so the exception occurs and the whole Spark job aborts.
> I think a Kafka rebalance is a perfectly normal event. How can I modify my code so that Spark Streaming handles the partition-revoke event correctly?
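A possible reading of the log above, offered as an assumption rather than a confirmed diagnosis: after the rebalance the driver's consumer is assigned only ABTest-1, which suggests that another consumer joined group "example" and took ABTest-0. If so, one commonly suggested workaround is to give the stream a group.id that no other consumer uses. The sketch below builds such a parameter map in plain Scala. The object name `DedicatedGroupParams`, the helper `kafkaParams`, and the `spark-` prefix are hypothetical, not part of Spark or Kafka. It also lists only the broker address in bootstrap.servers and drops the "zookeeper" entry, since the new Kafka consumer does not talk to ZooKeeper.

```scala
object DedicatedGroupParams {
  // Hypothetical helper (not a Spark or Kafka API): builds consumer
  // parameters whose group.id is reserved for one streaming application.
  def kafkaParams(brokers: String, appName: String): Map[String, Object] = {
    // Assumption: no other consumer anywhere uses this group id.
    val groupId = s"spark-$appName"
    Map[String, Object](
      // Broker address only; 2181 is the ZooKeeper port, not a broker port.
      "bootstrap.servers" -> brokers,
      // Deserializers given as class names, so this sketch needs no Kafka import.
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
  }

  def main(args: Array[String]): Unit = {
    val params = kafkaParams("server110:9092", "abtest-stream")
    // Print the resulting configuration for inspection.
    params.foreach { case (k, v) => println(s"$k = $v") }
  }
}
```

The resulting map could be passed to `Subscribe[String, String](topics, params)` in place of the original `kafkaParams`. Whether this removes the exception depends on the assumption above being true for the cluster in question.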