heartbeat.interval.ms          default
group.max.session.timeout.ms   default
session.timeout.ms             60000

"default" means the default values as of Kafka 0.10.

Complete Kafka params:

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> kafkaBrokers,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> "false",
  "key.deserializer" -> classOf[StringDeserializer].getName,
  "value.deserializer" -> classOf[BytesDeserializer].getName,
  "session.timeout.ms" -> s"${1 * 60 * 1000}",
  "request.timeout.ms" -> s"${2 * 60 * 1000}",
  "max.poll.records" -> "1000"
)

As pointed out, when using different groups for each DirectStream everything is fine.

2016-10-15 2:42 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> For you or anyone else having issues with consumer rebalance, what are
> your settings for
>
> heartbeat.interval.ms
> session.timeout.ms
> group.max.session.timeout.ms
>
> relative to your batch time?
>
> On Tue, Oct 11, 2016 at 10:19 AM, static-max <flasha...@googlemail.com> wrote:
> > Hi,
> >
> > I run into the same exception
> > (org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > completed since the group has already rebalanced ...), but I only use one
> > stream.
> > I get the exceptions when trying to manually commit the offset to Kafka:
> >
> > OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
> > CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream.dstream();
> > cco.commitAsync(offsets);
> >
> > I tried setting "max.poll.records" to 1000 but this did not help.
> >
> > Any idea what could be wrong?
> >
> > 2016-10-11 15:36 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> The new underlying kafka consumer prefetches data and is generally heavier
> >> weight, so it is cached on executors. Group id is part of the cache key. I
> >> assumed kafka users would use different group ids for consumers they wanted
> >> to be distinct, since otherwise it would cause problems even with the normal
> >> kafka consumer, but that appears to be a poor assumption.
> >>
> >> I'll figure out a way to make this more obvious.
> >>
> >>
> >> On Oct 11, 2016 8:19 AM, "Matthias Niehoff"
> >> <matthias.nieh...@codecentric.de> wrote:
> >>
> >> good point, I changed the group id to be unique for the separate streams
> >> and now it works. Thanks!
> >>
> >> Although changing this is ok for us, I am interested in the why :-) With
> >> the old connector this was not a problem, nor is it afaik with the pure
> >> kafka consumer api
> >>
> >> 2016-10-11 14:30 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>>
> >>> Just out of curiosity, have you tried using separate group ids for the
> >>> separate streams?
> >>>
> >>>
> >>> On Oct 11, 2016 4:46 AM, "Matthias Niehoff"
> >>> <matthias.nieh...@codecentric.de> wrote:
> >>>>
> >>>> I stripped down the job to just consume the stream and print it, without
> >>>> avro deserialization. When I only consume one topic, everything is fine.
> >>>> As soon as I add a second stream it gets stuck after about 5 minutes. So
> >>>> it basically boils down to:
> >>>>
> >>>>
> >>>> val kafkaParams = Map[String, String](
> >>>>   "bootstrap.servers" -> kafkaBrokers,
> >>>>   "group.id" -> group,
> >>>>   "key.deserializer" -> classOf[StringDeserializer].getName,
> >>>>   "value.deserializer" -> classOf[BytesDeserializer].getName,
> >>>>   "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >>>>   "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >>>>   "auto.offset.reset" -> "latest",
> >>>>   "enable.auto.commit" -> "false"
> >>>> )
> >>>>
> >>>> def main(args: Array[String]) {
> >>>>
> >>>>   def createStreamingContext(): StreamingContext = {
> >>>>
> >>>>     val sparkConf = new SparkConf(true)
> >>>>       .setAppName("Kafka Consumer Test")
> >>>>     sparkConf.setMaster("local[*]")
> >>>>
> >>>>     val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))
> >>>>
> >>>>     // AD REQUESTS
> >>>>     // ===========
> >>>>     val serializedAdRequestStream = createStream(ssc, topic_adrequest)
> >>>>     serializedAdRequestStream.map(record => record.value().get()).print(10)
> >>>>
> >>>>     // VIEWS
> >>>>     // ======
> >>>>     val serializedViewStream = createStream(ssc, topic_view)
> >>>>     serializedViewStream.map(record => record.value().get()).print(10)
> >>>>
> >>>>     // // CLICKS
> >>>>     // // ======
> >>>>     // val serializedClickStream = createStream(ssc, topic_click)
> >>>>     // serializedClickStream.map(record => record.value().get()).print(10)
> >>>>
> >>>>     ssc
> >>>>   }
> >>>>
> >>>>   val streamingContext = createStreamingContext
> >>>>   streamingContext.start()
> >>>>   streamingContext.awaitTermination()
> >>>> }
> >>>>
> >>>>
> >>>> And in the logs you see:
> >>>>
> >>>>
> >>>> 16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
> >>>> 16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
> >>>> 16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
> >>>> 16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
> >>>> 16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms
> >>>>
> >>>>
> >>>> 2016-10-11 9:28 GMT+02:00 Matthias Niehoff
> >>>> <matthias.nieh...@codecentric.de>:
> >>>>>
> >>>>> This Job will fail after about 5 minutes:
> >>>>>
> >>>>>
> >>>>> object SparkJobMinimal {
> >>>>>
> >>>>>   //read Avro schemas
> >>>>>   var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
> >>>>>   val avroSchemaAdRequest = scala.io.Source.fromInputStream(stream).getLines.mkString
> >>>>>   stream.close
> >>>>>   stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
> >>>>>   val avroSchemaEvent = scala.io.Source.fromInputStream(stream).getLines.mkString
> >>>>>   stream.close
> >>>>>
> >>>>>
> >>>>>   val kafkaBrokers = "broker-0.kafka.mesos:9092"
> >>>>>
> >>>>>   val topic_adrequest = "adserving.log.ad_request"
> >>>>>   val topic_view = "adserving.log.view"
> >>>>>   val topic_click = "adserving.log.click"
> >>>>>   val group = UUID.randomUUID().toString
> >>>>>   val streaming_interval_seconds = 2
> >>>>>
> >>>>>   val kafkaParams = Map[String, String](
> >>>>>     "bootstrap.servers" -> kafkaBrokers,
> >>>>>     "group.id" -> group,
> >>>>>     "key.deserializer" -> classOf[StringDeserializer].getName,
> >>>>>     "value.deserializer" -> classOf[BytesDeserializer].getName,
> >>>>>     "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >>>>>     "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >>>>>     "auto.offset.reset" -> "latest",
> >>>>>     "enable.auto.commit" -> "false"
> >>>>>   )
> >>>>>
> >>>>>   def main(args: Array[String]) {
> >>>>>
> >>>>>     def createStreamingContext(): StreamingContext = {
> >>>>>
> >>>>>       val sparkConf = new SparkConf(true)
> >>>>>         .setAppName("Kafka Consumer Test")
> >>>>>       sparkConf.setMaster("local[*]")
> >>>>>
> >>>>>
> >>>>>       val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))
> >>>>>
> >>>>>       // AD REQUESTS
> >>>>>       // ===========
> >>>>>       val serializedAdRequestStream = createStream(ssc, topic_adrequest)
> >>>>>
> >>>>>       val adRequestStream = deserializeStream(serializedAdRequestStream, avroSchemaAdRequest, record => AdRequestLog(record)).cache()
> >>>>>       adRequestStream.print(10)
> >>>>>
> >>>>>       // VIEWS
> >>>>>       // ======
> >>>>>
> >>>>>       val serializedViewStream = createStream(ssc, topic_view)
> >>>>>       val viewStream = deserializeStream(serializedViewStream, avroSchemaEvent, record => Event(record, EventType.View)).cache()
> >>>>>       viewStream.print(10)
> >>>>>
> >>>>>
> >>>>>       // CLICKS
> >>>>>       // ======
> >>>>>       val serializedClickStream = createStream(ssc, topic_click)
> >>>>>       val clickEventStream = deserializeStream(serializedClickStream, avroSchemaEvent, record => Event(record, EventType.Click)).cache()
> >>>>>       clickEventStream.print(10)
> >>>>>
> >>>>>       ssc
> >>>>>     }
> >>>>>
> >>>>>     val streamingContext = createStreamingContext
> >>>>>     streamingContext.start()
> >>>>>     streamingContext.awaitTermination()
> >>>>>   }
> >>>>>
> >>>>>   def createStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, Bytes]] = {
> >>>>>     KafkaUtils.createDirectStream[String, Bytes](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, Bytes](Set(topic), kafkaParams))
> >>>>>   }
> >>>>>
> >>>>>   def deserializeStream[EventType: ClassTag](serializedAdRequestStream: InputDStream[ConsumerRecord[String, Bytes]], avroSchema: String, recordMapper: GenericRecord => EventType): DStream[EventType] = {
> >>>>>     serializedAdRequestStream.mapPartitions {
> >>>>>       iteratorOfMessages =>
> >>>>>         val schema: Schema = new Schema.Parser().parse(avroSchema)
> >>>>>         val recordInjection = GenericAvroCodecs.toBinary(schema)
> >>>>>         iteratorOfMessages.map(message => {
> >>>>>           recordInjection.invert(message.value().get())
> >>>>>         }).filter(_.isSuccess).map(_.get.asInstanceOf[GenericRecord]).map(recordMapper)
> >>>>>     }
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>>
> >>>>> 2016-10-10 17:42 GMT+02:00 Matthias Niehoff
> >>>>> <matthias.nieh...@codecentric.de>:
> >>>>>>
> >>>>>> Yes, without committing the data the consumer rebalances.
> >>>>>> The job consumes 3 streams and processes them. When consuming only one
> >>>>>> stream it runs fine. But when consuming three streams, even without
> >>>>>> joining them, just deserializing the payload and triggering an output
> >>>>>> action, it fails.
> >>>>>>
> >>>>>> I will prepare a code sample.
> >>>>>>
> >>>>>> 2016-10-07 3:35 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>>>>>>
> >>>>>>> OK, so at this point, even without involving commitAsync, you're
> >>>>>>> seeing consumer rebalances after a particular batch takes longer than
> >>>>>>> the session timeout?
> >>>>>>>
> >>>>>>> Do you have a minimal code example you can share?
> >>>>>>>
> >>>>>>> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
> >>>>>>> <matthias.nieh...@codecentric.de> wrote:
> >>>>>>> > Hi,
> >>>>>>> > sry for the late reply. A public holiday in Germany.
> >>>>>>> >
> >>>>>>> > Yes, it's using a unique group id which no other job or consumer
> >>>>>>> > group is using. I have increased the session.timeout to 1 minute and
> >>>>>>> > set max.poll.records to 1000. The processing takes ~1 second.
> >>>>>>> >
> >>>>>>> > 2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>>>>>> >>
> >>>>>>> >> Well, I'd start at the first thing suggested by the error, namely
> >>>>>>> >> that the group has rebalanced.
> >>>>>>> >>
> >>>>>>> >> Is that stream using a unique group id?
> >>>>>>> >>
> >>>>>>> >> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
> >>>>>>> >> <matthias.nieh...@codecentric.de> wrote:
> >>>>>>> >> > Hi,
> >>>>>>> >> >
> >>>>>>> >> > the stacktrace:
> >>>>>>> >> >
> >>>>>>> >> > org.apache.kafka.clients.consumer.CommitFailedException: Commit
> >>>>>>> >> > cannot be completed since the group has already rebalanced and
> >>>>>>> >> > assigned the partitions to another member. This means that the
> >>>>>>> >> > time between subsequent calls to poll() was longer than the
> >>>>>>> >> > configured session.timeout.ms, which typically implies that the
> >>>>>>> >> > poll loop is spending too much time message processing. You can
> >>>>>>> >> > address this either by increasing the session timeout or by
> >>>>>>> >> > reducing the maximum size of batches returned in poll() with
> >>>>>>> >> > max.poll.records.
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> >>>>>>> >> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> >>>>>>> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> >>>>>>> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
> >>>>>>> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>>>>>> >> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> >>>>>>> >> > at scala.Option.orElse(Option.scala:289)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >>>>>>> >> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> >>>>>>> >> > at scala.Option.orElse(Option.scala:289)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> >>>>>>> >> > at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> >>>>>>> >> > at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> >>>>>>> >> > at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> >>>>>>> >> > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> >>>>>>> >> > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> >>>>>>> >> > at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>>>>>> >> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> >>>>>>> >> > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> >>>>>>> >> > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> >>>>>>> >> > at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> >>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
> >>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
> >>>>>>> >> > at scala.util.Try$.apply(Try.scala:192)
> >>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
> >>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
> >>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> >>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> >>>>>>> >> > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> >>>>>>> >> >
> >>>>>>> >> > But it seems like the commit is not the actual problem. The job
> >>>>>>> >> > also falls behind if I do not commit the offsets. The delay would
> >>>>>>> >> > be ok if the processing time was bigger than the batch interval,
> >>>>>>> >> > but that's not the case in any of the microbatches. Imho for some
> >>>>>>> >> > reason one of the microbatches falls behind more than
> >>>>>>> >> > session.timeout.ms. Then the consumers regroup, which takes about
> >>>>>>> >> > 1 minute (see timestamps below). Now a cycle of slow batches
> >>>>>>> >> > begins, each triggering a consumer regroup. Would this be
> >>>>>>> >> > possible?
> >>>>>>> >> >
> >>>>>>> >> >
> >>>>>>> >> > 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
> >>>>>>> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
> >>>>>>> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
> >>>>>>> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4, sapxm.adserving.log.view-1, sapxm.adserving.log.view-2, sapxm.adserving.log.view-0, sapxm.adserving.log.view-9, sapxm.adserving.log.view-7, sapxm.adserving.log.view-8, sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group spark_aggregation_job-kafka010
> >>>>>>> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010
> >>>>>>> >> >
> >>>>>>> >> > 2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>>>>>> >> >>
> >>>>>>> >> >> What's the actual stacktrace / exception you're getting related
> >>>>>>> >> >> to commit failure?
> >>>>>>> >> >>
> >>>>>>> >> >> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
> >>>>>>> >> >> <matthias.nieh...@codecentric.de> wrote:
> >>>>>>> >> >> > Hi everybody,
> >>>>>>> >> >> >
> >>>>>>> >> >> > I am using the new Kafka Receiver for Spark Streaming for my
> >>>>>>> >> >> > Job. When running with the old consumer it runs fine.
> >>>>>>> >> >> >
> >>>>>>> >> >> > The Job consumes 3 Topics, saves the data to Cassandra,
> >>>>>>> >> >> > cogroups the topics, calls mapWithState and stores the results
> >>>>>>> >> >> > in Cassandra. After that I manually commit the Kafka offsets
> >>>>>>> >> >> > using the commitAsync method of the KafkaDStream.
> >>>>>>> >> >> >
> >>>>>>> >> >> > With the new consumer I experience the following problem:
> >>>>>>> >> >> >
> >>>>>>> >> >> > After a certain amount of time (about 4-5 minutes, might be
> >>>>>>> >> >> > more or less) there are exceptions that the offset commit
> >>>>>>> >> >> > failed. The processing takes less than the batch interval. I
> >>>>>>> >> >> > also adjusted the session.timeout and request.timeout as well
> >>>>>>> >> >> > as the max.poll.records setting, which did not help.
> >>>>>>> >> >> >
> >>>>>>> >> >> > After the first offset commit has failed, the time it takes
> >>>>>>> >> >> > from Kafka until the microbatch is started increases, while the
> >>>>>>> >> >> > processing time stays constantly below the batch interval.
> >>>>>>> >> >> > Moreover, further offset commits also fail, and as a result the
> >>>>>>> >> >> > delay time builds up.
> >>>>>>> >> >> >
> >>>>>>> >> >> > Has anybody experienced this as well?
> >>>>>>> >> >> >
> >>>>>>> >> >> > Thank you
> >>>>>>> >> >> >
> >>>>>>> >> >> > Relevant Kafka Parameters:
> >>>>>>> >> >> >
> >>>>>>> >> >> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >>>>>>> >> >> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >>>>>>> >> >> > "auto.offset.reset" -> "largest",
> >>>>>>> >> >> > "enable.auto.commit" -> "false",
> >>>>>>> >> >> > "max.poll.records" -> "1000"
> >>>>>>> >> >> >
> >>>>>>> >> >> >
> >>>>>>> >> >> >
> >>>>>>> >> >> > --
> >>>>>>> >> >> > Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
> >>>>>>> >> >> > codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> >>>>>>> >> >> > tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobile: +49 (0) 172.1702676
> >>>>>>> >> >> > www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
> >>>>>>> >> >> >
> >>>>>>> >> >> > Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
> >>>>>>> >> >> > Executive board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> >>>>>>> >> >> > Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen Schütz
> >>>>>>> >> >> >
> >>>>>>> >> >> > This e-mail, including any attached files, contains confidential
> >>>>>>> >> >> > and/or legally protected information. If you are not the intended
> >>>>>>> >> >> > recipient or have received this e-mail in error, please notify the
> >>>>>>> >> >> > sender immediately and delete this e-mail and any attached files
> >>>>>>> >> >> > without delay. Unauthorized copying, use, or opening of any
> >>>>>>> >> >> > attached files, as well as unauthorized forwarding of this e-mail,
> >>>>>>> >> >> > is not permitted.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

--
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobile: +49 (0) 172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
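
For anyone landing on this thread later: the change that resolved it was giving each direct stream its own group id, since the group id is part of the key of the consumer cache on the executors. Below is a minimal sketch of one way to derive such per-stream parameters. It is only an illustration under stated assumptions: the object name PerStreamGroupIds, the helper paramsFor, and the base group name are hypothetical and not from the thread, the deserializer class names are written as plain strings so the snippet runs without Kafka on the classpath, and the resulting map would be passed to ConsumerStrategies.Subscribe in place of the shared kafkaParams.

```scala
object PerStreamGroupIds {

  // Settings shared by all streams, taken from the parameters posted in the thread.
  // The broker address is the one from the sample job and acts as a placeholder here.
  val baseParams: Map[String, String] = Map(
    "bootstrap.servers"  -> "broker-0.kafka.mesos:9092",
    "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.BytesDeserializer",
    "session.timeout.ms" -> (1 * 60 * 1000).toString,
    "request.timeout.ms" -> (2 * 60 * 1000).toString,
    "auto.offset.reset"  -> "latest",
    "enable.auto.commit" -> "false"
  )

  // Hypothetical helper: one distinct group.id per topic, so that no two
  // direct streams of the same job share a consumer group.
  def paramsFor(baseGroup: String, topic: String): Map[String, String] =
    baseParams + ("group.id" -> s"$baseGroup-$topic")

  def main(args: Array[String]): Unit = {
    val topics = Seq(
      "adserving.log.ad_request",
      "adserving.log.view",
      "adserving.log.click"
    )
    // Print the group id that each stream would use.
    topics.foreach { topic =>
      val groupId = paramsFor("spark_aggregation_job", topic)("group.id")
      println(s"$topic -> $groupId")
    }
  }
}
```

In the sample job this would mean calling something like paramsFor(group, topic) inside createStream rather than reusing one kafkaParams map for all three topics.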