[
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653148#comment-15653148
]
Justin Miller commented on KAFKA-4396:
--------------------------------------
Hi huxi, thanks for responding.
I do have that set to false, since I'm committing the offsets manually with
{code}
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
{code}
using a custom OffsetCommitCallback to verify that the offsets are committed without an exception. I haven't tried running it on a new consumer group, but I can try that tomorrow (though I'd note that the problem only seems to manifest after it has processed a number of time batches). I save all of the parquet files generated for a time batch before committing the offsets, and that process can take up to 8 minutes. Should I perhaps just commit the offsets up front and accept potential data loss if retried puts to S3 fail?
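For context, here's roughly how the commit is wired in per batch (a minimal sketch; the foreachRDD body and the logging are simplified stand-ins, not the exact production code):
{code}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // offsets covered by this batch
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... write the batch out as parquet to S3 here (this can take up to ~8 minutes) ...

  // only after the batch is persisted, commit the offsets asynchronously
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback {
    override def onComplete(committed: java.util.Map[TopicPartition, OffsetAndMetadata],
                            exception: Exception): Unit = {
      if (exception != null) {
        // surface commit failures so they don't go unnoticed
        println(s"offset commit failed: ${exception.getMessage}")
      } else {
        println(s"committed offsets: $committed")
      }
    }
  })
}
{code}
The idea is to commit only once the parquet writes for the batch have succeeded, which is why the commit lags the processing by several minutes.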
Getting really close to putting this system into production. I've tweaked quite a few settings on the Kafka consumer (I can provide the ConsumerConfig values if that would help). Streaming Kafka 0.10 has been very impressive so far!
> Seeing offsets not resetting even when reset policy is configured explicitly
> ----------------------------------------------------------------------------
>
> Key: KAFKA-4396
> URL: https://issues.apache.org/jira/browse/KAFKA-4396
> Project: Kafka
> Issue Type: Bug
> Reporter: Justin Miller
>
> I've been seeing a curious error with Kafka 0.10 (Spark 2.11); these may be two separate errors, I'm not sure. What's puzzling is that I'm setting auto.offset.reset to latest and it's still throwing an OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms" -> persisterConfig.connectionsMaxIdleMs
> )
> {code}
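> For completeness, these params feed a direct stream created roughly like this (a minimal sketch; ssc, topics, and MessageRow are placeholders for the real StreamingContext, topic list, and the type produced by MessageRowDeserializer):
> {code}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> // ssc: StreamingContext, topics: Seq[String] -- placeholders for the real values
> val stream = KafkaUtils.createDirectStream[Array[Byte], MessageRow](
>   ssc,
>   PreferConsistent,
>   Subscribe[Array[Byte], MessageRow](topics, kafkaParams)
> )
> {code}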
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic=231884473}
>     at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>     at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
>     at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>     at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)