Good point, I changed the group id to be unique for the separate streams
and now it works. Thanks!

Although changing this is fine for us, I am interested in the why :-) With
the old connector this was not a problem, nor is it, as far as I know, with
the plain Kafka consumer API.
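
For reference, the fix boils down to deriving a distinct group.id per topic
inside createStream (the suffix naming below is just illustrative):

  def createStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, Bytes]] = {
    // one consumer group per stream, e.g. "<group>-adserving.log.view"
    val params = kafkaParams + ("group.id" -> s"$group-$topic")
    KafkaUtils.createDirectStream[String, Bytes](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, Bytes](Set(topic), params))
  }

My guess at the why: the driver creates one KafkaConsumer per direct stream,
and with a shared group.id those consumers join the same consumer group and
keep rebalancing partitions away from each other, whereas the old connector
used the simple API without any group management. A definitive answer would
still be appreciated.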

2016-10-11 14:30 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> Just out of curiosity, have you tried using separate group ids for the
> separate streams?
>
> On Oct 11, 2016 4:46 AM, "Matthias Niehoff" <matthias.niehoff@codecentric.de> wrote:
>
>> I stripped down the job to just consume the streams and print them, without
>> Avro deserialization. When I only consume one topic, everything is fine. As
>> soon as I add a second stream, it gets stuck after about 5 minutes. So it
>> basically boils down to:
>>
>>
>>   val kafkaParams = Map[String, String](
>>     "bootstrap.servers" -> kafkaBrokers,
>>     "group.id" -> group,
>>     "key.deserializer" -> classOf[StringDeserializer].getName,
>>     "value.deserializer" -> classOf[BytesDeserializer].getName,
>>     "session.timeout.ms" -> s"${1 * 60 * 1000}",
>>     "request.timeout.ms" -> s"${2 * 60 * 1000}",
>>     "auto.offset.reset" -> "latest",
>>     "enable.auto.commit" -> "false"
>>   )
>>
>>   def main(args: Array[String]) {
>>
>>     def createStreamingContext(): StreamingContext = {
>>
>>       val sparkConf = new SparkConf(true)
>>         .setAppName("Kafka Consumer Test")
>>       sparkConf.setMaster("local[*]")
>>
>>       val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))
>>
>>       // AD REQUESTS
>>       // ===========
>>       val serializedAdRequestStream = createStream(ssc, topic_adrequest)
>>       serializedAdRequestStream.map(record => record.value().get()).print(10)
>>
>>       // VIEWS
>>       // ======
>>       val serializedViewStream = createStream(ssc, topic_view)
>>       serializedViewStream.map(record => record.value().get()).print(10)
>>
>> //      // CLICKS
>> //      // ======
>> //      val serializedClickStream = createStream(ssc, topic_click)
>> //      serializedClickStream.map(record => record.value().get()).print(10)
>>
>>       ssc
>>     }
>>
>>     val streamingContext = createStreamingContext()
>>     streamingContext.start()
>>     streamingContext.awaitTermination()
>>   }
>>
>>
>> And in the logs you see:
>>
>>
>> 16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
>> 16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
>> 16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
>> 16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
>> 16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms
>>
>>
>> 2016-10-11 9:28 GMT+02:00 Matthias Niehoff <matthias.niehoff@codecentric.de>:
>>
>>> This job will fail after about 5 minutes:
>>>
>>>
>>> object SparkJobMinimal {
>>>
>>>   // read Avro schemas
>>>   var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
>>>   val avroSchemaAdRequest = scala.io.Source.fromInputStream(stream).getLines.mkString
>>>   stream.close()
>>>   stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
>>>   val avroSchemaEvent = scala.io.Source.fromInputStream(stream).getLines.mkString
>>>   stream.close()
>>>
>>>
>>>   val kafkaBrokers = "broker-0.kafka.mesos:9092"
>>>
>>>   val topic_adrequest = "adserving.log.ad_request"
>>>   val topic_view = "adserving.log.view"
>>>   val topic_click = "adserving.log.click"
>>>   val group = UUID.randomUUID().toString
>>>   val streaming_interval_seconds = 2
>>>
>>>   val kafkaParams = Map[String, String](
>>>     "bootstrap.servers" -> kafkaBrokers,
>>>     "group.id" -> group,
>>>     "key.deserializer" -> classOf[StringDeserializer].getName,
>>>     "value.deserializer" -> classOf[BytesDeserializer].getName,
>>>     "session.timeout.ms" -> s"${1 * 60 * 1000}",
>>>     "request.timeout.ms" -> s"${2 * 60 * 1000}",
>>>     "auto.offset.reset" -> "latest",
>>>     "enable.auto.commit" -> "false"
>>>   )
>>>
>>>   def main(args: Array[String]) {
>>>
>>>     def createStreamingContext(): StreamingContext = {
>>>
>>>       val sparkConf = new SparkConf(true)
>>>         .setAppName("Kafka Consumer Test")
>>>       sparkConf.setMaster("local[*]")
>>>
>>>
>>>       val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))
>>>
>>>       // AD REQUESTS
>>>       // ===========
>>>       val serializedAdRequestStream = createStream(ssc, topic_adrequest)
>>>
>>>       val adRequestStream = deserializeStream(serializedAdRequestStream, avroSchemaAdRequest, record => AdRequestLog(record)).cache()
>>>       adRequestStream.print(10)
>>>
>>>       // VIEWS
>>>       // ======
>>>
>>>       val serializedViewStream = createStream(ssc, topic_view)
>>>       val viewStream = deserializeStream(serializedViewStream, avroSchemaEvent, record => Event(record, EventType.View)).cache()
>>>       viewStream.print(10)
>>>
>>>
>>>       // CLICKS
>>>       // ======
>>>       val serializedClickStream = createStream(ssc, topic_click)
>>>       val clickEventStream = deserializeStream(serializedClickStream, avroSchemaEvent, record => Event(record, EventType.Click)).cache()
>>>       clickEventStream.print(10)
>>>
>>>       ssc
>>>     }
>>>
>>>     val streamingContext = createStreamingContext()
>>>     streamingContext.start()
>>>     streamingContext.awaitTermination()
>>>   }
>>>
>>>   def createStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, Bytes]] = {
>>>     KafkaUtils.createDirectStream[String, Bytes](ssc, LocationStrategies.PreferConsistent,
>>>       ConsumerStrategies.Subscribe[String, Bytes](Set(topic), kafkaParams))
>>>   }
>>>
>>>   def deserializeStream[EventType: ClassTag](serializedStream: InputDStream[ConsumerRecord[String, Bytes]], avroSchema: String, recordMapper: GenericRecord => EventType): DStream[EventType] = {
>>>     serializedStream.mapPartitions { iteratorOfMessages =>
>>>       // parse the schema and build the binary injection once per partition
>>>       // (avoids serializing them with the closure and per-record overhead)
>>>       val schema: Schema = new Schema.Parser().parse(avroSchema)
>>>       val recordInjection = GenericAvroCodecs.toBinary(schema)
>>>       iteratorOfMessages
>>>         .map(message => recordInjection.invert(message.value().get()))
>>>         .filter(_.isSuccess)
>>>         .map(_.get.asInstanceOf[GenericRecord])
>>>         .map(recordMapper)
>>>     }
>>>   }
>>> }
>>>
>>>
>>> 2016-10-10 17:42 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de>:
>>>
>>>> Yes, the consumer rebalances even without committing the offsets.
>>>> The job consumes 3 streams and processes them. When consuming only one
>>>> stream it runs fine. But when consuming three streams, even without
>>>> joining them, just deserializing the payload and triggering an output
>>>> action, it fails.
>>>>
>>>> I will prepare a code sample.
>>>>
>>>> 2016-10-07 3:35 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>
>>>>> OK, so at this point, even without involving commitAsync, you're
>>>>> seeing consumer rebalances after a particular batch takes longer than
>>>>> the session timeout?
>>>>>
>>>>> Do you have a minimal code example you can share?
>>>>>
>>>>> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
>>>>> <matthias.nieh...@codecentric.de> wrote:
>>>>> > Hi,
>>>>> > sorry for the late reply; it was a public holiday in Germany.
>>>>> >
>>>>> > Yes, it's using a unique group id which no other job or consumer group
>>>>> > is using. I have increased session.timeout.ms to 1 minute and set
>>>>> > max.poll.records to 1000. The processing takes ~1 second.
>>>>> >
>>>>> > 2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>> >>
>>>>> >> Well, I'd start at the first thing suggested by the error, namely
>>>>> >> that the group has rebalanced.
>>>>> >>
>>>>> >> Is that stream using a unique group id?
>>>>> >>
>>>>> >> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
>>>>> >> <matthias.nieh...@codecentric.de> wrote:
>>>>> >> > Hi,
>>>>> >> >
>>>>> >> > the stacktrace:
>>>>> >> >
>>>>> >> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>>>>> >> > completed since the group has already rebalanced and assigned the partitions
>>>>> >> > to another member. This means that the time between subsequent calls to
>>>>> >> > poll() was longer than the configured session.timeout.ms, which typically
>>>>> >> > implies that the poll loop is spending too much time message processing. You
>>>>> >> > can address this either by increasing the session timeout or by reducing the
>>>>> >> > maximum size of batches returned in poll() with max.poll.records.
>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>>>>> >> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>>>>> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>>>>> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>>>>> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
>>>>> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>>> >> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>>>> >> > at scala.Option.orElse(Option.scala:289)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>>>> >> > at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>>> >> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>>>> >> > at scala.Option.orElse(Option.scala:289)
>>>>> >> > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>>>> >> > at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>>>>> >> > at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>>>>> >> > at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>>> >> > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>>>> >> > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>>>> >> > at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>> >> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>> >> > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>>>>> >> > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>>>>> >> > at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
>>>>> >> > at scala.util.Try$.apply(Try.scala:192)
>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>>>>> >> > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>> >> >
>>>>> >> > But it seems like the commit is not the actual problem. The job also
>>>>> >> > falls behind if I do not commit the offsets. The delay would be ok if
>>>>> >> > the processing time were longer than the batch interval, but that's
>>>>> >> > not the case in any of the microbatches. Imho for some reason one of
>>>>> >> > the microbatches falls behind by more than session.timeout.ms. Then
>>>>> >> > the consumers regroup, which takes about 1 minute (see timestamps
>>>>> >> > below). Now a cycle of slow batches begins, each triggering a
>>>>> >> > consumer regroup. Would this be possible?
>>>>> >> >
>>>>> >> >
>>>>> >> > 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
>>>>> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
>>>>> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
>>>>> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4, sapxm.adserving.log.view-1, sapxm.adserving.log.view-2, sapxm.adserving.log.view-0, sapxm.adserving.log.view-9, sapxm.adserving.log.view-7, sapxm.adserving.log.view-8, sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group spark_aggregation_job-kafka010
>>>>> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010
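>>>>> >> >
>>>>> >> > If that cycle is what happens, the exception text above already names
>>>>> >> > the knobs: a longer session timeout and a bounded poll size. A sketch
>>>>> >> > of the settings I would try next (values illustrative, not yet tested):
>>>>> >> >
>>>>> >> >   val kafkaParams = Map[String, String](
>>>>> >> >     "bootstrap.servers" -> kafkaBrokers,
>>>>> >> >     "group.id" -> group,
>>>>> >> >     // outlast the worst observed batch delay (~141 s above)
>>>>> >> >     "session.timeout.ms" -> s"${5 * 60 * 1000}",
>>>>> >> >     // must stay above session.timeout.ms
>>>>> >> >     "request.timeout.ms" -> s"${6 * 60 * 1000}",
>>>>> >> >     // bound the number of records returned per poll()
>>>>> >> >     "max.poll.records" -> "1000",
>>>>> >> >     "enable.auto.commit" -> "false"
>>>>> >> >   )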
>>>>> >> >
>>>>> >> > 2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>> >> >>
>>>>> >> >> What's the actual stacktrace / exception you're getting related
>>>>> >> >> to commit failure?
>>>>> >> >>
>>>>> >> >> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
>>>>> >> >> <matthias.nieh...@codecentric.de> wrote:
>>>>> >> >> > Hi everybody,
>>>>> >> >> >
>>>>> >> >> > I am using the new Kafka consumer for Spark Streaming for my job.
>>>>> >> >> > When running with the old consumer it runs fine.
>>>>> >> >> >
>>>>> >> >> > The job consumes 3 topics, saves the data to Cassandra, cogroups the
>>>>> >> >> > topics, calls mapWithState and stores the results in Cassandra. After
>>>>> >> >> > that I manually commit the Kafka offsets using the commitAsync method
>>>>> >> >> > of the Kafka DStream.
>>>>> >> >> >
>>>>> >> >> > With the new consumer I experience the following problem:
>>>>> >> >> >
>>>>> >> >> > After a certain amount of time (about 4-5 minutes, sometimes more or
>>>>> >> >> > less) there are exceptions saying that the offset commit failed. The
>>>>> >> >> > processing takes less than the batch interval. I also adjusted the
>>>>> >> >> > session.timeout.ms and request.timeout.ms as well as the
>>>>> >> >> > max.poll.records setting, which did not help.
>>>>> >> >> >
>>>>> >> >> > After the first offset commit fails, the time it takes from Kafka
>>>>> >> >> > until the microbatch is started increases, while the processing time
>>>>> >> >> > stays constantly below the batch interval. Moreover, further offset
>>>>> >> >> > commits also fail, and as a result the delay builds up.
>>>>> >> >> >
>>>>> >> >> > Has anybody else experienced this?
>>>>> >> >> >
>>>>> >> >> > Thank you
>>>>> >> >> >
>>>>> >> >> > Relevant Kafka Parameters:
>>>>> >> >> >
>>>>> >> >> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
>>>>> >> >> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
>>>>> >> >> > "auto.offset.reset" -> "largest",
>>>>> >> >> > "enable.auto.commit" -> "false",
>>>>> >> >> > "max.poll.records" -> "1000"


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de
