Hi Cody,

Thank you for testing this on a Saturday morning!  I failed to mention that
when our data engineer runs our drivers(even complex ones) locally on his
Mac, the drivers work fine. However when we launch it into the cluster (4
machines either for a YARN cluster or spark standalone) we get this issue.


On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger <c...@koeninger.org> wrote:

> I ran your example using the versions of kafka and spark you are
> using, against a standalone cluster.  This is what I observed:
> (in kafka working directory)
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> simple_logtest:2:31
> simple_logtest:4:31
> simple_logtest:1:31
> simple_logtest:3:31
> simple_logtest:0:31
> So in other words, there are 5 partitions, they all have messages in them
> (in spark working directory)
> bash-3.2$ ./bin/spark-submit --master
> spark://Codys-MacBook-Pro.local:7077 --class
> example.SimpleKafkaLoggingDriver
> /private/var/tmp/kafka-bug-report/target/scala-2.11/
> kafka-example-assembly-2.0.0.jar
> localhost:9092 simple_logtest mygroup earliest
> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
> simple_logtest 3 offsets: 0 to 31
> simple_logtest 0 offsets: 0 to 31
> simple_logtest 1 offsets: 0 to 31
> simple_logtest 2 offsets: 0 to 31
> simple_logtest 4 offsets: 0 to 31
> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> 1479574025000 ms (execution: 0.005 s)
> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> 1479574030000 ms.0 from job set of time 1479574030000 ms
> simple_logtest 3 offsets: 31 to 31
> simple_logtest 0 offsets: 31 to 31
> simple_logtest 1 offsets: 31 to 31
> simple_logtest 2 offsets: 31 to 31
> simple_logtest 4 offsets: 31 to 31
> So in other words, spark is indeed seeing offsets for each partition.
> The results you posted look to me like there aren't any messages going
> into the other partitions, which looks like a misbehaving producer.
> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
> <hster.investiga...@gmail.com> wrote:
> > Our team is trying to upgrade to Spark 2.0.2/Kafka and we have
> been
> > struggling with this show stopper problem.
> >
> > When we run our drivers with auto.offset.reset=latest ingesting from a
> > single kafka topic with 10 partitions, the driver reads correctly from
> all
> > 10 partitions.
> >
> > However when we use auto.offset.reset=earliest, the driver will read
> only a
> > single partition.
> >
> > When we turn on the debug logs, we sometimes see partitions being set to
> > different offset configuration even though the consumer config correctly
> > indicates auto.offset.reset=earliest.
> >
> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{
> partition=8,timestamp=-2}]}]}
> >> to broker (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{
> partition=9,timestamp=-1}]}]}
> >> to broker (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{
> partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> >> from broker (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{
> partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> >> from broker (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >
> >
> >
> > I've enclosed below the completely stripped down trivial test driver that
> > shows this behavior. We normally run with YARN 2.7.3 but have also tried
> > running spark standalone mode which has the same behavior. Our drivers
> are
> > normally java but we have tried the scala version which also has the same
> > incorrect behavior. We have tried different LocationStrategies and
> partition
> > assignment strategies all without success.  Any insight would be greatly
> > appreciated.
> >
> > package com.xxxxx.labs.analytics.diagnostics.spark.drivers
> >
> > import org.apache.kafka.common.serialization.StringDeserializer
> > import org.apache.spark.SparkConf
> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> > import org.apache.spark.streaming.kafka010._
> > import org.apache.spark.streaming.kafka010.LocationStrategies
> > import org.apache.spark.streaming.kafka010.ConsumerStrategies
> >
> >
> > /**
> >   *
> >   * This driver is only for pulling data from the stream and logging to
> > output just to isolate single partition bug
> >   */
> > object SimpleKafkaLoggingDriver {
> >   def main(args: Array[String]) {
> >     if (args.length != 4) {
> >       System.err.println("Usage: SimpleTestDriver <broker bootstrap
> servers>
> > <topic> <groupId> <offsetReset>")
> >       System.exit(1)
> >     }
> >
> >     val Array(brokers, topic, groupId, offsetReset) = args
> >     val preferredHosts = LocationStrategies.PreferConsistent
> >     val topics = List(topic)
> >
> >     val kafkaParams = Map(
> >       "bootstrap.servers" -> brokers,
> >       "key.deserializer" -> classOf[StringDeserializer],
> >       "value.deserializer" -> classOf[StringDeserializer],
> >       "group.id" -> groupId,
> >       "auto.offset.reset" -> offsetReset,
> >       "enable.auto.commit" -> (false: java.lang.Boolean)
> >     )
> >
> >     val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_"
> > +topic)
> >     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
> >
> >
> >     val dstream = KafkaUtils.createDirectStream[String, String](
> >       streamingContext,
> >       preferredHosts,
> >       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
> >
> >     dstream.foreachRDD { rdd =>
> >       // Get the offset ranges in the RDD and log
> >       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >       for (o <- offsetRanges) {
> >         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to
> > ${o.untilOffset}")
> >       }
> >     }
> >
> >     streamingContext.start
> >     streamingContext.awaitTermination()
> >
> >   }
> >
> > }
> >
> >
> >
> >> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
> >>
> >> auto.commit.interval.ms = 5000
> >>
> >> auto.offset.reset = earliest
> >>
> >> bootstrap.servers = [,]
> >>
> >> check.crcs = true
> >>
> >> client.id =
> >>
> >> connections.max.idle.ms = 540000
> >>
> >> enable.auto.commit = false
> >>
> >> exclude.internal.topics = true
> >>
> >> fetch.max.bytes = 52428800
> >>
> >> fetch.max.wait.ms = 500
> >>
> >> fetch.min.bytes = 1
> >>
> >> group.id = simple_test_group
> >>
> >> heartbeat.interval.ms = 3000
> >>
> >> interceptor.classes = null
> >>
> >> key.deserializer = class
> >> org.apache.kafka.common.serialization.StringDeserializer
> >>
> >> max.partition.fetch.bytes = 1048576
> >>
> >> max.poll.interval.ms = 300000
> >>
> >> max.poll.records = 500
> >>
> >> metadata.max.age.ms = 300000
> >>
> >> metric.reporters = []
> >>
> >> metrics.num.samples = 2
> >>
> >> metrics.sample.window.ms = 30000
> >>
> >> partition.assignment.strategy = [class
> >> org.apache.kafka.clients.consumer.RangeAssignor]
> >>
> >> receive.buffer.bytes = 65536
> >>
> >> reconnect.backoff.ms = 50
> >>
> >> request.timeout.ms = 305000
> >>
> >> retry.backoff.ms = 100
> >>
> >> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >>
> >> sasl.kerberos.min.time.before.relogin = 60000
> >>
> >> sasl.kerberos.service.name = null
> >>
> >> sasl.kerberos.ticket.renew.jitter = 0.05
> >>
> >> sasl.kerberos.ticket.renew.window.factor = 0.8
> >>
> >> sasl.mechanism = GSSAPI
> >>
> >> security.protocol = PLAINTEXT
> >>
> >> send.buffer.bytes = 131072
> >>
> >> session.timeout.ms = 10000
> >>
> >> ssl.cipher.suites = null
> >>
> >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >>
> >> ssl.endpoint.identification.algorithm = null
> >>
> >> ssl.key.password = null
> >>
> >> ssl.keymanager.algorithm = SunX509
> >>
> >> ssl.keystore.location = null
> >>
> >> ssl.keystore.password = null
> >>
> >> ssl.keystore.type = JKS
> >>
> >> ssl.protocol = TLS
> >>
> >> ssl.provider = null
> >>
> >> ssl.secure.random.implementation = null
> >>
> >> ssl.trustmanager.algorithm = PKIX
> >>
> >> ssl.truststore.location = null
> >>
> >> ssl.truststore.password = null
> >>
> >> ssl.truststore.type = JKS
> >>
> >> value.deserializer = class
> >> org.apache.kafka.common.serialization.StringDeserializer
> >
> >
> >
> > Below is the output of above driver for 5 partition topic.  Offsets
> always
> > remain 0 for all but a single partition in this case 3
> >
> > simple_logtest 3 offsets: 1623531 to 1623531
> > simple_logtest 0 offsets: 0 to 0
> > simple_logtest 1 offsets: 0 to 0
> > simple_logtest 2 offsets: 0 to 0
> > simple_logtest 4 offsets: 0 to 0
> > simple_logtest 3 offsets: 1623531 to 1623531
> > simple_logtest 0 offsets: 0 to 0
> > simple_logtest 1 offsets: 0 to 0
> > simple_logtest 2 offsets: 0 to 0
> > simple_logtest 4 offsets: 0 to 0
> >
> > simple_logtest 3 offsets: 1623531 to 1623531
> >
> >
> >
> >

