[jira] [Commented] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-15 Thread Justin Miller (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668217#comment-15668217
 ] 

Justin Miller commented on KAFKA-4396:
--

Any updates on this? Thanks!

> Seeing offsets not resetting even when reset policy is configured explicitly
> 
>
> Key: KAFKA-4396
> URL: https://issues.apache.org/jira/browse/KAFKA-4396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justin Miller
>
> I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be 
> two separate errors, I'm not sure. What's puzzling is that I'm setting 
> auto.offset.reset to latest and it's still throwing an 
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
> )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {topic=231884473}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
> 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
> 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 
> 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not 
> safe for multi-threaded access
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>  

[jira] [Commented] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653148#comment-15653148
 ] 

Justin Miller commented on KAFKA-4396:
--

Hi huxi, thanks for responding.

I do have that set to false, as I'm calling

{code}
 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
{code}

with a custom OffsetCommitCallback to verify that the offsets get committed 
without an exception. I haven't tried running it with a new consumer group, but 
I can try that tomorrow (though I would note that the problem only seems to 
manifest itself after it has processed a number of time batches). I do save all 
the parquet files generated for the time batch before I commit the offsets, and 
that process can take up to 8 minutes. Should I perhaps just commit the offsets 
up front and accept potential data loss if retried puts to S3 fail?
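
For concreteness, the commit path looks roughly like this (a sketch only, 
assuming stream is the direct stream returned by KafkaUtils.createDirectStream 
and writeParquetBatch is a hypothetical stand-in for the parquet/S3 write step):

{code}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture this batch's offset ranges before any transformation of the RDD.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Persist the batch first (this is the step that can take several minutes).
  writeParquetBatch(rdd)

  // Only then commit, with a callback so failed commits are at least visible.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
    override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                            exception: Exception): Unit = {
      if (exception != null) println(s"offset commit failed: ${exception.getMessage}")
      else println(s"committed offsets for ${offsets.size()} partitions")
    }
  })
}
{code}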

Getting really close to putting this system in production. I've tweaked quite a 
few settings on the Kafka consumer (I can provide the ConsumerConfig values if 
that would help). Streaming Kafka 0.10 has been very impressive so far!



> Seeing offsets not resetting even when reset policy is configured explicitly
> 
>
> Key: KAFKA-4396
> URL: https://issues.apache.org/jira/browse/KAFKA-4396
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justin Miller
>
> I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be 
> two separate errors, I'm not sure. What's puzzling is that I'm setting 
> auto.offset.reset to latest and it's still throwing an 
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
> )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {topic=231884473}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPo

[jira] [Updated] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Miller updated KAFKA-4396:
-
Description: 
I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)

{code}
val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)
{code}
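
The behaviour I expect from auto.offset.reset=latest can be checked with a plain 
consumer in isolation (a rough sketch only, assuming a local broker on 
localhost:9092 and an existing topic "test"; with "latest" the consumer should 
reset to the end of the log rather than throw OffsetOutOfRangeException):

{code}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // assumption: local test broker
props.put("group.id", "reset-policy-check")
props.put("auto.offset.reset", "latest")
props.put("enable.auto.commit", "false")
props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val tp = new TopicPartition("test", 0)              // assumption: existing topic/partition
consumer.assign(Collections.singletonList(tp))
consumer.seek(tp, Long.MaxValue)                    // deliberately out-of-range position
consumer.poll(1000)                                 // expected: silent reset to latest, no exception
println(s"position after reset: ${consumer.position(tp)}")
consumer.close()
{code}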

{code}
16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
on xyz (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
Offsets out of range with no configured reset policy for partitions: 
{topic=231884473}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at 
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
39388) in 12043 ms on xyz (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
39375) in 13444 ms on xyz (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, 
xyz): java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

{code}

  was:
I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing 

[jira] [Updated] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Miller updated KAFKA-4396:
-
Description: 
I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)

{code}
val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)
{code}

{code}
16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
38837, ip-172-20-212-51.int.protectwise.net): 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: 
{observation.http-final-main-0-0=231884473}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at 
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, 
ip-172-20-212-52.int.protectwise.net): 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

{code}

  was:
I'

[jira] [Created] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)
Justin Miller created KAFKA-4396:


 Summary: Seeing offsets not resetting even when reset policy is 
configured explicitly
 Key: KAFKA-4396
 URL: https://issues.apache.org/jira/browse/KAFKA-4396
 Project: Kafka
  Issue Type: Bug
Reporter: Justin Miller


I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)

val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)

16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
38837, ip-172-20-212-51.int.protectwise.net): 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: 
{observation.http-final-main-0-0=231884473}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at 
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, 
ip-172-20-212-52.int.protectwise.net): 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$K

[jira] [Updated] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly

2016-11-09 Thread Justin Miller (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Miller updated KAFKA-4396:
-
Description: 
I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be two 
separate errors, I'm not sure. What's puzzling is that I'm setting 
auto.offset.reset to latest and it's still throwing an 
OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)

```
val kafkaParams = Map[String, Object](
  "group.id" -> consumerGroup,
  "bootstrap.servers" -> bootstrapServers,
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[MessageRowDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "max.poll.records" -> persisterConfig.maxPollRecords,
  "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
  "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
  "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
  "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
)
```

16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB)
16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
38837, ip-172-20-212-51.int.protectwise.net): 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
range with no configured reset policy for partitions: 
{observation.http-final-main-0-0=231884473}
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at 
org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16)
16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16)
16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, 
ip-172-20-212-52.int.protectwise.net): 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

  was:
I've been seeing a curi