[ https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891428#comment-15891428 ]
Helena Edelson edited comment on KAFKA-4396 at 3/2/17 1:39 AM:
---------------------------------------------------------------

I ran into this as well. Re-deploying with a new consumer group.id fixed it, but that's not a sustainable solution. Kafka 0.10.1.0. @jrmiller I recommend opening a related ticket against Spark (spark-streaming-kafka).

was (Author: helena_e):
I ran into this as well. Re-deploying with a new consumer group.id fixed it, but that's not a sustainable solution. Kafka 0.10.1.0

> Seeing offsets not resetting even when reset policy is configured explicitly
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-4396
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4396
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Justin Miller
>
> I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be
> two separate errors, I'm not sure. What's puzzling is that I'm setting
> auto.offset.reset to latest and it's still throwing an
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help!
> :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms" -> persisterConfig.connectionsMaxIdleMs
> )
> {code}
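With a standalone consumer, auto.offset.reset=latest in the map above should make an out-of-range fetch reset to the log end. Under spark-streaming-kafka-0-10, however, KafkaUtils.fixKafkaParams rewrites auto.offset.reset to none for the cached executor-side consumers, which matches the "no configured reset policy" wording in the log excerpt below. Here is a minimal sketch of the usual wiring; the broker, group, topic, and byte-array deserializers are hypothetical stand-ins for the reporter's MessageRowDeserializer and persisterConfig values, which are not shown in the ticket:

{code}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object Kafka4396Sketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-4396-sketch"), Seconds(10))

    // Placeholder values; broker, group, and topic are stand-ins, not the
    // values from the original report.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker:9092",
      "group.id"           -> "example-group",
      "key.deserializer"   -> classOf[ByteArrayDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "auto.offset.reset"  -> "latest", // honored by the driver consumer only
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // On executors, the 0-10 integration overrides auto.offset.reset to
    // "none", so a task fetching an offset already deleted by retention gets
    // OffsetOutOfRangeException instead of silently resetting.
    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc,
      PreferConsistent,
      Subscribe[Array[Byte], Array[Byte]](Seq("example-topic"), kafkaParams)
    )

    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

If that executor-side override is indeed what fires here, it supports the suggestion above to pursue a follow-up in the Spark tracker rather than against the Kafka consumer itself.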
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic=231884473}
>         at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
>         at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>         at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>         at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> {code}
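The java.util.ConcurrentModificationException at the end of the excerpt is a distinct symptom from the offset reset problem: two threads on the same executor have reached one cached KafkaConsumer, which typically happens with speculative execution or with multiple operations reusing the same cached consumer. A sketch of the commonly suggested mitigation follows; this is not a confirmed fix for this ticket, though both keys are standard Spark configuration:

{code}
import org.apache.spark.SparkConf

object Kafka4396Workaround {
  // Build a SparkConf that sidesteps the shared cached consumer.
  def conf: SparkConf = new SparkConf()
    .setAppName("kafka-4396-workaround")
    // Give each task its own KafkaConsumer instead of the per-executor
    // CachedKafkaConsumer, so concurrent tasks no longer share an instance.
    .set("spark.streaming.kafka.consumer.cache.enabled", "false")
    // Speculative duplicate tasks are a common way a second thread reaches
    // the same cached consumer; keep speculation disabled as well.
    .set("spark.speculation", "false")
}
{code}

Disabling the cache trades the multi-threaded access error for extra consumer setup cost per task, so treat it as a diagnostic workaround rather than a root-cause fix.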