[ 
https://issues.apache.org/jira/browse/SPARK-19888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905730#comment-15905730
 ] 

Cody Koeninger commented on SPARK-19888:
----------------------------------------

That stacktrace also shows a concurrent modification exception, yes?.  See 
SPARK-19185 for that

See e.g. SPARK-19680 for background on why offset out of range may occur on 
executor when it doesn't on driver.  Although if you're using reset latest, 
unless you have really short retention this is kind of surprising.

> Seeing offsets not resetting even when reset policy is configured explicitly
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-19888
>                 URL: https://issues.apache.org/jira/browse/SPARK-19888
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Justin Miller
>
> I was told to post this in a Spark ticket from KAFKA-4396:
> I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be 
> two separate errors, I'm not sure. What's puzzling is that I'm setting 
> auto.offset.reset to latest and it's still throwing an 
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :)
> {code}
> val kafkaParams = Map[String, Object](
>       "group.id" -> consumerGroup,
>       "bootstrap.servers" -> bootstrapServers,
>       "key.deserializer" -> classOf[ByteArrayDeserializer],
>       "value.deserializer" -> classOf[MessageRowDeserializer],
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (false: java.lang.Boolean),
>       "max.poll.records" -> persisterConfig.maxPollRecords,
>       "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>       "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>       "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>       "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
>     )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {topic=231884473}
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>         at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>         at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
>         at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
>         at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>         at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
> 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
> 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 
> 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not 
> safe for multi-threaded access
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>         at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>         at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>         at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to