Hi, I am having this peculiar problem with our spark jobs in our cluster, where the spark job ends with a message:
No current assignment for partition iomkafkaconnector-deliverydata-dev-2 We have a setup where we have 4 kafka partitions and 4 spark executors, so each partition should be directly read by each executor. Also, The interesting thing to note is that we haven't set the ' spark.dynamicAllocation.enabled' to true but still the executors become dead and new executors are added at times. The problem I see is that when executors go down, the partitions are not reassigned to active executors, I can't say why is this happening, but I have the following log which I receive after which the spark job dies. 18/05/12 03:52:52 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [hierarchy-updates-dev-0, iomkafkaconnector-deliverydata-dev-0, iomkafkaconnector-deliverydata-dev-1] for group data-in-cleansing-consumer 18/05/12 03:53:01 ERROR scheduler.JobScheduler: Error generating jobs for time 1526075610000 ms java.lang.IllegalStateException: No current assignment for partition iomkafkaconnector-deliverydata-dev-2 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315) at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:194) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 18/05/12 03:53:01 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.IllegalStateException: No current assignment for partition iomkafkaconnector-deliverydata-dev-2 java.lang.IllegalStateException: No current assignment for partition iomkafkaconnector-deliverydata-dev-2 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315) at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:194) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 18/05/12 03:53:01 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.IllegalStateException: No current assignment for partition iomkafkaconnector-deliverydata-dev-2) 18/05/12 03:53:01 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook 18/05/12 03:53:01 INFO scheduler.ReceiverTracker: ReceiverTracker stopped 18/05/12 03:53:01 INFO scheduler.JobGenerator: Stopping JobGenerator immediately 18/05/12 03:53:01 INFO util.RecurringTimer: Stopped timer for JobGenerator after time 1526097180000 18/05/12 03:53:02 ERROR scheduler.JobScheduler: Error generating jobs for time 1526075620000 ms java.lang.IllegalStateException: No current assignment for partition iomkafkaconnector-deliverydata-dev-2 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315) at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:194) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 18/05/12 03:53:02 WARN kerberos.KerberosLogin: TGT renewal thread has been interrupted and will exit. 18/05/12 03:53:02 ERROR scheduler.JobScheduler: Error generating jobs for time 1526075630000 ms java.lang.IllegalStateException: This consumer has already been closed. at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417) at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 18/05/12 03:53:02 INFO scheduler.JobGenerator: Stopped JobGenerator 18/05/12 03:53:02 INFO scheduler.JobScheduler: Stopped JobScheduler 18/05/12 03:53:02 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@56a3af9e{/streaming,null,UNAVAILABLE,@Spark} 18/05/12 03:53:02 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@4ca2cc2b{/streaming/batch,null,UNAVAILABLE,@Spark} 18/05/12 03:53:02 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@39330226{/static/streaming,null,UNAVAILABLE,@Spark} 18/05/12 03:53:02 INFO streaming.StreamingContext: StreamingContext stopped successfully 18/05/12 03:53:02 INFO spark.SparkContext: Invoking stop() from shutdown hook 18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on 10.2.0.12:46563 in memory (size: 4.6 KB, free: 93.3 MB) 18/05/12 03:53:02 INFO server.AbstractConnector: Stopped Spark@45ad8de8{HTTP/1.1,[http/1.1]}{0.0.0.0:0} 18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on cdh2.germanycentral.cloudapp.microsoftazure.de:36878 in memory (size: 4.6 KB, free: 93.3 MB) 18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on cdh3.germanycentral.cloudapp.microsoftazure.de:42605 in memory (size: 4.6 KB, free: 93.3 MB) 18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on cdh2.germanycentral.cloudapp.microsoftazure.de:36740 in memory (size: 4.6 KB, free: 93.3 MB) 18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on cdh2.germanycentral.cloudapp.microsoftazure.de:34228 in memory (size: 4.6 KB, free: 93.3 MB) 18/05/12 03:53:02 INFO ui.SparkUI: Stopped Spark web UI at http://10.2.0.12:37118 18/05/12 03:53:02 INFO cluster.YarnClusterSchedulerBackend: Shutting down all executors 18/05/12 03:53:02 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down 18/05/12 03:53:02 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false) 18/05/12 03:53:02 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 18/05/12 03:53:02 INFO memory.MemoryStore: MemoryStore cleared 18/05/12 03:53:02 INFO storage.BlockManager: BlockManager stopped 18/05/12 03:53:02 INFO storage.BlockManagerMaster: BlockManagerMaster stopped 18/05/12 03:53:02 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 18/05/12 03:53:02 INFO spark.SparkContext: Successfully stopped SparkContext 18/05/12 03:53:02 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.IllegalStateException: No current assignment for partition iomkafkaconnector-deliverydata-dev-2) 18/05/12 03:53:02 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered. 18/05/12 03:53:02 INFO util.ShutdownHookManager: Shutdown hook called Thanks & Regards Biplob Biswas