Cody,

Thank you. Let me see if I can reproduce this. We're not seeing offsets load correctly on startup, but perhaps there is an error on my side.
Bryan

Get Outlook for Android<https://aka.ms/ghei36>

________________________________
From: Cody Koeninger <c...@koeninger.org>
Sent: Thursday, June 14, 2018 5:01:01 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

Offsets are loaded when you instantiate an
org.apache.kafka.clients.consumer.KafkaConsumer, subscribe, and poll.
There's not an explicit api for it. Have you looked at the output of
kafka-consumer-groups.sh and tried the example code I linked to?

bash-3.2$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group commitexample --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'commitexample' has no active members.

TOPIC   PARTITION   CURRENT-OFFSET   LOG-END-OFFSET   LAG   CONSUMER-ID   HOST   CLIENT-ID
test    0           1056             1656             600   -             -      -

scala> val c = new KafkaConsumer[String, String](kafkaParams.asJava)
c: org.apache.kafka.clients.consumer.KafkaConsumer[String,String] = org.apache.kafka.clients.consumer.KafkaConsumer@780cbdf8

scala> c.subscribe(java.util.Arrays.asList("test"))

scala> c.poll(0)

scala> c.position(new TopicPartition("test", 0))
res4: Long = 1056

On Thu, Jun 14, 2018 at 3:33 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Cody,
>
> Where is that called in the driver? The only call I see from Subscribe is
> to load the offset from checkpoint.
>
> Get Outlook for Android
>
> ________________________________
> From: Cody Koeninger <c...@koeninger.org>
> Sent: Thursday, June 14, 2018 4:24:58 PM
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The code that loads offsets from kafka is in e.g.
> org.apache.kafka.clients.consumer, it's not in spark.
>
> On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>> Cody,
>>
>> Can you point me to the code that loads offsets? As far as I can see with
>> Spark 2.1, the only offset load is from checkpoint.
>>
>> Thank you!
>>
>> Bryan
>>
>> Get Outlook for Android
>>
>> ________________________________
>> From: Cody Koeninger <c...@koeninger.org>
>> Sent: Thursday, June 14, 2018 4:00:31 PM
>> To: Bryan Jeffrey
>> Cc: user
>> Subject: Re: Kafka Offset Storage: Fetching Offsets
>>
>> The expectation is that you shouldn't have to manually load offsets
>> from kafka, because the underlying kafka consumer on the driver will
>> start at the offsets associated with the given group id.
>>
>> That's the behavior I see with this example:
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>>
>> What does bin/kafka-consumer-groups.sh show for your group id?
>>
>> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
>> wrote:
>>> Hello.
>>>
>>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface. Based
>>> on the documentation
>>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>>> it appears that you can now use Kafka itself to store offsets.
>>>
>>> I've set up a simple Kafka DStream:
>>>
>>> val kafkaParameters = Map[String, String](
>>>   "metadata.broker.list" -> brokers,
>>>   "auto.offset.reset" -> "latest",
>>>   "enable.auto.commit" -> false.toString,
>>>   "key.deserializer" ->
>>>     classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>>   "partition.assignment.strategy" ->
>>>     classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>>   "group.id" -> applicationName
>>> )
>>>
>>> val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
>>> KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferConsistent,
>>>   consumerStrategy = consumerStrategy)
>>>
>>> I then commit the offsets:
>>>
>>> var offsets: Array[OffsetRange] = Array()
>>> stream.foreachRDD(rdd => {
>>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>>> })
>>>
>>> // Future: Move this after we've done processing.
>>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>>
>>> The offsets appear to commit successfully. However, on restart the
>>> streaming application consistently starts from latest whenever the Spark
>>> checkpoint is changed. Drilling into the code it does not appear that
>>> re-loading offset data is supported in the Spark Streaming Kafka library.
>>> How is this expected to work? Is there an example of saving the offsets
>>> to Kafka and then loading them from Kafka?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
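
For reference, below is a minimal sketch of the commit-after-processing pattern that the linked CommitAsync.scala example and the thread describe: obtain the offset ranges inside foreachRDD, process the batch, then call commitAsync on the same ranges, so that a restarted application's driver consumer resumes from the offsets stored in Kafka under the group id. The application name, broker list, topic, and processing step here are placeholders, not code from the thread.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object CommitAfterProcessingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("offset-commit-sketch")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Placeholder broker list and group id for illustration.
    val brokers = "localhost:9092"
    val groupId = "commitexample"

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      // Only used when no committed offset exists for the group.
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("test"), kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Capture the offset ranges for this batch.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process the batch here ...
      rdd.foreach(record => println(record.value))

      // Commit after processing; offsets are stored in Kafka under group.id,
      // so the next run of the application starts from them without relying
      // on the Spark checkpoint.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}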