Justin Miller created KAFKA-4396:
------------------------------------
Summary: Seeing offsets not resetting even when reset policy is configured explicitly
Key: KAFKA-4396
URL: https://issues.apache.org/jira/browse/KAFKA-4396
Project: Kafka
Issue Type: Bug
Reporter: Justin Miller
I've been seeing a curious error with Kafka 0.10 (Spark, Scala 2.11 build); these may be two
separate errors, I'm not sure. What's puzzling is that I'm setting
auto.offset.reset to latest, yet the consumer still throws an
OffsetOutOfRangeException complaining there is "no configured reset policy",
which seems contrary to what that setting should do. Please help! :)
val kafkaParams = Map[String, Object](
"group.id" -> consumerGroup,
"bootstrap.servers" -> bootstrapServers,
"key.deserializer" -> classOf[ByteArrayDeserializer],
"value.deserializer" -> classOf[MessageRowDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"max.poll.records" -> persisterConfig.maxPollRecords,
"request.timeout.ms" -> persisterConfig.requestTimeoutMs,
"session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
"heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
"connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)
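
For reference, the params above feed a direct stream along these lines. This is a
reconstruction, not our exact code: ssc is our StreamingContext, the topic name is
inferred from the failing partition below, and MessageRow is whatever our
MessageRowDeserializer produces. If I'm reading the 0-10 connector right, it also
rewrites some of these params for the executors (group.id gets a "spark-executor-"
prefix and auto.offset.reset is forced to "none" so the driver keeps control of
offsets), which might explain why the executor-side error below says "no configured
reset policy" even though we set "latest".

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Placeholder context and batch interval; the real app's setup differs.
val ssc = new StreamingContext(new SparkConf().setAppName("persister"), Seconds(10))
// Topic inferred from the partition named in the error below.
val topics = Seq("observation.http-final-main-0")

// MessageRow is our app's value type (produced by MessageRowDeserializer).
val stream = KafkaUtils.createDirectStream[Array[Byte], MessageRow](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], MessageRow](topics, kafkaParams)
)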
16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 38837, ip-172-20-212-51.int.protectwise.net): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {observation.http-final-main-0-0=231884473}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
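
To double-check the consumer side: the same exception is reproducible with a plain
KafkaConsumer when the effective reset policy is "none", which is what the executors
appear to end up with. Minimal sketch; the broker address and the stale offset are
placeholders, the offset in the real error being one the broker has presumably aged out:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
props.put("group.id", "offset-repro")
props.put("auto.offset.reset", "none") // what the executor effectively runs with
props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val tp = new TopicPartition("observation.http-final-main-0", 0)
consumer.assign(Collections.singletonList(tp))
// Seek to an offset the broker no longer retains; with reset policy "none" the
// next poll throws OffsetOutOfRangeException instead of resetting to latest.
consumer.seek(tp, 231884473L)
consumer.poll(1000)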
16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, ip-172-20-212-52.int.protectwise.net): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
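
This second trace suggests the cached executor-side consumer is being hit from more
than one thread at once (perhaps task retry or speculative execution landing on the
same CachedKafkaConsumer, though that's a guess on my part). The guard that throws
is KafkaConsumer.acquire(); a tiny standalone demo of the same failure, with a
placeholder broker address:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
props.put("group.id", "mt-demo")
props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
consumer.subscribe(Collections.singletonList("observation.http-final-main-0"))

// Two threads polling the same instance: acquire() detects the overlap and the
// thread that loses the race gets ConcurrentModificationException("KafkaConsumer
// is not safe for multi-threaded access"), just like the executor log above.
val poller = new Runnable { def run(): Unit = consumer.poll(5000) }
new Thread(poller).start()
new Thread(poller).start()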