[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15684995#comment-15684995
 ] 

Heji Kim commented on SPARK-18506:
----------------------------------

Just confirming that when I use ConsumerStrategy.Assign with all partitions 
starting at 0, everything works as expected.  

[2016-11-21 22:46:15,016] INFO OFFSET: null KafkaRDD[4] at createDirectStream 
at SimpleKafkaLoggingDriverAssignedOffset.java:62 
(com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 3 1169577 1174129 
(com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 0 1169615 1174109 
(com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 1 1169561 1174125 
(com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 2 1169567 1174132 
(com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 4 1169628 1174202 
(com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)

Code below.



package com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka;

import java.util.*;

import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;

/*
   Simplest possible java driver that assigns starting offset
*/


public final class SimpleKafkaLoggingDriverAssignedOffset {
    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleKafkaLoggingDriverAssignedOffset.class);
    public static final String APP_NAME = "TEST_ASSIGNED_OFFSET";


    public static void main(final String[] args) throws Exception {
        if (args.length != 4) {
            throw new IllegalArgumentException("Driver passed in incorrect 
parameters" +
                    "Usage: SimpleKafkaLoggingDriverAssignedOffset <broker 
bootstrap servers> <topic> <groupId> <partitionSize> ");
        }
        String kafka_topic = args[1];
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers",args[0]);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", args[2]);
        // kafkaParams.put("auto.offset.reset",args[3]);
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList(kafka_topic);
        final SparkConf sparkConfiguration = new 
SparkConf().setAppName(APP_NAME+"_"+args[1]);
        int totalPartitions= Integer.valueOf(args[3]);
        // create the streaming context
        final JavaStreamingContext streamingContext = new 
JavaStreamingContext(sparkConfiguration, 
Durations.seconds(Integer.valueOf(args[3])));
        // force log
        streamingContext.ssc().sc().setLogLevel("DEBUG");

        // assign fixed topic partitions starting at 0
        final Map<TopicPartition,Long> partitionStart=new HashedMap();
        for (int i=0; i<totalPartitions; i++ ) {
            partitionStart.put(new TopicPartition(kafka_topic, i), 
Long.valueOf(0));
        }

        Assign fixedAssignment = new Assign 
(partitionStart.keySet(),kafkaParams, partitionStart);
        final JavaInputDStream<ConsumerRecord<String, String>> directStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        fixedAssignment
                );


         directStream.foreachRDD(rdd -> {
             LOGGER.info("OFFSET: " + rdd.rdd().name() + " " + rdd.toString());
             for (final OffsetRange offset : ((HasOffsetRanges) 
rdd.rdd()).offsetRanges()) {
                 LOGGER.info("OFFSET: " + offset.topic() + ' ' + 
offset.partition() + ' ' + offset.fromOffset() + ' '
                         + offset.untilOffset());
             }
        });

           // Start the streaming context and await termination
        LOGGER.info("KCP: starting SimpleKafkaLoggingDriverAssignedOffset 
Driver with master URL >{}<",
                streamingContext.sparkContext().master());
        streamingContext.start();
        LOGGER.info("KCP: spark state: {}", streamingContext.getState().name());
        streamingContext.awaitTermination();
    }
}


> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18506
>                 URL: https://issues.apache.org/jira/browse/SPARK-18506
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>            Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each)  
> instead of YARN 2.7.3. Single partition read problem persists both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs in as a cluster on his own Mac.)
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
> I've enclosed below the completely stripped down trivial test driver that 
> shows this behavior.  After spending 2 weeks trying all combinations with a 
> really stripped down driver, we think either there might be a bug in the 
> kafka spark integration or if the kafka 0.10/spark upgrade needs special 
> configuration, it should be fantastic if it was clearer in the documentation. 
> But currently we cannot upgrade.
> {code}
> package com.xxxxx.labs.analytics.diagnostics.spark.drivers
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies
> import org.apache.spark.streaming.kafka010.ConsumerStrategies
> /**
>   *
>   * This driver is only for pulling data from the stream and logging to 
> output just to isolate single partition bug
>   */
> object SimpleKafkaLoggingDriver {
>   def main(args: Array[String]) {
>     if (args.length != 4) {
>       System.err.println("Usage: SimpleTestDriver <broker bootstrap servers> 
> <topic> <groupId> <offsetReset>")
>       System.exit(1)
>     }
>     val Array(brokers, topic, groupId, offsetReset) = args
>     val preferredHosts = LocationStrategies.PreferConsistent
>     val topics = List(topic)
>     val kafkaParams = Map(
>       "bootstrap.servers" -> brokers,
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> groupId,
>       "auto.offset.reset" -> offsetReset,
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>     val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_" +topic)
>     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>     val dstream = KafkaUtils.createDirectStream[String, String](
>       streamingContext,
>       preferredHosts,
>       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>     dstream.foreachRDD { rdd =>
>       // Get the offset ranges in the RDD and log
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       for (o <- offsetRanges) {
>         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to 
> ${o.untilOffset}")
>       }
>     }
>     streamingContext.start
>     streamingContext.awaitTermination()
>   }
> }
> {code}
> {noformat}
> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = simple_test_group
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> {noformat}
> Below is the output of above driver for 5 partition topic.  Offsets always 
> remain 0 for all but a single partition in this case partition 3
> {noformat}
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531 
> {noformat}
> Producer is posting messages evenly into each partition:
> {noformat}
> devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ 
> kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> '10.102.22.11:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ 
> kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> '10.102.22.11:9092' --topic simple_logtest --time -1
> simple_logtest:2:722964
> simple_logtest:4:722864
> simple_logtest:1:722957
> simple_logtest:3:722960
> simple_logtest:0:723021
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to