[
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668217#comment-15668217
]
Justin Miller commented on KAFKA-4396:
--------------------------------------
Any updates on this? Thanks!
> Seeing offsets not resetting even when reset policy is configured explicitly
> ----------------------------------------------------------------------------
>
> Key: KAFKA-4396
> URL: https://issues.apache.org/jira/browse/KAFKA-4396
> Project: Kafka
> Issue Type: Bug
> Reporter: Justin Miller
>
> I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be
> two separate errors, I'm not sure. What's puzzling is that I'm setting
> auto.offset.reset to latest and it's still throwing an
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help!
> :)
> {code}
> val kafkaParams = Map[String, Object](
> "group.id" -> consumerGroup,
> "bootstrap.servers" -> bootstrapServers,
> "key.deserializer" -> classOf[ByteArrayDeserializer],
> "value.deserializer" -> classOf[MessageRowDeserializer],
> "auto.offset.reset" -> "latest",
> "enable.auto.commit" -> (false: java.lang.Boolean),
> "max.poll.records" -> persisterConfig.maxPollRecords,
> "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
> "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
> "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
> "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
> )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> Offsets out of range with no configured reset policy for partitions:
> {topic=231884473}
> at
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> at
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID
> 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID
> 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID
> 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not
> safe for multi-threaded access
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)