[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15685190#comment-15685190 ]
Cody Koeninger commented on SPARK-18506: ---------------------------------------- I'd try to isolate aws vs gce as a possible cause before filing a bug against the kafka consumer. I don't get paid to work on Spark, so most any time spent on it is during non-work hours regardless :) > kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a > single partition on a multi partition topic > ----------------------------------------------------------------------------------------------------------------------- > > Key: SPARK-18506 > URL: https://issues.apache.org/jira/browse/SPARK-18506 > Project: Spark > Issue Type: Bug > Components: DStreams > Affects Versions: 2.0.2 > Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark > standalone mode 2.0.2 > with Kafka 0.10.1.0. > Reporter: Heji Kim > > Our team is trying to upgrade to Spark 2.0.2/Kafka > 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our > drivers to read all partitions of a single stream when kafka > auto.offset.reset=earliest running on a real cluster(separate VM nodes). > When we run our drivers with auto.offset.reset=latest ingesting from a single > kafka topic with multiple partitions (usually 10 but problem shows up with > only 3 partitions), the driver reads correctly from all partitions. > Unfortunately, we need "earliest" for exactly once semantics. > In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using > spark-streaming-kafka-0-8_2.11 with the prior setting > auto.offset.reset=smallest runs correctly. > We have tried the following configurations in trying to isolate our problem > but it is only auto.offset.reset=earliest on a "real multi-machine cluster" > which causes this problem. > 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each) > instead of YARN 2.7.3. Single partition read problem persists both cases. > Please note this problem occurs on an actual cluster of separate VM nodes > (but not when our engineer runs in as a cluster on his own Mac.) > 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists. > 3. Turned off checkpointing. Problem persists with or without checkpointing. > 4. Turned off backpressure. Problem persists with or without backpressure. > 5. Tried both partition.assignment.strategy RangeAssignor and > RoundRobinAssignor. Broken with both. > 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with > both. > 7. Tried the simplest scala driver that only logs. (Our team uses java.) > Broken with both. > 8. Tried increasing GCE capacity for cluster but already we were highly > overprovisioned for cores and memory. Also tried ramping up executors and > cores. Since driver works with auto.offset.reset=latest, we have ruled out > GCP cloud infrastructure issues. > When we turn on the debug logs, we sometimes see partitions being set to > different offset configuration even though the consumer config correctly > indicates auto.offset.reset=earliest. > {noformat} > 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. > (org.apache.kafka.clients.consumer.internals.Fetcher) > 8 TRACE Sending ListOffsetRequest > {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} > to broker 10.102.20.12:9092 (id: 12 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 TRACE Sending ListOffsetRequest > {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} > to broker 10.102.20.13:9092 (id: 13 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 8 TRACE Received ListOffsetResponse > {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} > from broker 10.102.20.12:9092 (id: 12 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 TRACE Received ListOffsetResponse > {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} > from broker 10.102.20.13:9092 (id: 13 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 > (org.apache.kafka.clients.consumer.internals.Fetcher) > {noformat} > I've enclosed below the completely stripped down trivial test driver that > shows this behavior. After spending 2 weeks trying all combinations with a > really stripped down driver, we think either there might be a bug in the > kafka spark integration or if the kafka 0.10/spark upgrade needs special > configuration, it should be fantastic if it was clearer in the documentation. > But currently we cannot upgrade. > {code} > package com.xxxxx.labs.analytics.diagnostics.spark.drivers > import org.apache.kafka.common.serialization.StringDeserializer > import org.apache.spark.SparkConf > import org.apache.spark.streaming.{Seconds, StreamingContext} > import org.apache.spark.streaming.kafka010._ > import org.apache.spark.streaming.kafka010.LocationStrategies > import org.apache.spark.streaming.kafka010.ConsumerStrategies > /** > * > * This driver is only for pulling data from the stream and logging to > output just to isolate single partition bug > */ > object SimpleKafkaLoggingDriver { > def main(args: Array[String]) { > if (args.length != 4) { > System.err.println("Usage: SimpleTestDriver <broker bootstrap servers> > <topic> <groupId> <offsetReset>") > System.exit(1) > } > val Array(brokers, topic, groupId, offsetReset) = args > val preferredHosts = LocationStrategies.PreferConsistent > val topics = List(topic) > val kafkaParams = Map( > "bootstrap.servers" -> brokers, > "key.deserializer" -> classOf[StringDeserializer], > "value.deserializer" -> classOf[StringDeserializer], > "group.id" -> groupId, > "auto.offset.reset" -> offsetReset, > "enable.auto.commit" -> (false: java.lang.Boolean) > ) > val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_" +topic) > val streamingContext = new StreamingContext(sparkConf, Seconds(5)) > val dstream = KafkaUtils.createDirectStream[String, String]( > streamingContext, > preferredHosts, > ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)) > dstream.foreachRDD { rdd => > // Get the offset ranges in the RDD and log > val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges > for (o <- offsetRanges) { > println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to > ${o.untilOffset}") > } > } > streamingContext.start > streamingContext.awaitTermination() > } > } > {code} > {noformat} > 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = earliest > bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092] > check.crcs = true > client.id = > connections.max.idle.ms = 540000 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = simple_test_group > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 300000 > max.poll.records = 500 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.sample.window.ms = 30000 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > {noformat} > Below is the output of above driver for 5 partition topic. Offsets always > remain 0 for all but a single partition in this case partition 3 > {noformat} > simple_logtest 3 offsets: 1623531 to 1623531 > simple_logtest 0 offsets: 0 to 0 > simple_logtest 1 offsets: 0 to 0 > simple_logtest 2 offsets: 0 to 0 > simple_logtest 4 offsets: 0 to 0 > simple_logtest 3 offsets: 1623531 to 1623531 > simple_logtest 0 offsets: 0 to 0 > simple_logtest 1 offsets: 0 to 0 > simple_logtest 2 offsets: 0 to 0 > simple_logtest 4 offsets: 0 to 0 > simple_logtest 3 offsets: 1623531 to 1623531 > {noformat} > Producer is posting messages evenly into each partition: > {noformat} > devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ > kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list > '10.102.22.11:9092' --topic simple_logtest --time -2 > simple_logtest:2:0 > simple_logtest:4:0 > simple_logtest:1:0 > simple_logtest:3:0 > simple_logtest:0:0 > devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ > kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list > '10.102.22.11:9092' --topic simple_logtest --time -1 > simple_logtest:2:722964 > simple_logtest:4:722864 > simple_logtest:1:722957 > simple_logtest:3:722960 > simple_logtest:0:723021 > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org