[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15685190#comment-15685190
 ] 

Cody Koeninger commented on SPARK-18506:
----------------------------------------

I'd try to isolate AWS vs GCE as a possible cause before filing a bug against 
the Kafka consumer.

I don't get paid to work on Spark, so most any time spent on it is during 
non-work hours regardless :)

> Kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest will only read from a 
> single partition on a multi-partition topic
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18506
>                 URL: https://issues.apache.org/jira/browse/SPARK-18506
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>            Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2), and we cannot get our 
> drivers to read all partitions of a single stream when Kafka 
> auto.offset.reset=earliest is set and we run on a real cluster (separate VM 
> nodes). When we run our drivers with auto.offset.reset=latest, ingesting 
> from a single Kafka topic with multiple partitions (usually 10, but the 
> problem shows up with only 3 partitions), the driver reads correctly from 
> all partitions. Unfortunately, we need "earliest" for exactly-once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with a Spark standalone cluster (4 Debian nodes, 8 vCPU/30GB each) 
> instead of YARN 2.7.3. The single-partition read problem persists in both 
> cases. Please note this problem occurs on an actual cluster of separate VM 
> nodes (but not when our engineer runs it as a cluster on his own Mac).
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest Scala driver that only logs. (Our team uses Java.) 
> Broken in both languages.
> 8. Tried increasing GCE capacity for the cluster, but we were already highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores. Since the driver works with auto.offset.reset=latest, we have ruled 
> out GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being reset to 
> different positions (some to earliest, others to latest) even though the 
> consumer config correctly indicates auto.offset.reset=earliest.
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
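
On the TRACE lines above: in Kafka's ListOffsetRequest, a timestamp of -2 asks
for the earliest offset and -1 asks for the latest, so the request for
partition 9 really did ask for the latest offset. Below is a minimal sketch for
pulling that out of such logs. It is plain Scala, the object and method names
are hypothetical, and the regex assumes the request format shown in the quoted
log (it deliberately does not match the response lines).

```scala
object ListOffsetSentinels {
  // Sentinel timestamps used in Kafka's ListOffsetRequest.
  val Earliest: Long = -2L
  val Latest: Long = -1L

  // Matches "partition=<n>,timestamp=<t>" as printed in the TRACE request lines.
  private val PartitionTimestamp = """partition=(\d+),timestamp=(-?\d+)""".r

  // Turn a request timestamp into a human-readable reset target.
  def describe(timestamp: Long): String = timestamp match {
    case Earliest => "earliest"
    case Latest   => "latest"
    case t        => s"timestamp $t"
  }

  // For one "Sending ListOffsetRequest" line, map each partition to what was requested.
  def requestedResets(traceLine: String): Map[Int, String] =
    PartitionTimestamp
      .findAllMatchIn(traceLine)
      .map(m => m.group(1).toInt -> describe(m.group(2).toLong))
      .toMap
}
```

Running requestedResets over every "Sending ListOffsetRequest" line and
grouping by result would show at a glance which partitions were not reset to
"earliest".
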
> I've enclosed below the completely stripped-down, trivial test driver that 
> shows this behavior. After spending 2 weeks trying all combinations with a 
> really stripped-down driver, we think that either there is a bug in the 
> Kafka/Spark integration or, if the Kafka 0.10/Spark upgrade needs special 
> configuration, it would be fantastic if that were clearer in the 
> documentation. As it stands, we cannot upgrade.
> {code}
> package com.xxxxx.labs.analytics.diagnostics.spark.drivers
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies
> import org.apache.spark.streaming.kafka010.ConsumerStrategies
> /**
>   * This driver only pulls data from the stream and logs the offset ranges,
>   * to isolate the single-partition bug.
>   */
> object SimpleKafkaLoggingDriver {
>   def main(args: Array[String]): Unit = {
>     if (args.length != 4) {
>       System.err.println(
>         "Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
>       System.exit(1)
>     }
>     val Array(brokers, topic, groupId, offsetReset) = args
>     val preferredHosts = LocationStrategies.PreferConsistent
>     val topics = List(topic)
>     val kafkaParams = Map(
>       "bootstrap.servers" -> brokers,
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> groupId,
>       "auto.offset.reset" -> offsetReset,
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>     val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_" +topic)
>     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>     val dstream = KafkaUtils.createDirectStream[String, String](
>       streamingContext,
>       preferredHosts,
>       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>     dstream.foreachRDD { rdd =>
>       // Get the offset ranges in the RDD and log
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       for (o <- offsetRanges) {
>         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
>       }
>     }
>     streamingContext.start()
>     streamingContext.awaitTermination()
>   }
> }
> {code}
> {noformat}
> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = simple_test_group
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> {noformat}
> Below is the output of the above driver for a 5-partition topic. Offsets 
> always remain 0 for all but a single partition (in this case, partition 3):
> {noformat}
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531 
> {noformat}
> Producer is posting messages evenly into each partition:
> {noformat}
> devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ 
> kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> '10.102.22.11:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ 
> kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> '10.102.22.11:9092' --topic simple_logtest --time -1
> simple_logtest:2:722964
> simple_logtest:4:722864
> simple_logtest:1:722957
> simple_logtest:3:722960
> simple_logtest:0:723021
> {noformat}
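
One way to make the mismatch between the two outputs above concrete is to
compare the last untilOffset the driver logged for each partition against what
GetOffsetShell reports with --time -1. The following is a minimal sketch in
plain Scala, not part of the original driver. The object and method names are
hypothetical, and it assumes the lines are passed in with the "> " quote
prefix already stripped.

```scala
object PartitionLagCheck {
  // "topic:partition:offset", as printed by GetOffsetShell.
  private val ShellLine = """(\S+):(\d+):(\d+)""".r
  // "topic partition offsets: from to until", as printed by the test driver.
  private val DriverLine = """(\S+) (\d+) offsets: (\d+) to (\d+)""".r

  // Latest offset per partition; lines that do not match (e.g. the shell prompt) are skipped.
  def latestOffsets(shellOutput: Seq[String]): Map[Int, Long] =
    shellOutput.map(_.trim).collect {
      case ShellLine(_, partition, offset) => partition.toInt -> offset.toLong
    }.toMap

  // Last untilOffset logged per partition; later batches overwrite earlier ones.
  def consumedOffsets(driverOutput: Seq[String]): Map[Int, Long] =
    driverOutput.map(_.trim).collect {
      case DriverLine(_, partition, _, until) => partition.toInt -> until.toLong
    }.toMap

  // Partitions whose last logged untilOffset is below the reported latest offset.
  def behind(latest: Map[Int, Long], consumed: Map[Int, Long]): Seq[Int] =
    latest.keys.toSeq.sorted.filter(p => consumed.getOrElse(p, 0L) < latest(p))
}
```

Fed the driver log and the --time -1 listing quoted above, behind(...) comes
back as partitions 0, 1, 2 and 4. Partition 3 is not flagged only because its
logged offset (1623531) is already past the latest offset GetOffsetShell
reported for it (722960), which may simply mean the two outputs were captured
at different times.
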



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
