It's a really noticeable overhead. Without the cache, you're basically
pulling every message twice because of prefetching.
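
A side note on the ConcurrentModificationException quoted further down:
the trace shows removeEldestEntry on the consumer cache calling
KafkaConsumer.close() from inside a put. Below is a rough, stdlib-only
Java sketch of that kind of size-bounded LinkedHashMap. It is an
illustration, not the actual CachedKafkaConsumer code: FakeConsumer,
newCache, and the partition keys are made-up names, and a capacity of 1
stands in for the maxCapacity setting mentioned in the thread. It only
shows that, with room for one entry, caching a second partition's consumer
closes the first one. Whether that first consumer was still in use by
another task thread at that moment is an assumption, though it would be
consistent with the "not safe for multi-threaded access" message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LruEvictionSketch {

    /** Hypothetical stand-in for a Kafka consumer; it only records whether close() was called. */
    static final class FakeConsumer {
        final String key;
        boolean closed = false;

        FakeConsumer(String key) {
            this.key = key;
        }

        void close() {
            closed = true;
        }
    }

    /**
     * Builds an access-ordered LinkedHashMap that closes and drops its eldest
     * entry whenever an insert pushes the size above maxCapacity.
     * Evicted keys are appended to the supplied list so callers can inspect them.
     */
    static Map<String, FakeConsumer> newCache(
            final int initialCapacity, final int maxCapacity, final List<String> evicted) {
        return new LinkedHashMap<String, FakeConsumer>(initialCapacity, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, FakeConsumer> eldest) {
                if (size() > maxCapacity) {
                    // The eviction runs on whichever thread called put().
                    eldest.getValue().close();
                    evicted.add(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    public static void main(String[] args) {
        List<String> evicted = new ArrayList<>();
        Map<String, FakeConsumer> cache = newCache(1, 1, evicted);

        FakeConsumer p0 = new FakeConsumer("mt_event-0");
        FakeConsumer p2 = new FakeConsumer("mt_event-2");

        cache.put(p0.key, p0);
        cache.put(p2.key, p2); // exceeds maxCapacity, so p0 is closed and dropped

        // prints: evicted=[mt_event-0] p0.closed=true size=1
        System.out.println(
                "evicted=" + evicted + " p0.closed=" + p0.closed + " size=" + cache.size());
    }
}
```

With a larger capacity in this sketch (for example newCache(2, 2, evicted)),
the second put evicts nothing and both stand-in consumers stay open.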

On Wed, Sep 7, 2016 at 3:23 PM, Srikanth <srikanth...@gmail.com> wrote:
> Yeah, disabling the cache was not going to be my permanent solution either.
> I was going to ask: how big an overhead is that?
>
> It happens intermittently, and each time it happens the retry is successful.
>
> Srikanth
>
> On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> That's not what I would have expected to happen with a lower cache
>> setting, but in general disabling the cache isn't something you want
>> to do with the new kafka consumer.
>>
>>
>> As far as the original issue, are you seeing those polling errors
>> intermittently, or consistently?  From your description, it sounds
>> like retry is working correctly.
>>
>>
>> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
>> > Setting those two results in the exception below.
>> > The number of executors is less than the number of partitions. Could that
>> > be triggering this?
>> >
>> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>> > at java.util.HashMap.putVal(Unknown Source)
>> > at java.util.HashMap.put(Unknown Source)
>> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> > at java.lang.Thread.run(Unknown Source)
>> >
>> >
>> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>
>> >> you could try setting
>> >>
>> >> spark.streaming.kafka.consumer.cache.initialCapacity
>> >>
>> >> spark.streaming.kafka.consumer.cache.maxCapacity
>> >>
>> >> to 1
>> >>
>> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> > I had a look at the executor logs and noticed that this exception happens
>> >> > only when using the cached consumer.
>> >> > Every retry is successful. This is consistent.
>> >> > One possibility is that the cached consumer is causing the failure, since
>> >> > the retry clears it.
>> >> > Is there a way to disable the cache and test this?
>> >> > Again, kafkacat is running fine on the same node.
>> >> >
>> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
>> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)
>> >> >
>> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
>> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
>> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851). 1030 bytes result sent to driver
>> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
>> >> > java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
>> >> >       at scala.Predef$.assert(Predef.scala:170)
>> >> >       at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >> >       at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >> >       at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >> >
>> >> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
>> >> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
>> >> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
>> >> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for spark-executor-StreamingPixelCount1 mt_event 0 57098866
>> >> >
>> >> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver
>> >> >
>> >> >
>> >> >
>> >> > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> >>
>> >> >> Thanks Cody. Setting the poll timeout helped.
>> >> >> Our network is fine, but the brokers are not fully provisioned in the test
>> >> >> cluster. Still, there isn't enough load to max out broker capacity.
>> >> >> It's curious that kafkacat running on the same node doesn't have any
>> >> >> issues.
>> >> >>
>> >> >> Srikanth
>> >> >>
>> >> >> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >>>
>> >> >>> You can set that poll timeout higher with
>> >> >>>
>> >> >>> spark.streaming.kafka.consumer.poll.ms
>> >> >>>
>> >> >>> but half a second is fairly generous.  I'd try to take a look at
>> >> >>> what's going on with your network or kafka broker during that time.
>> >> >>>
>> >> >>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> >>> > Hello,
>> >> >>> >
>> >> >>> > I'm getting the exception below when testing Spark 2.0 with Kafka 0.10.
>> >> >>> >
>> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>> >> >>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for spark-executor-example mt_event 0 15782114
>> >> >>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator 10.150.254.161:9233 (id: 2147483646 rack: null) for group spark-executor-example.
>> >> >>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 6)
>> >> >>> >> java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-example mt_event 0 15782114 after polling for 512
>> >> >>> >> at scala.Predef$.assert(Predef.scala:170)
>> >> >>> >> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >> >>> >> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >> >>> >> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >> >>> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >> >>> >
>> >> >>> >
>> >> >>> > I get this error intermittently. Sometimes a few batches are scheduled
>> >> >>> > and run fine. Then I get this error.
>> >> >>> > kafkacat is able to fetch from this topic continuously.
>> >> >>> >
>> >> >>> > Full exception is here:
>> >> >>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>> >> >>> >
>> >> >>> > Srikanth
>> >> >>
>> >> >>
>> >> >
>> >
>> >
>
>
