Well, I'd start at the first thing suggested by the error, namely that
the group has rebalanced.

Is that stream using a unique group id?

On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
<matthias.nieh...@codecentric.de> wrote:
> Hi,
>
> the stacktrace:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the partitions
> to another member. This means that the time between subsequent calls to
> poll() was longer than the configured session.timeout.ms, which typically
> implies that the poll loop is spending too much time message processing. You
> can address this either by increasing the session timeout or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> at scala.Option.orElse(Option.scala:289)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> at
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> at scala.Option.orElse(Option.scala:289)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
> at scala.util.Try$.apply(Try.scala:192)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> But it seems like the commit is not the actual problem. The job also falls
> behind if I do not commit the offsets. The delay would be ok if the
> processing time was bigger than the batch size, but thats not the case in
> any of the microbatches. Imho for some reason one of the microbatches falls
> behind more than session.timeout.ms. Then the consumer we regroup which
> takes about 1 minute (see timestamps below). Know begins a circle of slow
> batches each triggering a consumer regroup. Would this be possible?
>
>
> 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time
> 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
> 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group
> spark_aggregation_job-kafka010 with generation 6
> 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned
> partitions [sapxm.adserving.log.ad_request-0,
> sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1,
> sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3,
> sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5,
> sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7,
> sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
> 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned
> partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4,
> sapxm.adserving.log.view-1, sapxm.adserving.log.view-2,
> sapxm.adserving.log.view-0, sapxm.adserving.log.view-9,
> sapxm.adserving.log.view-7, sapxm.adserving.log.view-8,
> sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group
> spark_aggregation_job-kafka010
> 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group
> spark_aggregation_job-kafka010
>
> 2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> What's the actual stacktrace / exception you're getting related to
>> commit failure?
>>
>> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
>> <matthias.nieh...@codecentric.de> wrote:
>> > Hi everybody,
>> >
>> > i am using the new Kafka Receiver for Spark Streaming for my Job. When
>> > running with old consumer it runs fine.
>> >
>> > The Job consumes 3 Topics, saves the data to Cassandra, cogroups the
>> > topic,
>> > calls mapWithState and stores the results in cassandra. After that I
>> > manually commit the Kafka offsets using the commitAsync method of the
>> > KafkaDStream.
>> >
>> > With the new consumer I experience the following problem:
>> >
>> > After a certain amount of time (about 4-5 minutes, might be more or
>> > less)
>> > there are exceptions that the offset commit failed. The processing takes
>> > less than the batch interval. I also adjusted the session.timeout and
>> > request.timeout as well as the max.poll.records setting which did not
>> > help.
>> >
>> > After the first offset commit failed the time it takes from kafka until
>> > the
>> > microbatch is started increases, the processing time is constantly below
>> > the
>> > batch interval. Moreover further offset commits also fail and as result
>> > the
>> > delay time builds up.
>> >
>> > Has anybody made this experience as well?
>> >
>> > Thank you
>> >
>> > Relevant Kafka Parameters:
>> >
>> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
>> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
>> > "auto.offset.reset" -> "largest",
>> > "enable.auto.commit" -> "false",
>> > "max.poll.records" -> "1000"
>> >
>> >
>> >
>> > --
>> > Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> > codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> > tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>> > 172.1702676
>> > www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> > www.more4fi.de
>> >
>> > Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>> > Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> > Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>> > Schütz
>> >
>> > Diese E-Mail einschließlich evtl. beigefügter Dateien enthält
>> > vertrauliche
>> > und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
>> > Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren
>> > Sie
>> > bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
>> > beigefügter
>> > Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen evtl.
>> > beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
>> > nicht
>> > gestattet
>
>
>
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>
> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
> und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
> Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
> bitte sofort den Absender und löschen Sie diese E-Mail und evtl. beigefügter
> Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen evtl.
> beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist nicht
> gestattet

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to