Cody,

Where is that called in the driver? The only call I see from Subscribe is to
load offsets from the checkpoint.

Get Outlook for Android<https://aka.ms/ghei36>

________________________________
From: Cody Koeninger <c...@koeninger.org>
Sent: Thursday, June 14, 2018 4:24:58 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

The code that loads offsets from kafka is in e.g.
org.apache.kafka.clients.consumer; it's not in spark.
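
(For illustration, a rough sketch of that behavior outside of Spark: a bare
KafkaConsumer configured with the same group.id resumes from the group's
committed offsets. The broker address, group id and topic below are
placeholders.)

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder broker
props.put("group.id", "my-application")           // same group.id the stream uses
props.put("enable.auto.commit", "false")
props.put("auto.offset.reset", "latest")          // only applies when no committed offset exists
props.put("key.deserializer", classOf[StringDeserializer].getCanonicalName)
props.put("value.deserializer", classOf[StringDeserializer].getCanonicalName)

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("my-topic").asJava)
// The first poll starts at the committed offset for this group, if one exists.
val records = consumer.poll(1000L)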

On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Cody,
>
> Can you point me to the code that loads offsets? As far as I can see with
> Spark 2.1, the only offset load is from checkpoint.
>
> Thank you!
>
> Bryan
>
> Get Outlook for Android
>
> ________________________________
> From: Cody Koeninger <c...@koeninger.org>
> Sent: Thursday, June 14, 2018 4:00:31 PM
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The expectation is that you shouldn't have to manually load offsets
> from kafka, because the underlying kafka consumer on the driver will
> start at the offsets associated with the given group id.
>
> That's the behavior I see with this example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>
> What does bin/kafka-consumer-groups.sh show for your group id?
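>
> For example, something along these lines (broker address is a placeholder,
> and exact flags vary a bit across Kafka versions):
>
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
>   --describe --group <your group.id>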
>
> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>> Hello.
>>
>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
>> the documentation
>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>> it appears that you can now use Kafka itself to store offsets.
>>
>> I've set up a simple Kafka DStream:
>> val kafkaParameters = Map[String, String](
>>   "metadata.broker.list" -> brokers,
>>   "auto.offset.reset" -> "latest",
>>   "enable.auto.commit" -> false.toString,
>>   "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>   "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>   "group.id" -> applicationName
>> )
>>
>> val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
>> KafkaUtils.createDirectStream(ssc,
>>   locationStrategy = LocationStrategies.PreferConsistent,
>>   consumerStrategy = consumerStrategy)
>>
>>
>> I then commit the offsets:
>>
>> var offsets: Array[OffsetRange] = Array()
>> stream.foreachRDD(rdd => {
>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>> })
>>
>> // Future: Move this after we've done processing.
>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
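>>
>> (For reference, a minimal sketch of that future shape, along the lines of the
>> Spark streaming-kafka-0-10 docs: commit from inside foreachRDD once the batch
>> has been processed, so offsets are only committed for data that was actually
>> handled. The processing step below is a placeholder.)
>>
>> stream.foreachRDD(rdd => {
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   // ... process the batch here ...
>>   // Commit only after the batch's outputs have completed.
>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> })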
>>
>> The offsets appear to commit successfully. However, on restart the streaming
>> application consistently starts from latest whenever the Spark checkpoint is
>> changed.  Drilling into the code, it does not appear that re-loading offset
>> data is supported in the Spark Streaming Kafka library.  How is this expected
>> to work?  Is there an example of saving the offsets to Kafka and then loading
>> them from Kafka?
>>
>> Regards,
>>
>> Bryan Jeffrey
