That's a good point... the dstreams package is still on 0.10.0.1 though. I'll make a ticket to update it.
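For anyone landing on this thread later, here is a minimal sketch of how the consumer timeouts discussed below might be derived from the batch interval. The class name, the helper name, and the headroom values are hypothetical and chosen only for illustration; the thread does not record the values that were actually used. The three property names are real Kafka consumer settings. The sketch assumes heartbeat.interval.ms is kept to about a third of session.timeout.ms and that request.timeout.ms stays above session.timeout.ms.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spark or Kafka.
final class KafkaTimeoutSketch {

    // Derives consumer timeout settings from a Spark batch interval (milliseconds).
    static Map<String, Object> consumerTimeouts(int batchIntervalMs) {
        // Assumed headroom so a whole batch can finish before the session expires.
        int sessionTimeoutMs = batchIntervalMs + 30_000;
        // Rule of thumb: heartbeat interval no higher than 1/3 of the session timeout.
        int heartbeatIntervalMs = sessionTimeoutMs / 3;
        // Assumed margin to keep request.timeout.ms above session.timeout.ms.
        int requestTimeoutMs = sessionTimeoutMs + 10_000;

        Map<String, Object> params = new HashMap<>();
        params.put("session.timeout.ms", sessionTimeoutMs);
        params.put("heartbeat.interval.ms", heartbeatIntervalMs);
        params.put("request.timeout.ms", requestTimeoutMs);
        return params;
    }

    public static void main(String[] args) {
        // 60 second batches, as in app2 from the thread.
        consumerTimeouts(60_000).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

These entries would be merged into the kafkaParams map handed to the consumer strategy. A session timeout above the broker's group.max.session.timeout.ms is likely to be refused, so that broker setting may need raising as well.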

On Fri, Oct 21, 2016 at 1:02 PM, Srikanth <srikanth...@gmail.com> wrote:
> The Kafka 0.10.1 release separates poll() from heartbeat, so session.timeout.ms
> and max.poll.interval.ms can be set differently.
> I'll leave it to you on how to add this to the docs!
>
> On Thu, Oct 20, 2016 at 1:41 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Right on, I put in a PR to make a note of that in the docs.
>>
>> On Thu, Oct 20, 2016 at 12:13 PM, Srikanth <srikanth...@gmail.com> wrote:
>> > Yeah, setting those params helped.
>> >
>> > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>
>> >> 60 seconds for a batch is above the default settings in kafka related
>> >> to heartbeat timeouts, so that might be related. Have you tried
>> >> tweaking session.timeout.ms, heartbeat.interval.ms, or related
>> >> configs?
>> >>
>> >> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> > Bringing this thread back as I'm seeing this exception on a production
>> >> > kafka cluster.
>> >> >
>> >> > I have two Spark streaming apps reading the same topic. App1 has a batch
>> >> > interval of 2 secs and app2 has 60 secs.
>> >> > Both apps are running on the same cluster on similar hardware. I see this
>> >> > exception only in app2 and fairly consistently.
>> >> >
>> >> > The differences I see between the apps are:
>> >> > App1
>> >> >     spark.streaming.kafka.maxRatePerPartition, 6000
>> >> >     batch interval 2 secs
>> >> > App2
>> >> >     spark.streaming.kafka.maxRatePerPartition, 10000
>> >> >     batch interval 60 secs
>> >> >
>> >> > All other kafka/spark related configs are the same for both apps.
>> >> >     spark.streaming.kafka.consumer.poll.ms = 4096
>> >> >     spark.streaming.backpressure.enabled = true
>> >> >
>> >> > Not sure if pre-fetching or caching is messing things up.
>> >> >
>> >> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingEventSplitProd mt_event 6 49091480 after polling for 4096
>> >> >     at scala.Predef$.assert(Predef.scala:170)
>> >> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >> >     at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
>> >> >
>> >> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >>
>> >> >> That's not what I would have expected to happen with a lower cache
>> >> >> setting, but in general disabling the cache isn't something you want
>> >> >> to do with the new kafka consumer.
>> >> >>
>> >> >> As far as the original issue, are you seeing those polling errors
>> >> >> intermittently, or consistently? From your description, it sounds
>> >> >> like retry is working correctly.
>> >> >>
>> >> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> >> > Setting those two results in below exception.
>> >> >> > No.of executors < no.of partitions. Could that be triggering this?
>> >> >> >
>> >> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
>> >> >> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>> >> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>> >> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> >> >> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> >> >> >     at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>> >> >> >     at java.util.HashMap.putVal(Unknown Source)
>> >> >> >     at java.util.HashMap.put(Unknown Source)
>> >> >> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>> >> >> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>> >> >> >     at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>> >> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >> >> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >> >> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >> >> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >> >> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >> >> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >> >> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> >> >> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> >> >> >     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> >> >> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> >> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> >> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> >> >> >     at java.lang.Thread.run(Unknown Source)
>> >> >> >
>> >> >> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> >>
>> >> >> >> you could try setting
>> >> >> >>
>> >> >> >> spark.streaming.kafka.consumer.cache.initialCapacity
>> >> >> >>
>> >> >> >> spark.streaming.kafka.consumer.cache.maxCapacity
>> >> >> >>
>> >> >> >> to 1
>> >> >> >>
>> >> >> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> >> >> > I had a look at the executor logs and noticed that this exception
>> >> >> >> > happens only when using the cached consumer.
>> >> >> >> > Every retry is successful. This is consistent.
>> >> >> >> > One possibility is that the cached consumer is causing the failure
>> >> >> >> > as retry clears it.
>> >> >> >> > Is there a way to disable cache and test this?
>> >> >> >> > Again, kafkacat is running fine on the same node.
>> >> >> >> >
>> >> >> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
>> >> >> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851
>> >> >> >> >
>> >> >> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
>> >> >> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
>> >> >> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851). 1030 bytes result sent to driver
>> >> >> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
>> >> >> >> > java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
>> >> >> >> >     at scala.Predef$.assert(Predef.scala:170)
>> >> >> >> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >> >> >> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >> >> >> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >> >> >> >
>> >> >> >> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
>> >> >> >> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
>> >> >> >> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
>> >> >> >> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for spark-executor-StreamingPixelCount1 mt_event 0 57098866
>> >> >> >> >
>> >> >> >> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver
>> >> >> >> >
>> >> >> >> > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> >> >> >>
>> >> >> >> >> Thanks Cody. Setting poll timeout helped.
>> >> >> >> >> Our network is fine but brokers are not fully provisioned in the
>> >> >> >> >> test cluster.
>> >> >> >> >> But there isn't enough load to max out on broker capacity.
>> >> >> >> >> Curious that kafkacat running on the same node doesn't have any
>> >> >> >> >> issues.
>> >> >> >> >>
>> >> >> >> >> Srikanth
>> >> >> >> >>
>> >> >> >> >> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> >> >>>
>> >> >> >> >>> You can set that poll timeout higher with
>> >> >> >> >>>
>> >> >> >> >>> spark.streaming.kafka.consumer.poll.ms
>> >> >> >> >>>
>> >> >> >> >>> but half a second is fairly generous. I'd try to take a look at
>> >> >> >> >>> what's going on with your network or kafka broker during that
>> >> >> >> >>> time.
>> >> >> >> >>>
>> >> >> >> >>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> >> >> >>> > Hello,
>> >> >> >> >>> >
>> >> >> >> >>> > I'm getting the below exception when testing Spark 2.0 with
>> >> >> >> >>> > Kafka 0.10.
>> >> >> >> >>> >
>> >> >> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>> >> >> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>> >> >> >> >>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for spark-executor-example mt_event 0 15782114
>> >> >> >> >>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator 10.150.254.161:9233 (id: 2147483646 rack: null) for group spark-executor-example.
>> >> >> >> >>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 6)
>> >> >> >> >>> >> java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-example mt_event 0 15782114 after polling for 512
>> >> >> >> >>> >>     at scala.Predef$.assert(Predef.scala:170)
>> >> >> >> >>> >>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >> >> >> >>> >>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >> >> >> >>> >>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >> >> >> >>> >>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >> >> >> >>> >
>> >> >> >> >>> > I get this error intermittently. Sometimes a few batches are
>> >> >> >> >>> > scheduled and run fine. Then I get this error.
>> >> >> >> >>> > kafkacat is able to fetch from this topic continuously.
>> >> >> >> >>> >
>> >> >> >> >>> > Full exception is here --
>> >> >> >> >>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>> >> >> >> >>> >
>> >> >> >> >>> > Srikanth

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org