Kakfa 0.10.1 release separates poll() from heartbeat. So session.timeout.ms
& max.poll.interval.ms can be set differently.
I'll leave it to you on how to add this to docs!


On Thu, Oct 20, 2016 at 1:41 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Right on, I put in a PR to make a note of that in the docs.
>
> On Thu, Oct 20, 2016 at 12:13 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Yeah, setting those params helped.
> >
> > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> 60 seconds for a batch is above the default settings in kafka related
> >> to heartbeat timeouts, so that might be related.  Have you tried
> >> tweaking session.timeout.ms, heartbeat.interval.ms, or related
> >> configs?
> >>
> >> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <srikanth...@gmail.com>
> wrote:
> >> > Bringing this thread back as I'm seeing this exception on a production
> >> > kafka
> >> > cluster.
> >> >
> >> > I have two Spark streaming apps reading the same topic. App1 has batch
> >> > interval 2secs and app2 has 60secs.
> >> > Both apps are running on the same cluster on similar hardware. I see
> >> > this
> >> > exception only in app2 and fairly consistently.
> >> >
> >> > Difference I see between the apps is
> >> > App1
> >> >       spark.streaming.kafka.maxRatePerPartition, 6000
> >> >       batch interval 2 secs
> >> > App2
> >> >       spark.streaming.kafka.maxRatePerPartition, 10000
> >> >       batch interval 60 secs
> >> >
> >> > All other kafka/spark related configs are same for both apps.
> >> >       spark.streaming.kafka.consumer.poll.ms = 4096
> >> >       spark.streaming.backpressure.enabled = true
> >> >
> >> > Not sure if pre-fetching or caching is messing things up.
> >> >
> >> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0
> >> > (TID
> >> > 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError:
> >> > assertion
> >> > failed: Failed to get records for spark-executor-
> StreamingEventSplitProd
> >> > mt_event 6 49091480 after polling for 4096
> >> >         at scala.Predef$.assert(Predef.scala:170)
> >> >         at
> >> >
> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >> >         at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> >> >         at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> >> >         at scala.collection.Iterator$$anon$11.next(Iterator.scala:
> 409)
> >> >         at scala.collection.Iterator$$anon$11.next(Iterator.scala:
> 409)
> >> >         at scala.collection.Iterator$$anon$21.next(Iterator.scala:
> 838)
> >> >
> >> >
> >> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> That's not what I would have expected to happen with a lower cache
> >> >> setting, but in general disabling the cache isn't something you want
> >> >> to do with the new kafka consumer.
> >> >>
> >> >>
> >> >> As far as the original issue, are you seeing those polling errors
> >> >> intermittently, or consistently?  From your description, it sounds
> >> >> like retry is working correctly.
> >> >>
> >> >>
> >> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com>
> wrote:
> >> >> > Setting those two results in below exception.
> >> >> > No.of executors < no.of partitions. Could that be triggering this?
> >> >> >
> >> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage
> 2.0
> >> >> > (TID 9)
> >> >> > java.util.ConcurrentModificationException: KafkaConsumer is not
> safe
> >> >> > for
> >> >> > multi-threaded access
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1360)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> >> >> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> >> >> > at java.util.HashMap.putVal(Unknown Source)
> >> >> > at java.util.HashMap.put(Unknown Source)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:158)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.spark.streaming.kafka010.KafkaRDD$
> KafkaRDDIterator.<init>(KafkaRDD.scala:210)
> >> >> > at
> >> >> >
> >> >> > org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> >> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >> > at
> >> >> >
> >> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> >> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >> > at
> >> >> >
> >> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> >> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >> > at
> >> >> >
> >> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> >> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> >> >> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> >> >> > at
> >> >> > org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:274)
> >> >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
> >> >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
> >> >> > at java.lang.Thread.run(Unknown Source)
> >> >> >
> >> >> >
> >> >> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org
> >
> >> >> > wrote:
> >> >> >>
> >> >> >> you could try setting
> >> >> >>
> >> >> >> spark.streaming.kafka.consumer.cache.initialCapacity
> >> >> >>
> >> >> >> spark.streaming.kafka.consumer.cache.maxCapacity
> >> >> >>
> >> >> >> to 1
> >> >> >>
> >> >> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com>
> >> >> >> wrote:
> >> >> >> > I had a look at the executor logs and noticed that this
> exception
> >> >> >> > happens
> >> >> >> > only when using the cached consumer.
> >> >> >> > Every retry is successful. This is consistent.
> >> >> >> > One possibility is that the cached consumer is causing the
> failure
> >> >> >> > as
> >> >> >> > retry
> >> >> >> > clears it.
> >> >> >> > Is there a way to disable cache and test this?
> >> >> >> > Again, kafkacat is running fine on the same node.
> >> >> >> >
> >> >> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0
> >> >> >> > (TID
> >> >> >> > 7849)
> >> >> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0
> >> >> >> > (TID
> >> >> >> > 7851
> >> >> >> >
> >> >> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event,
> >> >> >> > partition
> >> >> >> > 2
> >> >> >> > offsets 57079162 -> 57090330
> >> >> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event,
> >> >> >> > partition
> >> >> >> > 0
> >> >> >> > offsets 57098866 -> 57109957
> >> >> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage
> 138.0
> >> >> >> > (TID
> >> >> >> > 7851). 1030 bytes result sent to driver
> >> >> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage
> >> >> >> > 138.0
> >> >> >> > (TID
> >> >> >> > 7849)
> >> >> >> > java.lang.AssertionError: assertion failed: Failed to get
> records
> >> >> >> > for
> >> >> >> > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after
> >> >> >> > polling
> >> >> >> > for
> >> >> >> > 2048
> >> >> >> >       at scala.Predef$.assert(Predef.scala:170)
> >> >> >> >       at
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >> >> >> >       at
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > org.apache.spark.streaming.kafka010.KafkaRDD$
> KafkaRDDIterator.next(KafkaRDD.scala:227)
> >> >> >> >       at
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > org.apache.spark.streaming.kafka010.KafkaRDD$
> KafkaRDDIterator.next(KafkaRDD.scala:193)
> >> >> >> >
> >> >> >> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got
> assigned
> >> >> >> > task
> >> >> >> > 7854
> >> >> >> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0
> >> >> >> > (TID
> >> >> >> > 7854)
> >> >> >> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event,
> >> >> >> > partition
> >> >> >> > 0
> >> >> >> > offsets 57098866 -> 57109957
> >> >> >> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
> >> >> >> > spark-executor-StreamingPixelCount1 mt_event 0 57098866
> >> >> >> >
> >> >> >> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage
> 138.0
> >> >> >> > (TID
> >> >> >> > 7854). 1103 bytes result sent to driver
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <
> srikanth...@gmail.com>
> >> >> >> > wrote:
> >> >> >> >>
> >> >> >> >> Thanks Cody. Setting poll timeout helped.
> >> >> >> >> Our network is fine but brokers are not fully provisioned in
> test
> >> >> >> >> cluster.
> >> >> >> >> But there isn't enough load to max out on broker capacity.
> >> >> >> >> Curious that kafkacat running on the same node doesn't have any
> >> >> >> >> issues.
> >> >> >> >>
> >> >> >> >> Srikanth
> >> >> >> >>
> >> >> >> >> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger
> >> >> >> >> <c...@koeninger.org>
> >> >> >> >> wrote:
> >> >> >> >>>
> >> >> >> >>> You can set that poll timeout higher with
> >> >> >> >>>
> >> >> >> >>> spark.streaming.kafka.consumer.poll.ms
> >> >> >> >>>
> >> >> >> >>> but half a second is fairly generous.  I'd try to take a look
> at
> >> >> >> >>> what's going on with your network or kafka broker during that
> >> >> >> >>> time.
> >> >> >> >>>
> >> >> >> >>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth
> >> >> >> >>> <srikanth...@gmail.com>
> >> >> >> >>> wrote:
> >> >> >> >>> > Hello,
> >> >> >> >>> >
> >> >> >> >>> > I'm getting the below exception when testing Spark 2.0 with
> >> >> >> >>> > Kafka
> >> >> >> >>> > 0.10.
> >> >> >> >>> >
> >> >> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version :
> >> >> >> >>> >> 0.10.0.0
> >> >> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId :
> >> >> >> >>> >> b8642491e78c5a13
> >> >> >> >>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch
> for
> >> >> >> >>> >> spark-executor-example mt_event 0 15782114
> >> >> >> >>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered
> >> >> >> >>> >> coordinator
> >> >> >> >>> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
> >> >> >> >>> >> spark-executor-example.
> >> >> >> >>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in
> >> >> >> >>> >> stage
> >> >> >> >>> >> 1.0
> >> >> >> >>> >> (TID
> >> >> >> >>> >> 6)
> >> >> >> >>> >> java.lang.AssertionError: assertion failed: Failed to get
> >> >> >> >>> >> records
> >> >> >> >>> >> for
> >> >> >> >>> >> spark-executor-example mt_event 0 15782114 after polling
> for
> >> >> >> >>> >> 512
> >> >> >> >>> >> at scala.Predef$.assert(Predef.scala:170)
> >> >> >> >>> >> at
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >> >> >> >>> >> at
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >> org.apache.spark.streaming.kafka010.KafkaRDD$
> KafkaRDDIterator.next(KafkaRDD.scala:227)
> >> >> >> >>> >> at
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >> org.apache.spark.streaming.kafka010.KafkaRDD$
> KafkaRDDIterator.next(KafkaRDD.scala:193)
> >> >> >> >>> >> at
> >> >> >> >>> >> scala.collection.Iterator$$anon$11.next(Iterator.scala:
> 409)
> >> >> >> >>> >
> >> >> >> >>> >
> >> >> >> >>> > I get this error intermittently. Sometimes a few batches are
> >> >> >> >>> > scheduled
> >> >> >> >>> > and
> >> >> >> >>> > run fine. Then I get this error.
> >> >> >> >>> > kafkacat is able to fetch from this topic continuously.
> >> >> >> >>> >
> >> >> >> >>> > Full exception is here --
> >> >> >> >>> >
> >> >> >> >>> >
> >> >> >> >>> >
> >> >> >> >>> > https://gist.github.com/SrikanthTati/
> c2e95c4ac689cd49aab817e24ec42767
> >> >> >> >>> >
> >> >> >> >>> > Srikanth
> >> >> >> >>
> >> >> >> >>
> >> >> >> >
> >> >> >
> >> >> >
> >> >
> >> >
> >
> >
>

Reply via email to