Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
been struggling with this show stopper problem.

When we run our drivers with auto.offset.reset=latest ingesting from a
single kafka topic with 10 partitions, the driver reads correctly from all
10 partitions.

However when we use auto.offset.reset=earliest, the driver will read only a
single partition.

When we turn on the debug logs, we sometimes see partitions being set to
different offset configuration even though the consumer config correctly
indicates auto.offset.reset=earliest.

8 DEBUG Resetting offset for *partition simple_test-8 to earliest offset*.
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for *partition simple_test-9 to latest offset*.
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
> to broker 10.102.20.12:9092 (id: 12 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
> to broker 10.102.20.13:9092 (id: 13 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> from broker 10.102.20.12:9092 (id: 12 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> from broker 10.102.20.13:9092 (id: 13 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> (org.apache.kafka.clients.consumer.internals.Fetcher)



I've enclosed below the completely stripped down trivial test driver that
shows this behavior. We normally run with YARN 2.7.3 but have also tried
running spark standalone mode which has the same behavior. Our drivers are
normally java but we have tried the scala version which also has the same
incorrect behavior. We have tried different LocationStrategies and
partition assignment strategies all without success.  Any insight would be
greatly appreciated.

package com.xxxxx.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies


/**
  *
  * This driver is only for pulling data from the stream and logging
to output just to isolate single partition bug
  */
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: SimpleTestDriver <broker bootstrap
servers> <topic> <groupId> <offsetReset>")
      System.exit(1)
    }

    val Array(brokers, topic, groupId, offsetReset) = args
    val preferredHosts = LocationStrategies.PreferConsistent
    val topics = List(topic)

    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offsetReset,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_" +topic)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))


    val dstream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    dstream.foreachRDD { rdd =>
      // Get the offset ranges in the RDD and log
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset}
to ${o.untilOffset}")
      }
    }

    streamingContext.start
    streamingContext.awaitTermination()

  }

}



16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:

auto.commit.interval.ms = 5000

auto.offset.reset = earliest

bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]

check.crcs = true

client.id =

connections.max.idle.ms = 540000

enable.auto.commit = false

exclude.internal.topics = true

fetch.max.bytes = 52428800

fetch.max.wait.ms = 500

fetch.min.bytes = 1

group.id = simple_test_group

heartbeat.interval.ms = 3000

interceptor.classes = null

key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer

max.partition.fetch.bytes = 1048576

max.poll.interval.ms = 300000

max.poll.records = 500

metadata.max.age.ms = 300000

metric.reporters = []

metrics.num.samples = 2

metrics.sample.window.ms = 30000

partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor]

receive.buffer.bytes = 65536

reconnect.backoff.ms = 50

request.timeout.ms = 305000

retry.backoff.ms = 100

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 60000

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.mechanism = GSSAPI

security.protocol = PLAINTEXT

send.buffer.bytes = 131072

session.timeout.ms = 10000

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

ssl.endpoint.identification.algorithm = null

ssl.key.password = null

ssl.keymanager.algorithm = SunX509

ssl.keystore.location = null

ssl.keystore.password = null

ssl.keystore.type = JKS

ssl.protocol = TLS

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.location = null

ssl.truststore.password = null

ssl.truststore.type = JKS

value.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer



Below is the output of above driver for 5 partition topic.  Offsets always
remain 0 for all but a single partition in this case 3

simple_logtest 3 offsets: 1623531 to 1623531
simple_logtest 0 offsets: 0 to 0
simple_logtest 1 offsets: 0 to 0
simple_logtest 2 offsets: 0 to 0
simple_logtest 4 offsets: 0 to 0
simple_logtest 3 offsets: 1623531 to 1623531
simple_logtest 0 offsets: 0 to 0
simple_logtest 1 offsets: 0 to 0
simple_logtest 2 offsets: 0 to 0
simple_logtest 4 offsets: 0 to 0

simple_logtest 3 offsets: 1623531 to 1623531

Reply via email to