Our team is trying to upgrade to Spark 2.0.2 / Kafka 0.10.1.0, and we have been struggling with a show-stopper problem.
When we run our drivers with auto.offset.reset=latest, ingesting from a single Kafka topic with 10 partitions, the driver reads correctly from all 10 partitions. However, when we use auto.offset.reset=earliest, the driver reads only a single partition. With debug logging turned on, we sometimes see partitions being reset to different offset configurations even though the consumer config correctly indicates auto.offset.reset=earliest:

```
8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
```

Note that partition 8 is queried with timestamp=-2 (earliest) while partition 9 is queried with timestamp=-1 (latest), even though both should be earliest. I've enclosed below the completely stripped-down trivial test driver
that shows this behavior. We normally run on YARN 2.7.3 but have also tried Spark standalone mode, which shows the same behavior. Our drivers are normally Java, but we have tried the Scala version, which also shows the same incorrect behavior. We have tried different LocationStrategies and partition assignment strategies, all without success. Any insight would be greatly appreciated.

```scala
package com.xxxxx.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies

/**
 * This driver only pulls data from the stream and logs the offset ranges,
 * just to isolate the single-partition bug.
 */
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
      System.exit(1)
    }

    val Array(brokers, topic, groupId, offsetReset) = args
    val preferredHosts = LocationStrategies.PreferConsistent
    val topics = List(topic)

    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offsetReset,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName("SimpleTestDriver" + "_" + topic)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val dstream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    dstream.foreachRDD { rdd =>
      // Get the offset ranges in the RDD and log them
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
      }
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```

The consumer config logged at startup confirms auto.offset.reset=earliest:

```
16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = simple_test_group
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
```

Below is the output of the above driver for a 5-partition topic. Offsets always remain 0 for all but a single partition, in this case partition 3:

```
simple_logtest 3 offsets: 1623531 to 1623531
simple_logtest 0 offsets: 0 to 0
simple_logtest 1 offsets: 0 to 0
simple_logtest 2 offsets: 0 to 0
simple_logtest 4 offsets: 0 to 0
simple_logtest 3 offsets: 1623531 to 1623531
simple_logtest 0 offsets: 0 to 0
simple_logtest 1 offsets: 0 to 0
simple_logtest 2 offsets: 0 to 0
simple_logtest 4 offsets: 0 to 0
simple_logtest 3 offsets: 1623531 to 1623531
```
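For what it's worth, the timestamp values in the TRACE lines follow Kafka's ListOffsets protocol convention: -2 requests the earliest offset and -1 the latest. Below is a minimal self-contained sketch (the object and helper names are mine, not Spark or Kafka API) that just makes the two symptoms explicit: how the requested timestamps should be read, and which partitions never advance past offset 0 in the driver's output.

```scala
object ResetDiagnostics {
  // Kafka's ListOffsets protocol convention: timestamp -2 = earliest, -1 = latest.
  // Under auto.offset.reset=earliest, every partition should request -2.
  def requestedReset(timestamp: Long): String = timestamp match {
    case -2L => "earliest"
    case -1L => "latest"
    case t   => s"offset for time $t"
  }

  // Given (partition, fromOffset, untilOffset) triples as logged by the driver,
  // return the partitions that never advance past offset 0.
  def zeroPartitions(ranges: Seq[(Int, Long, Long)]): Seq[Int] =
    ranges.collect { case (p, _, until) if until == 0L => p }.distinct.sorted

  def main(args: Array[String]): Unit = {
    // The two ListOffsetRequests from the TRACE log above:
    println(s"partition 8 requested: ${requestedReset(-2L)}") // earliest, as configured
    println(s"partition 9 requested: ${requestedReset(-1L)}") // latest, despite earliest being configured

    // One batch of offset ranges from the 5-partition output above:
    val batch = Seq((3, 1623531L, 1623531L), (0, 0L, 0L), (1, 0L, 0L), (2, 0L, 0L), (4, 0L, 0L))
    println(s"partitions stuck at offset 0: ${zeroPartitions(batch).mkString(", ")}")
  }
}
```

As a further sanity check, a plain (non-Spark) KafkaConsumer pointed at the same topic can be asked for `beginningOffsets`/`endOffsets` per partition to confirm that all partitions actually contain data, which would narrow the fault to the Spark/consumer side rather than the brokers.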