Setting those two properties results in the exception below. The number of executors is less than the number of partitions. Could that be triggering this?
16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
        at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
        at java.util.HashMap.putVal(Unknown Source)
        at java.util.HashMap.put(Unknown Source)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
        at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
> You could try setting
>
> spark.streaming.kafka.consumer.cache.initialCapacity
> spark.streaming.kafka.consumer.cache.maxCapacity
>
> to 1.
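[For reference: both keys above are ordinary Spark configuration entries, so they can be set on the SparkConf before the StreamingContext is created. A minimal sketch, assuming a standard setup; the app name and batch interval are placeholders:]

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch only: shrink the per-executor Kafka consumer cache to a single
    // entry, as suggested above, to exercise the eviction path visible in the
    // stack trace (removeEldestEntry -> KafkaConsumer.close).
    val conf = new SparkConf()
      .setAppName("ConsumerCacheTest") // placeholder name
      .set("spark.streaming.kafka.consumer.cache.initialCapacity", "1")
      .set("spark.streaming.kafka.consumer.cache.maxCapacity", "1")
    val ssc = new StreamingContext(conf, Seconds(30)) // illustrative interval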
> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
> > I had a look at the executor logs and noticed that this exception happens
> > only when the cached consumer is used. Every retry succeeds, and this is
> > consistent. One possibility is that the cached consumer is causing the
> > failure, since a retry clears it. Is there a way to disable the cache and
> > test this? Again, kafkacat is running fine on the same node.
> >
> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)
> >
> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851). 1030 bytes result sent to driver
> >
> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
> > java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
> >         at scala.Predef$.assert(Predef.scala:170)
> >         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >
> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for spark-executor-StreamingPixelCount1 mt_event 0 57098866
> >
> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver
> >
> > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> Thanks Cody. Setting the poll timeout helped.
> >> Our network is fine, but the brokers are not fully provisioned in the
> >> test cluster. Even so, there isn't enough load to max out broker capacity.
> >> Curiously, kafkacat running on the same node doesn't have any issues.
> >>
> >> Srikanth
> >>
> >> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>> You can set that poll timeout higher with
> >>>
> >>> spark.streaming.kafka.consumer.poll.ms
> >>>
> >>> but half a second is fairly generous. I'd try to take a look at
> >>> what's going on with your network or kafka broker during that time.
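[The poll timeout is likewise a plain configuration entry. A hedged sketch; the 10-second value is purely illustrative, not a recommendation from the thread:]

    import org.apache.spark.SparkConf

    // Raise the executor-side Kafka poll timeout from the 512 ms default seen
    // in the logs. 10000 ms is an arbitrary illustrative value; the same key
    // can also be supplied at submit time via --conf.
    val conf = new SparkConf()
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")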
> >>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote:
> >>> > Hello,
> >>> >
> >>> > I'm getting the exception below when testing Spark 2.0 with Kafka 0.10.
> >>> >
> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> >>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for spark-executor-example mt_event 0 15782114
> >>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator 10.150.254.161:9233 (id: 2147483646 rack: null) for group spark-executor-example.
> >>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 6)
> >>> >> java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-example mt_event 0 15782114 after polling for 512
> >>> >>         at scala.Predef$.assert(Predef.scala:170)
> >>> >>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >>> >>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >>> >>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >>> >>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >>> >
> >>> > I get this error intermittently. Sometimes a few batches are scheduled
> >>> > and run fine, then I get this error.
> >>> > kafkacat is able to fetch from this topic continuously.
> >>> >
> >>> > The full exception is here:
> >>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
> >>> >
> >>> > Srikanth
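[For readers working from this archived thread: a minimal sketch of a Spark 2.0 / streaming-kafka-0-10 direct stream that applies the settings discussed above. The broker address, group id, app name, and batch interval are placeholders; the topic name mt_event and the configuration keys come from the thread, while the values set here are illustrative:]

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    object DirectStreamSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("DirectStreamSketch") // placeholder
          // Settings discussed in the thread; values are illustrative.
          .set("spark.streaming.kafka.consumer.poll.ms", "10000")
          .set("spark.streaming.kafka.consumer.cache.initialCapacity", "1")
          .set("spark.streaming.kafka.consumer.cache.maxCapacity", "1")
        val ssc = new StreamingContext(conf, Seconds(30)) // placeholder interval

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "broker1:9092", // placeholder
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "example", // placeholder
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        // Topic name taken from the thread's logs.
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](Seq("mt_event"), kafkaParams))

        // Trivial action so each batch actually consumes the partitions.
        stream.map(r => (r.key, r.value)).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }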