The code that loads offsets from Kafka lives in the Kafka client library itself (e.g. org.apache.kafka.clients.consumer); it's not in Spark.
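If you want to see what's actually stored for a group, you can ask that consumer API directly. A minimal sketch against the Kafka 0.10 client; the broker address, group id, and topic name here are placeholders, not values from this thread:

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Placeholder connection settings for illustration only.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "my-group")
props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
props.put("value.deserializer", classOf[StringDeserializer].getCanonicalName)

val consumer = new KafkaConsumer[String, String](props)
// committed() returns null if the group has no committed offset
// for that partition.
val committed = consumer.committed(new TopicPartition("my-topic", 0))
println(s"Committed offset: $committed")
consumer.close()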
On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Cody,
>
> Can you point me to the code that loads offsets? As far as I can see with
> Spark 2.1, the only offset load is from checkpoint.
>
> Thank you!
>
> Bryan
>
> Get Outlook for Android
>
> ________________________________
> From: Cody Koeninger <c...@koeninger.org>
> Sent: Thursday, June 14, 2018 4:00:31 PM
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The expectation is that you shouldn't have to manually load offsets
> from Kafka, because the underlying Kafka consumer on the driver will
> start at the offsets associated with the given group id.
>
> That's the behavior I see with this example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>
> What does bin/kafka-consumer-groups.sh show for your group id?
>
> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>> Hello.
>>
>> I am using Spark 2.1 and Kafka 0.10.2.1 with the DStream interface. Based
>> on the documentation
>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>> it appears that you can now use Kafka itself to store offsets.
>>
>> I've set up a simple Kafka DStream:
>>
>> val kafkaParameters = Map[String, String](
>>   "metadata.broker.list" -> brokers,
>>   "auto.offset.reset" -> "latest",
>>   "enable.auto.commit" -> false.toString,
>>   "key.deserializer" ->
>>     classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>   "partition.assignment.strategy" ->
>>     classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>   "group.id" -> applicationName
>> )
>>
>> val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
>> KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>
>> I then commit the offsets:
>>
>> var offsets: Array[OffsetRange] = Array()
>> stream.foreachRDD(rdd => {
>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>> })
>>
>> // Future: Move this after we've done processing.
>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>
>> The offsets appear to commit successfully. However, on restart the
>> streaming application consistently starts from latest whenever the Spark
>> checkpoint is changed. Drilling into the code, it does not appear that
>> re-loading offset data is supported in the Spark Streaming Kafka library.
>> How is this expected to work? Is there an example of saving the offsets
>> to Kafka and then loading them from Kafka?
>>
>> Regards,
>>
>> Bryan Jeffrey
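For reference, the pattern in the CommitAsync example linked above commits inside foreachRDD, after the batch has been processed, rather than once at graph-construction time. A minimal sketch, assuming a stream built as in the message above (the per-record processing is a stand-in):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process the batch first, so offsets are only committed for data
  // that has actually been handled.
  rdd.foreach(record => println(record.value))

  // Queue the commit; it is sent to Kafka on a subsequent poll by the
  // consumer running on the driver.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}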