[ 
https://issues.apache.org/jira/browse/SPARK-19547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuchang updated SPARK-19547:
----------------------------
    Description: 
Below is my Scala code that creates the Spark Kafka stream:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "server110:2181,server110:9092",
  "zookeeper" -> "server110:2181",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("ABTest")
// ssc is the application's StreamingContext
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

But after running for about 10 hours, it throws this exception:

2017-02-10 10:56:20,000 INFO  [JobGenerator] internals.ConsumerCoordinator: Revoking previously assigned partitions [ABTest-0, ABTest-1] for group example
2017-02-10 10:56:20,000 INFO  [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example
2017-02-10 10:56:20,011 INFO  [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example
2017-02-10 10:56:40,057 INFO  [JobGenerator] internals.AbstractCoordinator: Successfully joined group example with generation 5
2017-02-10 10:56:40,058 INFO  [JobGenerator] internals.ConsumerCoordinator: Setting newly assigned partitions [ABTest-1] for group example
2017-02-10 10:56:40,080 ERROR [JobScheduler] scheduler.JobScheduler: Error generating jobs for time 1486695380000 ms
java.lang.IllegalStateException: No current assignment for partition ABTest-0
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295)
        at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Obviously, the partition ABTest-0 has already been revoked from this consumer, but the Spark Streaming consumer does not seem to be aware of that and keeps asking for the latest offsets of the revoked topic-partition, so the exception occurs and the whole Spark job is aborted. A minimal consumer-only reproduction of the same failure is sketched below.
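For reference, this is a sketch using only the plain Kafka 0.10 consumer API, not the Spark code path itself (the broker address is assumed from my config above): seeking on a partition that is not currently assigned throws exactly this IllegalStateException.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "server110:9092") // assumed broker address
props.put("group.id", "example")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("ABTest"))
// No poll() has run yet, so no partitions are currently assigned to this
// consumer; seeking on an unassigned partition fails the same way as the
// trace above: IllegalStateException: No current assignment for partition ABTest-0
consumer.seekToEnd(Collections.singletonList(new TopicPartition("ABTest", 0)))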

I think a Kafka rebalance is a perfectly normal event; how can I modify my code so that Spark Streaming handles the partition-revoke event correctly? One workaround I am considering is sketched below.
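The only direction I have come up with so far (just a sketch under my own assumptions, not a confirmed fix) is to stop subscribing as part of a consumer group and instead pin the exact partitions with ConsumerStrategies.Assign, so the driver-side consumer never takes part in a group rebalance. The obvious cost is that any change in the topic's partition count would have to be handled by hand:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

// The two partitions are hard-coded from the log above; purely illustrative.
val partitions = Seq(new TopicPartition("ABTest", 0), new TopicPartition("ABTest", 1))
val assignedStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Assign[String, String](partitions, kafkaParams)
)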

> KafkaUtil throw 'No current assignment for partition' Exception
> ---------------------------------------------------------------
>
>                 Key: SPARK-19547
>                 URL: https://issues.apache.org/jira/browse/SPARK-19547
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 1.6.1
>            Reporter: wuchang
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
