Cody,

Can you point me to the code that loads offsets? As far as I can see with Spark
2.1, the only place offsets are loaded from is the checkpoint.

Thank you!

Bryan


________________________________
From: Cody Koeninger <c...@koeninger.org>
Sent: Thursday, June 14, 2018 4:00:31 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

The expectation is that you shouldn't have to manually load offsets
from Kafka, because the underlying Kafka consumer on the driver will
start at the offsets associated with the given group id.

That's the behavior I see with this example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
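
In case it helps, the core of that example boils down to roughly the
following (a rough sketch, not a verbatim copy of the file; ssc, topics,
and kafkaParams stand in for your own setup):

import org.apache.spark.streaming.kafka010._

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // capture this batch's offset ranges before processing
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process rdd here ...

  // commit from inside foreachRDD, after processing, so each batch
  // commits its own offsets rather than whatever was captured at setup
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

On restart with the same group.id, the consumer resumes from the
committed offsets; auto.offset.reset only kicks in when the group has
no committed offset for a partition.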

What does bin/kafka-consumer-groups.sh show for your group id?
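
For example, something along these lines, with your broker in place of
localhost:9092 (exact flags vary by Kafka version; on 0.10.x you may
also need --new-consumer):

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group <your-group-id>

If the describe output shows a CURRENT-OFFSET for your partitions, the
commits are landing.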

On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Hello.
>
> I am using Spark 2.1 with Kafka 0.10.2.1 and the DStream interface.  Based on
> the documentation
> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
> it appears that you can now use Kafka itself to store offsets.
>
> I've set up a simple Kafka DStream:
> val kafkaParameters = Map[String, String](
>   "metadata.broker.list" -> brokers,
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> false.toString,
>   "key.deserializer" ->
> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>   "partition.assignment.strategy" ->
> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>   "bootstrap.servers" -> brokersToUse.mkString(","),
>   "group.id" -> applicationName
> )
>
> val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
> KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>
>
> I then commit the offsets:
>
> var offsets: Array[OffsetRange] = Array()
> stream.foreachRDD(rdd => {
>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   logger.info(s"Offsets: ${offsets.mkString("|")}")
> })
>
> // Future: Move this after we've done processing.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>
> The offsets appear to commit successfully. However, on restart the streaming
> application consistently starts from latest whenever the Spark checkpoint is
> changed.  Drilling into the code, it does not appear that re-loading offset
> data is supported in the Spark Streaming Kafka library.  How is this
> expected to work?  Is there an example of saving the offsets to Kafka and
> then loading them from Kafka?
>
> Regards,
>
> Bryan Jeffrey
