[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic when kafka-clients 0.10.1.0 is used

2016-12-01 Thread Heji Kim (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Heji Kim updated SPARK-18506:
-----------------------------
Summary: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only 
read from a single partition on a multi partition topic when kafka-clients 
0.10.1.0 is used  (was: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest 
will only read from a single partition on a multi partition topic when 
kafka-clients 0.10.0.1 is used)

> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic when kafka-clients 0.10.1.0 is 
> used
> -----------------------------
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark
> standalone mode 2.0.2 with Kafka 0.10.1.0.
> Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2), and we cannot get our
> drivers to read all partitions of a single stream when
> auto.offset.reset=earliest and the driver runs on a real cluster (separate
> VM nodes).
> When we run our drivers with auto.offset.reset=latest, ingesting from a
> single Kafka topic with multiple partitions (usually 10, but the problem
> shows up with as few as 3), the driver reads correctly from all partitions.
> Unfortunately, we need "earliest" for exactly-once semantics.
> In the same Kafka 0.10.1.0/Spark 2.x setup, our legacy driver using
> spark-streaming-kafka-0-8_2.11 with the older setting
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations to isolate the problem, but only
> auto.offset.reset=earliest on a "real multi-machine cluster" triggers it.
> 1. Ran with a Spark standalone cluster (4 Debian nodes, 8 vCPU/30 GB each)
> instead of YARN 2.7.3. The single-partition read problem persists in both
> cases. Please note that this problem occurs on an actual cluster of separate
> VM nodes (but not when our engineer runs it as a cluster on his own Mac).
> 2. Ran with Spark 2.1 nightly builds for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with
> both.
> 7. Tried the simplest Scala driver that only logs. (Our team uses Java.)
> Broken in both Scala and Java.
> 8. Tried increasing GCE capacity for the cluster, but we were already heavily
> overprovisioned for cores and memory. Also tried ramping up executors and
> cores. Since the driver works with auto.offset.reset=latest, we have ruled
> out GCP cloud infrastructure issues.
> When we turn on debug logging, we sometimes see partitions being reset to
> different offsets (some to earliest, others to latest) even though the
> consumer config correctly indicates auto.offset.reset=earliest.
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
> I've enclosed below the compl

[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic when kafka-clients 0.10.0.1 is used

2016-12-01 Thread Heji Kim (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Heji Kim updated SPARK-18506:
-----------------------------
Summary: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only 
read from a single partition on a multi partition topic when kafka-clients 
0.10.0.1 is used  (was: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest 
will only read from a single partition on a multi partition topic)


[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Heji Kim (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Heji Kim updated SPARK-18506:
-----------------------------
Description: 
Our team is trying to upgrade to Spark 2.0.2/Kafka
0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2), and we cannot get our
drivers to read all partitions of a single stream when
auto.offset.reset=earliest and the driver runs on a real cluster (separate VM
nodes).

When we run our drivers with auto.offset.reset=latest, ingesting from a single
Kafka topic with multiple partitions (usually 10, but the problem shows up with
as few as 3), the driver reads correctly from all partitions.
Unfortunately, we need "earliest" for exactly-once semantics.

In the same Kafka 0.10.1.0/Spark 2.x setup, our legacy driver using
spark-streaming-kafka-0-8_2.11 with the older setting
auto.offset.reset=smallest runs correctly.

We have tried the following configurations to isolate the problem, but only
auto.offset.reset=earliest on a "real multi-machine cluster" triggers it.

1. Ran with a Spark standalone cluster (4 Debian nodes, 8 vCPU/30 GB each)
instead of YARN 2.7.3. The single-partition read problem persists in both
cases. Please note that this problem occurs on an actual cluster of separate VM
nodes (but not when our engineer runs it as a cluster on his own Mac).
2. Ran with Spark 2.1 nightly builds for the last 10 days. Problem persists.
3. Turned off checkpointing. Problem persists with or without checkpointing.
4. Turned off backpressure. Problem persists with or without backpressure.
5. Tried both partition.assignment.strategy RangeAssignor and
RoundRobinAssignor. Broken with both.
6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with
both.
7. Tried the simplest Scala driver that only logs. (Our team uses Java.)
Broken in both Scala and Java.
8. Tried increasing GCE capacity for the cluster, but we were already heavily
overprovisioned for cores and memory. Also tried ramping up executors and
cores. Since the driver works with auto.offset.reset=latest, we have ruled out
GCP cloud infrastructure issues.


When we turn on debug logging, we sometimes see partitions being reset to
different offsets (some to earliest, others to latest) even though the
consumer config correctly indicates auto.offset.reset=earliest.
{noformat}
8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
{noformat}
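
In the ListOffsetRequest entries above, timestamp=-2 asks the broker for the earliest offset and timestamp=-1 for the latest, so partition 9 really was positioned at the end of its log (offset 66724). For longer logs, a minimal sketch along the following lines could list every partition whose reset target disagrees with the configured value. It assumes the DEBUG line format shown in the excerpt; ResetMismatch and mismatches are hypothetical names, not part of Kafka or Spark.

```scala
// Sketch only: scans Fetcher DEBUG lines for reset targets that differ from
// the configured auto.offset.reset. The line format is assumed from the log
// excerpt; ResetMismatch is a hypothetical helper, not a Kafka API.
object ResetMismatch {
  // Matches e.g. "Resetting offset for partition simple_test-9 to latest offset."
  private val ResetLine =
    """Resetting offset for partition (\S+)-(\d+) to (earliest|latest) offset""".r.unanchored

  /** Returns (topic, partition, observedReset) for each line that disagrees with `configured`. */
  def mismatches(lines: Seq[String], configured: String): Seq[(String, Int, String)] =
    lines.collect {
      case ResetLine(topic, partition, reset) if reset != configured =>
        (topic, partition.toInt, reset)
    }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.",
      "9 DEBUG Resetting offset for partition simple_test-9 to latest offset."
    )
    mismatches(sample, "earliest").foreach { case (topic, partition, reset) =>
      println(s"$topic-$partition was reset to '$reset'")
    }
  }
}
```

Run against the two DEBUG lines above with "earliest" as the configured value, this flags only simple_test-9.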

I've enclosed below the completely stripped-down, trivial test driver that
shows this behavior. After spending 2 weeks trying all combinations with it, we
think that either there is a bug in the Kafka/Spark integration or, if the
Kafka 0.10/Spark upgrade needs special configuration, it would be fantastic if
the documentation made that clearer. As it stands, we cannot upgrade.

{code}
package com.x.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies


/**
  * This driver only pulls data from the stream and logs it to output,
  * to isolate the single-partition bug.
  */
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]): Unit = {
    // Exactly four arguments are required.
    if (args.length != 4) {
      System.err.println("Usage: SimpleTestDriver")
      System.exit(1)
    }

    val Array(brokers, topic, groupId, offsetReset

[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-18 Thread Heji Kim (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Heji Kim updated SPARK-18506:
-----------------------------
Description: 
Our team is trying to upgrade to Spark 2.0.2/Kafka
0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2), and we cannot get our
drivers to read all partitions of a single stream when
auto.offset.reset=earliest.

When we run our drivers with auto.offset.reset=latest, ingesting from a single
Kafka topic with multiple partitions (usually 10, but the problem shows up with
as few as 3), the driver reads correctly from all partitions.
Unfortunately, we need "earliest" for exactly-once semantics.

In the same Kafka 0.10.1.0/Spark 2.x setup, our legacy driver using
spark-streaming-kafka-0-8_2.11 with the older setting
auto.offset.reset=smallest runs correctly.

We have tried the following configurations to isolate the problem, but only
auto.offset.reset=earliest triggers it.
1. Ran with a Spark standalone cluster instead of YARN 2.7.3. The
single-partition read problem persists in both cases.
2. Ran with Spark 2.1 nightly builds for the last 10 days. Problem persists.
3. Turned off checkpointing. Problem persists with or without checkpointing.
4. Turned off backpressure. Problem persists with or without backpressure.
5. Tried both partition.assignment.strategy RangeAssignor and
RoundRobinAssignor. Broken with both.
6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with
both.
7. Tried the simplest Scala driver that only logs. (Our team uses Java.)
Broken in both Scala and Java.
8. Tried increasing GCE capacity for the cluster, but we were already heavily
overprovisioned for cores and memory. Also tried ramping up executors and
cores. Since the driver works with auto.offset.reset=latest, we have ruled out
GCP cloud infrastructure issues.
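
One way to confirm in each of the configurations above which partitions a batch actually read is to look at the per-partition offset ranges. The following is a minimal sketch of that check, not the driver we ran. PartitionRange is a hypothetical stand-in that mirrors the topic, partition, fromOffset and untilOffset fields of Spark's OffsetRange so the example runs without Spark; in a real driver the ranges would come from rdd.asInstanceOf[HasOffsetRanges].offsetRanges inside foreachRDD. The sample values are illustrative only.

```scala
// Sketch only: lists partitions that contributed no records to a batch.
// PartitionRange is a hypothetical stand-in for Spark's OffsetRange.
object BatchCoverage {
  final case class PartitionRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    // Number of records covered by this range.
    def count: Long = untilOffset - fromOffset
  }

  /** Partition ids whose offset range is empty in this batch, in ascending order. */
  def idlePartitions(ranges: Seq[PartitionRange]): Seq[Int] =
    ranges.filter(_.count == 0).map(_.partition).sorted

  def main(args: Array[String]): Unit = {
    // Illustrative values, not taken from the report.
    val batch = Seq(
      PartitionRange("simple_test", 0, 0L, 500L),
      PartitionRange("simple_test", 1, 1200L, 1200L),
      PartitionRange("simple_test", 2, 900L, 900L)
    )
    println(s"Partitions with no records: ${idlePartitions(batch).mkString(", ")}")
  }
}
```

If the same partitions show up as idle batch after batch while the topic still holds unread data, that matches the single-partition symptom described here.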


When we turn on debug logging, we sometimes see partitions being reset to
different offsets (some to earliest, others to latest) even though the
consumer config correctly indicates auto.offset.reset=earliest.
{noformat}
8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
{noformat}

I've enclosed below the completely stripped-down, trivial test driver that
shows this behavior. After spending 2 weeks trying all combinations with it, we
think that either there is a bug in the Kafka/Spark integration or, if the
Kafka 0.10/Spark upgrade needs special configuration, it would be fantastic if
the documentation made that clearer. As it stands, we cannot upgrade.

{code}
package com.x.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies


/**
  * This driver only pulls data from the stream and logs it to output,
  * to isolate the single-partition bug.
  */
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]): Unit = {
    // Exactly four arguments are required: brokers, topic, groupId, offsetReset.
    if (args.length != 4) {
      System.err.println("Usage: SimpleTestDriver")
      System.exit(1)
    }

    val Array(brokers, topic, groupId, offsetReset) = args
    val preferredHosts = LocationStrategies.PreferConsistent
    val topics = List(topic)

    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classO