[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Heji Kim updated SPARK-18506:
-----------------------------

Description:

Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2), and we cannot get our drivers to read all partitions of a single stream when Kafka auto.offset.reset=earliest is set, running on a real cluster (separate VM nodes). When we run our drivers with auto.offset.reset=latest, ingesting from a single Kafka topic with multiple partitions (usually 10, but the problem shows up with as few as 3), the driver reads correctly from all partitions. Unfortunately, we need "earliest" for exactly-once semantics.

In the same Kafka 0.10.1.0/Spark 2.x setup, our legacy driver using spark-streaming-kafka-0-8_2.11 with the prior setting auto.offset.reset=smallest runs correctly.

We have tried the following configurations while trying to isolate the problem, but it is only auto.offset.reset=earliest on a real multi-machine cluster that triggers it:

1. Ran with a Spark standalone cluster (4 Debian nodes, 8 vCPU/30 GB each) instead of YARN 2.7.3. The single-partition read problem persists in both cases. Note that the problem occurs on an actual cluster of separate VM nodes, but not when our engineer runs it as a local cluster on his own Mac.
2. Ran with the Spark 2.1 nightly builds for the last 10 days. Problem persists.
3. Turned off checkpointing. Problem persists with or without checkpointing.
4. Turned off backpressure. Problem persists with or without backpressure.
5. Tried both partition.assignment.strategy settings, RangeAssignor and RoundRobinAssignor. Broken with both.
6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with both.
7. Tried the simplest possible Scala driver that only logs. (Our team uses Java.) Still broken.
8. Tried increasing GCE capacity for the cluster, although we were already highly overprovisioned for cores and memory. Also tried ramping up executors and cores. Since the driver works with auto.offset.reset=latest, we have ruled out GCP cloud infrastructure issues.

When we turn on the debug logs, we sometimes see partitions being reset to different offset configurations even though the consumer config correctly indicates auto.offset.reset=earliest. In the ListOffsetRequest, timestamp=-2 means earliest and timestamp=-1 means latest; note below that partition 8 is reset to earliest as configured, while partition 9 is reset to latest:

{noformat}
8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
{noformat}
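For an independent cross-check of what the brokers report, the same earliest/latest queries can be issued outside of Spark with a plain KafkaConsumer. Below is a minimal sketch, not from the original report, assuming kafka-clients 0.10.1.0; the bootstrap server and topic are taken from the logs in this ticket, and EarliestOffsetCheck plus its group id are made-up names for illustration:

{code}
// Hypothetical verification sketch: ask the brokers directly for the
// earliest and latest offset of every partition, with no Spark involved.
// beginningOffsets/endOffsets issue the same ListOffsetRequest
// (timestamp -2 / -1) seen in the TRACE logs above.
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object EarliestOffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "10.102.22.11:9092") // broker from this ticket
    props.put("group.id", "offset_check")               // throwaway group id
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // Build the full partition list for the topic, then query both ends.
      val partitions = consumer.partitionsFor("simple_test").asScala
        .map(pi => new TopicPartition(pi.topic, pi.partition))
        .asJava
      val earliest = consumer.beginningOffsets(partitions)
      val latest = consumer.endOffsets(partitions)
      earliest.asScala.toSeq.sortBy(_._1.partition).foreach { case (tp, off) =>
        println(s"$tp earliest=$off latest=${latest.get(tp)}")
      }
    } finally {
      consumer.close()
    }
  }
}
{code}

If every partition reports earliest=0 here (as the GetOffsetShell output further below also shows), the inconsistent resets would point at the consumer side rather than the brokers.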
I've enclosed below the completely stripped-down trivial test driver that shows this behavior. After spending two weeks trying all combinations with this driver, we think there might be a bug in the Kafka/Spark integration; alternatively, if the Kafka 0.10/Spark upgrade needs special configuration, it would be fantastic if the documentation were clearer. But currently we cannot upgrade.

{code}
package com.xxxxx.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies

/**
 * This driver only pulls data from the stream and logs the offset ranges,
 * purely to isolate the single-partition bug.
 */
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
      System.exit(1)
    }

    val Array(brokers, topic, groupId, offsetReset) = args
    val preferredHosts = LocationStrategies.PreferConsistent
    val topics = List(topic)

    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offsetReset,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName("SimpleTestDriver" + "_" + topic)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val dstream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    dstream.foreachRDD { rdd =>
      // Get the offset ranges in the RDD and log them
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
      }
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
{code}
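One way to take the auto.offset.reset policy out of the picture entirely is the Subscribe overload in spark-streaming-kafka-0-10 that accepts explicit per-partition starting offsets. A minimal sketch of that variant follows, reusing topic, topics, kafkaParams, preferredHosts and streamingContext from the driver above; the partition count of 5 matches the test topic below but is otherwise an assumption:

{code}
// Hypothetical variant of the createDirectStream call above: pin every
// partition to offset 0 instead of relying on auto.offset.reset, so no
// partition can fall back to the reset policy.
import org.apache.kafka.common.TopicPartition

val numPartitions = 5 // assumed partition count of the test topic
val fromOffsets: Map[TopicPartition, Long] =
  (0 until numPartitions).map(p => new TopicPartition(topic, p) -> 0L).toMap

val dstream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  preferredHosts,
  // Same topics/kafkaParams as above, plus explicit starting offsets.
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))
{code}

If this variant reads all partitions from 0 while the auto.offset.reset=earliest version does not, that would further isolate the bug to the reset handling.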
The consumer configuration logged at startup confirms that earliest is set:

{noformat}
16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = simple_test_group
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
{noformat}

Below is the output of the above driver for a 5-partition topic. Offsets always remain 0 for all but a single partition, in this case partition 3:

{noformat}
simple_logtest 3 offsets: 1623531 to 1623531
simple_logtest 0 offsets: 0 to 0
simple_logtest 1 offsets: 0 to 0
simple_logtest 2 offsets: 0 to 0
simple_logtest 4 offsets: 0 to 0
simple_logtest 3 offsets: 1623531 to 1623531
simple_logtest 0 offsets: 0 to 0
simple_logtest 1 offsets: 0 to 0
simple_logtest 2 offsets: 0 to 0
simple_logtest 4 offsets: 0 to 0
simple_logtest 3 offsets: 1623531 to 1623531
{noformat}

The producer is posting messages evenly into each partition (earliest offsets queried with --time -2, latest with --time -1):

{noformat}
devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -2
simple_logtest:2:0
simple_logtest:4:0
simple_logtest:1:0
simple_logtest:3:0
simple_logtest:0:0
devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -1
simple_logtest:2:722964
simple_logtest:4:722864
simple_logtest:1:722957
simple_logtest:3:722960
simple_logtest:0:723021
{noformat}
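As background on the exactly-once requirement mentioned above: with enable.auto.commit=false, offsets have to be recorded explicitly once a batch's output is durable. A minimal illustrative sketch (not the reporter's production code), reusing dstream from the driver above, of committing processed ranges back through the integration's CanCommitOffsets:

{code}
// Illustrative only: commit each batch's offset ranges back to Kafka
// after the output is durably stored, so a restarted stream resumes
// from the committed offsets instead of the auto.offset.reset policy.
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

dstream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch and durably store its output here ...
  dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}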
> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18506
>                 URL: https://issues.apache.org/jira/browse/SPARK-18506
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark standalone mode 2.0.2 with Kafka 0.10.1.0.
>            Reporter: Heji Kim
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)