Yeah, setting those params helped.

On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
> 60 seconds for a batch is above the default settings in kafka related
> to heartbeat timeouts, so that might be related. Have you tried
> tweaking session.timeout.ms, heartbeat.interval.ms, or related
> configs?
>
> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Bringing this thread back as I'm seeing this exception on a production
> > kafka cluster.
> >
> > I have two Spark streaming apps reading the same topic. App1 has a batch
> > interval of 2 secs and app2 has 60 secs.
> > Both apps are running on the same cluster on similar hardware. I see this
> > exception only in app2, and fairly consistently.
> >
> > The differences I see between the apps are:
> > App1
> >   spark.streaming.kafka.maxRatePerPartition = 6000
> >   batch interval 2 secs
> > App2
> >   spark.streaming.kafka.maxRatePerPartition = 10000
> >   batch interval 60 secs
> >
> > All other kafka/spark related configs are the same for both apps:
> >   spark.streaming.kafka.consumer.poll.ms = 4096
> >   spark.streaming.backpressure.enabled = true
> >
> > Not sure if pre-fetching or caching is messing things up.
> >
> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID
> > 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion
> > failed: Failed to get records for spark-executor-StreamingEventSplitProd
> > mt_event 6 49091480 after polling for 4096
> >   at scala.Predef$.assert(Predef.scala:170)
> >   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >   at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
> >
> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> That's not what I would have expected to happen with a lower cache
> >> setting, but in general disabling the cache isn't something you want
> >> to do with the new kafka consumer.
> >>
> >> As far as the original issue, are you seeing those polling errors
> >> intermittently, or consistently? From your description, it sounds
> >> like retry is working correctly.
> >>
> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> > Setting those two results in the exception below.
> >> > No. of executors < no. of partitions. Could that be triggering this?
> >> >
> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
> >> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> >> > multi-threaded access
> >> >   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
> >> >   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
> >> >   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> >> >   at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> >> >   at java.util.HashMap.putVal(Unknown Source)
> >> >   at java.util.HashMap.put(Unknown Source)
> >> >   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
> >> >   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
> >> >   at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
> >> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> >> >   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> >> >   at org.apache.spark.scheduler.Task.run(Task.scala:85)
> >> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> >> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> >> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >> >   at java.lang.Thread.run(Unknown Source)
> >> >
> >> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> You could try setting
> >> >>
> >> >>   spark.streaming.kafka.consumer.cache.initialCapacity
> >> >>   spark.streaming.kafka.consumer.cache.maxCapacity
> >> >>
> >> >> to 1.
> >> >>
> >> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> >> > I had a look at the executor logs and noticed that this exception
> >> >> > happens only when using the cached consumer.
> >> >> > Every retry is successful. This is consistent.
> >> >> > One possibility is that the cached consumer is causing the failure,
> >> >> > since retry clears it.
> >> >> > Is there a way to disable the cache and test this?
> >> >> > Again, kafkacat is running fine on the same node.
> >> >> >
> >> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
> >> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)
> >> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
> >> >> > offsets 57079162 -> 57090330
> >> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
> >> >> > offsets 57098866 -> 57109957
> >> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851).
> >> >> > 1030 bytes result sent to driver
> >> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
> >> >> > java.lang.AssertionError: assertion failed: Failed to get records for
> >> >> > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
> >> >> >   at scala.Predef$.assert(Predef.scala:170)
> >> >> >   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >> >> >   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >> >> >   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >> >> >
> >> >> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
> >> >> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
> >> >> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
> >> >> > offsets 57098866 -> 57109957
> >> >> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
> >> >> > spark-executor-StreamingPixelCount1 mt_event 0 57098866
> >> >> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID 7854).
> >> >> > 1103 bytes result sent to driver
> >> >> >
> >> >> > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> >> >> Thanks Cody. Setting the poll timeout helped.
> >> >> >> Our network is fine, but brokers are not fully provisioned in the test
> >> >> >> cluster. There isn't enough load to max out broker capacity, though.
> >> >> >> Curious that kafkacat running on the same node doesn't have any issues.
> >> >> >>
> >> >> >> Srikanth
> >> >> >>
> >> >> >> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >>> You can set that poll timeout higher with
> >> >> >>>
> >> >> >>>   spark.streaming.kafka.consumer.poll.ms
> >> >> >>>
> >> >> >>> but half a second is fairly generous. I'd try to take a look at
> >> >> >>> what's going on with your network or kafka broker during that time.
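The poll timeout Cody refers to is set on the Spark side, not in the Kafka consumer params. As a minimal sketch of where it goes (broker address, topic, and group id are placeholders, and the timeout value is illustrative; the "polling for 512" in the original error is this conf's default):

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object PollTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("poll-timeout-sketch")
      // How long an executor waits for consumer.poll() to return records
      // before the "Failed to get records ... after polling for N" assertion fires.
      .set("spark.streaming.kafka.consumer.poll.ms", "4096")

    val ssc = new StreamingContext(conf, Seconds(60))

    // Placeholder connection details.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "example",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("mt_event"), kafkaParams))

    stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```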
> >> >> >>>
> >> >> >>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> >> >>> > Hello,
> >> >> >>> >
> >> >> >>> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
> >> >> >>> >
> >> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
> >> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> >> >> >>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
> >> >> >>> >> spark-executor-example mt_event 0 15782114
> >> >> >>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
> >> >> >>> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group spark-executor-example.
> >> >> >>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 6)
> >> >> >>> >> java.lang.AssertionError: assertion failed: Failed to get records for
> >> >> >>> >> spark-executor-example mt_event 0 15782114 after polling for 512
> >> >> >>> >>   at scala.Predef$.assert(Predef.scala:170)
> >> >> >>> >>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >> >> >>> >>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >> >> >>> >>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >> >> >>> >>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >> >> >>> >
> >> >> >>> > I get this error intermittently. Sometimes a few batches are scheduled
> >> >> >>> > and run fine. Then I get this error.
> >> >> >>> > kafkacat is able to fetch from this topic continuously.
> >> >> >>> >
> >> >> >>> > The full exception is here:
> >> >> >>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
> >> >> >>> >
> >> >> >>> > Srikanth
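To summarize the thread's resolution: raising spark.streaming.kafka.consumer.poll.ms fixed the original intermittent failures; shrinking the consumer cache to capacity 1 backfired with the ConcurrentModificationException above; and for the 60-second-batch app, tweaking the consumer heartbeat/session configs is what helped. A sketch of the knobs involved, with illustrative values only, since the thread does not say which values were actually used:

```scala
import org.apache.spark.SparkConf

// Spark-side settings discussed in the thread (values illustrative).
val sparkConf = new SparkConf()
  .set("spark.streaming.kafka.consumer.poll.ms", "4096")     // default was 512
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
  .set("spark.streaming.backpressure.enabled", "true")

// Consumer-side settings, merged into the kafkaParams map passed to
// createDirectStream. With a 60s batch, gaps between poll() calls can
// exceed the Kafka 0.10.0 defaults (session.timeout.ms = 30000,
// heartbeat.interval.ms = 3000), so the group coordinator evicts the
// consumer. Note session.timeout.ms must stay within the broker's
// group.min.session.timeout.ms / group.max.session.timeout.ms range.
val timeoutParams = Map[String, Object](
  "session.timeout.ms" -> "120000",
  "heartbeat.interval.ms" -> "10000"
)
```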