Justin Miller created SPARK-19888:
-------------------------------------

             Summary: Seeing offsets not resetting even when reset policy is 
configured explicitly
                 Key: SPARK-19888
                 URL: https://issues.apache.org/jira/browse/SPARK-19888
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0
            Reporter: Justin Miller


I was told to post this in a Spark ticket from KAFKA-4396:

I've been seeing a curious error with Kafka 0.10 (Spark built for Scala 2.11). 
These may be two separate errors; I'm not sure. What's puzzling is that I'm 
setting auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException that reports no configured reset policy, which is 
contrary to what the code configures. Please help! :)

{code}
val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms" -> persisterConfig.connectionsMaxIdleMs
)
{code}

{code}
16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory on xyz (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic=231884473}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 39388) in 12043 ms on xyz (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 39375) in 13444 ms on xyz (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

{code}
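
A possibly relevant detail, offered as my understanding and not something 
confirmed in this ticket: the spark-streaming-kafka-0-10 integration appears to 
adjust the consumer parameters used on executors, forcing auto.offset.reset to 
none there, which would match the "no configured reset policy" wording in the 
first trace. The sketch below is a simplified model of that adjustment in plain 
Scala. ExecutorParamsSketch and executorParams are hypothetical names for 
illustration, not the Spark API, and "my-group" is a made-up value.

{code}
// Hypothetical, simplified model of the executor-side parameter adjustment.
// This is NOT the Spark API; it only illustrates the suspected behavior.
object ExecutorParamsSketch {
  // Returns a copy of the driver-side params with executor overrides applied.
  def executorParams(driverParams: Map[String, Object]): Map[String, Object] = {
    val groupId = driverParams.get("group.id").map(_.toString).getOrElse("")
    driverParams ++ Map[String, Object](
      // Executors never commit offsets themselves.
      "enable.auto.commit" -> (false: java.lang.Boolean),
      // The user's reset policy is dropped on executors (assumed behavior).
      "auto.offset.reset" -> "none",
      // Executors use a separate consumer group derived from the original.
      "group.id" -> s"spark-executor-$groupId"
    )
  }

  def main(args: Array[String]): Unit = {
    val driver = Map[String, Object](
      "group.id" -> "my-group",
      "auto.offset.reset" -> "latest"
    )
    val executor = executorParams(driver)
    println(executor("auto.offset.reset")) // prints "none"
    println(executor("group.id"))          // prints "spark-executor-my-group"
  }
}
{code}

If that model is right, "latest" would only influence where the driver starts, 
and an executor asked to read an offset that has already aged out of the log 
would fail exactly as in the first trace.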


