[ 
https://issues.apache.org/jira/browse/SPARK-19888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19888:
-------------------------------------
    Component/s:     (was: Spark Core)
                 DStreams

> Seeing offsets not resetting even when reset policy is configured explicitly
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-19888
>                 URL: https://issues.apache.org/jira/browse/SPARK-19888
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Justin Miller
>
> I was told to post this in a Spark ticket from KAFKA-4396:
> I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be 
> two separate errors, I'm not sure. What's puzzling is that I'm setting 
> auto.offset.reset to latest and it's still throwing an 
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :)
> {code}
> val kafkaParams = Map[String, Object](
>       "group.id" -> consumerGroup,
>       "bootstrap.servers" -> bootstrapServers,
>       "key.deserializer" -> classOf[ByteArrayDeserializer],
>       "value.deserializer" -> classOf[MessageRowDeserializer],
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (false: java.lang.Boolean),
>       "max.poll.records" -> persisterConfig.maxPollRecords,
>       "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>       "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>       "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>       "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
>     )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {topic=231884473}
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>         at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>         at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
>         at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
>         at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>         at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
> 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
> 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 
> 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not 
> safe for multi-threaded access
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>         at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>         at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>         at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to