OK, so at this point, even without involving commitAsync, you're seeing consumer rebalances after a particular batch takes longer than the session timeout?
Do you have a minimal code example you can share?

On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff <matthias.nieh...@codecentric.de> wrote:
> Hi,
> sorry for the late reply; it was a public holiday in Germany.
>
> Yes, it's using a unique group id which no other job or consumer group is
> using. I have increased session.timeout.ms to 1 minute and set
> max.poll.records to 1000. The processing takes ~1 second.
>
> 2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> Well, I'd start at the first thing suggested by the error, namely that
>> the group has rebalanced.
>>
>> Is that stream using a unique group id?
>>
>> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
>> <matthias.nieh...@codecentric.de> wrote:
>>> Hi,
>>>
>>> the stacktrace:
>>>
>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>>>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>>>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>>>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>>>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
>>>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>>   at scala.Option.orElse(Option.scala:289)
>>>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>>   at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>>   at scala.Option.orElse(Option.scala:289)
>>>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>>>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>>>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>>>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>>>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
>>>   at scala.util.Try$.apply(Try.scala:192)
>>>   at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
>>>   at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
>>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>>>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>
>>> But it seems like the commit is not the actual problem. The job also falls
>>> behind if I do not commit the offsets. The delay would be OK if the
>>> processing time were longer than the batch interval, but that's not the
>>> case in any of the micro-batches. IMHO, for some reason one of the
>>> micro-batches falls behind by more than session.timeout.ms. Then the
>>> consumer rejoins the group, which takes about 1 minute (see timestamps
>>> below). Now a cycle of slow batches begins, each triggering another
>>> rejoin. Would this be possible?
>>>
>>> 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
>>> 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
>>> 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
>>> 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4, sapxm.adserving.log.view-1, sapxm.adserving.log.view-2, sapxm.adserving.log.view-0, sapxm.adserving.log.view-9, sapxm.adserving.log.view-7, sapxm.adserving.log.view-8, sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group spark_aggregation_job-kafka010
>>> 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010
>>>
>>> 2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>
>>>> What's the actual stacktrace / exception you're getting related to
>>>> commit failure?
>>>>
>>>> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
>>>> <matthias.nieh...@codecentric.de> wrote:
>>>>> Hi everybody,
>>>>>
>>>>> I am using the new Kafka 0.10 integration for Spark Streaming in my job.
>>>>> With the old consumer it runs fine.
>>>>>
>>>>> The job consumes 3 topics, saves the data to Cassandra, cogroups the
>>>>> topics, calls mapWithState and stores the results in Cassandra. After
>>>>> that I manually commit the Kafka offsets using the commitAsync method of
>>>>> the KafkaDStream.
>>>>>
>>>>> With the new consumer I experience the following problem:
>>>>>
>>>>> After a certain amount of time (about 4-5 minutes, might be more or
>>>>> less) there are exceptions saying that the offset commit failed. The
>>>>> processing takes less than the batch interval. I also adjusted
>>>>> session.timeout.ms and request.timeout.ms as well as the
>>>>> max.poll.records setting, which did not help.
>>>>>
>>>>> After the first offset commit fails, the time it takes from Kafka until
>>>>> the micro-batch is started increases, while the processing time stays
>>>>> constantly below the batch interval. Moreover, further offset commits
>>>>> also fail, and as a result the delay builds up.
>>>>>
>>>>> Has anybody else experienced this?
>>>>>
>>>>> Thank you
>>>>>
>>>>> Relevant Kafka parameters:
>>>>>
>>>>> "session.timeout.ms" -> s"${1 * 60 * 1000}",
>>>>> "request.timeout.ms" -> s"${2 * 60 * 1000}",
>>>>> "auto.offset.reset" -> "largest",
>>>>> "enable.auto.commit" -> "false",
>>>>> "max.poll.records" -> "1000"
>>>>>
>>>>> --
>>>>> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
>>>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Germany
>>>>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobile: +49 (0) 172.1702676
>>>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
>>>>>
>>>>> Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
>>>>> Management Board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>>>> Supervisory Board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen Schütz
>>>>>
>>>>> This e-mail, including any attached files, contains confidential and/or
>>>>> legally protected information. If you are not the intended recipient or
>>>>> have received this e-mail in error, please notify the sender immediately
>>>>> and delete this e-mail and any attached files. Unauthorized copying, use
>>>>> or opening of attached files, as well as unauthorized forwarding of this
>>>>> e-mail, is not permitted.
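A minimal sketch of the parameter map quoted above, written out as a complete Scala `Map`, under stated assumptions. It assumes the Kafka 0.10 ("new") consumer used by the kafka010 integration: that consumer only accepts `latest`, `earliest` or `none` for `auto.offset.reset` (`largest` is the old consumer's name for `latest`). Assuming a 0.10.0.x client, the consumer constructor also rejects a `request.timeout.ms` that is not greater than `session.timeout.ms`. Following Cody's question about the group id, it also derives one `group.id` per stream, in case the job creates one direct stream per topic (the thread does not say); the Spark Kafka 0.10 integration guide likewise advises a separate `group.id` for each call to `createDirectStream`. The `bootstrap.servers` value, the `<group>-<stream>` naming scheme and the `paramProblems` / `paramsForStream` helpers are hypothetical.

```scala
// Sketch only: consumer parameters for the Kafka 0.10 ("new") consumer.
// Hypothetical (not from the thread): the bootstrap.servers value, the
// "<group>-<stream>" group.id naming scheme and both helper functions.
object KafkaParamsSketch {

  val baseGroupId = "spark_aggregation_job-kafka010" // group name seen in the logs

  val kafkaParams: Map[String, String] = Map(
    "bootstrap.servers"  -> "broker1:9092",      // hypothetical placeholder
    "group.id"           -> baseGroupId,
    "session.timeout.ms" -> s"${1 * 60 * 1000}", // 1 minute, as in the thread
    "request.timeout.ms" -> s"${2 * 60 * 1000}", // kept above session.timeout.ms (0.10.0.x assumption)
    "auto.offset.reset"  -> "latest",            // new-consumer value; "largest" is not accepted
    "enable.auto.commit" -> "false",
    "max.poll.records"   -> "1000"
  )

  private val validOffsetResets = Set("latest", "earliest", "none")

  /** Hypothetical check for the two constraints described in the lead-in. */
  def paramProblems(params: Map[String, String]): List[String] = {
    // auto.offset.reset must be one of the new consumer's accepted values
    val resetProblems = params.get("auto.offset.reset").toList.collect {
      case v if !validOffsetResets.contains(v) =>
        s"auto.offset.reset=$v is not one of ${validOffsetResets.mkString("/")}"
    }
    // request.timeout.ms must be strictly greater than session.timeout.ms
    val timeoutProblems = (for {
      session <- params.get("session.timeout.ms").map(_.toLong)
      request <- params.get("request.timeout.ms").map(_.toLong)
      if request <= session
    } yield s"request.timeout.ms ($request) must be greater than session.timeout.ms ($session)").toList
    resetProblems ++ timeoutProblems
  }

  /** Hypothetical helper: gives each stream (e.g. one per topic) its own group.id. */
  def paramsForStream(params: Map[String, String], streamName: String): Map[String, String] =
    params + ("group.id" -> s"$baseGroupId-$streamName")
}
```

A per-stream map could then be passed to `ConsumerStrategies.Subscribe[String, String](Seq(topic), KafkaParamsSketch.paramsForStream(KafkaParamsSketch.kafkaParams, topic))`; a `Map[String, String]` is accepted where the integration expects `Map[String, Object]`, because Scala's immutable `Map` is covariant in its value type.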
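The exception message states the condition being hit: the group rebalances when the time between subsequent calls to `poll()` is longer than `session.timeout.ms`. In the stack trace, the exception surfaces in the driver-side `KafkaConsumer.poll` call made from `DirectKafkaInputDStream.latestOffsets`, which runs when each batch is computed. One hedged way to test Matthias's hypothesis (a slow batch stretches the gap between driver polls past the timeout, and the roughly one-minute rejoin then delays the following batches) is therefore to record when each driver-side poll happens and look for gaps longer than the session timeout. The sketch below only does that comparison; how the timestamps are collected is left open, and `PollGapCheck`, `pollGapViolations` and the sample values are hypothetical.

```scala
// Sketch only: flag gaps between consecutive poll() calls that exceed the session timeout.
// Assumption: pollTimesMs holds wall-clock times (ms) of the driver-side polls for one
// stream, collected by some means not covered in the thread.
object PollGapCheck {

  /** Two consecutive polls whose gap exceeded the session timeout. */
  final case class Violation(previousPollMs: Long, nextPollMs: Long) {
    def gapMs: Long = nextPollMs - previousPollMs
  }

  /** Returns every consecutive pair of polls that are more than sessionTimeoutMs apart. */
  def pollGapViolations(pollTimesMs: Seq[Long], sessionTimeoutMs: Long): Seq[Violation] =
    pollTimesMs.sorted
      .sliding(2) // windows of two consecutive poll times
      .collect { case Seq(prev, next) if next - prev > sessionTimeoutMs => Violation(prev, next) }
      .toList
}
```

With illustrative polls at 0 s, 10 s, 20 s, 95 s and 105 s and a one-minute timeout, the only violation is the 75-second gap between 20 s and 95 s; gaps of exactly the timeout are not flagged.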