Cody,

Thank you. Let me see if I can reproduce this. We're not seeing offsets load 
correctly on startup - but perhaps there is an error on my side.

Bryan

________________________________
From: Cody Koeninger <c...@koeninger.org>
Sent: Thursday, June 14, 2018 5:01:01 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Kafka Offset Storage: Fetching Offsets

Offsets are loaded when you instantiate an
org.apache.kafka.clients.consumer.KafkaConsumer, subscribe, and poll.
There's no explicit API for it.  Have you looked at the output of
kafka-consumer-groups.sh and tried the example code I linked to?


bash-3.2$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group commitexample --describe
Note: This will only show information about consumers that use the
Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'commitexample' has no active members.
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   0          1056            1656            600  -            -     -
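
(LAG is LOG-END-OFFSET minus CURRENT-OFFSET, 1656 - 1056 = 600; the group's
committed position is 1056, which is what position() returns in the session
below.)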


scala> val c = new KafkaConsumer[String, String](kafkaParams.asJava)
c: org.apache.kafka.clients.consumer.KafkaConsumer[String,String] = org.apache.kafka.clients.consumer.KafkaConsumer@780cbdf8
scala> c.subscribe(java.util.Arrays.asList("test"))
scala> c.poll(0)
scala> c.position(new TopicPartition("test", 0))
res4: Long = 1056
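
(For reference, the session above assumes setup along these lines; the broker
and group.id here just mirror the kafka-consumer-groups.sh output, and any
params will do as long as group.id is the one the offsets were committed
under:)

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "group.id" -> "commitexample",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
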
On Thu, Jun 14, 2018 at 3:33 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Cody,
>
> Where is that called in the driver? The only call I see from Subscribe is to
> load the offset from checkpoint.
>
> ________________________________
> From: Cody Koeninger <c...@koeninger.org>
> Sent: Thursday, June 14, 2018 4:24:58 PM
>
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The code that loads offsets from kafka is in e.g.
> org.apache.kafka.clients.consumer, it's not in spark.
>
> On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>> Cody,
>>
>> Can you point me to the code that loads offsets? As far as I can see with
>> Spark 2.1, the only offset load is from checkpoint.
>>
>> Thank you!
>>
>> Bryan
>>
>> ________________________________
>> From: Cody Koeninger <c...@koeninger.org>
>> Sent: Thursday, June 14, 2018 4:00:31 PM
>> To: Bryan Jeffrey
>> Cc: user
>> Subject: Re: Kafka Offset Storage: Fetching Offsets
>>
>> The expectation is that you shouldn't have to manually load offsets
>> from kafka, because the underlying kafka consumer on the driver will
>> start at the offsets associated with the given group id.
>>
>> That's the behavior I see with this example:
>>
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
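>>
>> The gist of that example (a sketch, not the full file) is to commit the
>> offset ranges for each batch only after processing it:
>>
>> stream.foreachRDD { rdd =>
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   // ... process the rdd ...
>>   // commit after processing, so only finished work is recorded
>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> }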
>>
>> What does bin/kafka-consumer-groups.sh show for your group id?
>>
>> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
>> wrote:
>>> Hello.
>>>
>>> I am using Spark 2.1 and Kafka 0.10.2.1 with the DStream interface.
>>> Based on the documentation
>>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>>> it appears that you can now use Kafka itself to store offsets.
>>>
>>> I've set up a simple Kafka DStream:
>>> val kafkaParameters = Map[String, String](
>>>   "metadata.broker.list" -> brokers,
>>>   "auto.offset.reset" -> "latest",
>>>   "enable.auto.commit" -> false.toString,
>>>   "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>>   "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>>   "group.id" -> applicationName
>>> )
>>>
>>> val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
>>> val stream = KafkaUtils.createDirectStream(ssc,
>>>   locationStrategy = LocationStrategies.PreferConsistent,
>>>   consumerStrategy = consumerStrategy)
>>>
>>>
>>> I then commit the offsets:
>>>
>>> var offsets: Array[OffsetRange] = Array()
>>> stream.foreachRDD(rdd => {
>>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>>> })
>>>
>>> // Future: Move this after we've done processing.
>>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>>
>>> The offsets appear to commit successfully.  However, on restart, the
>>> streaming application consistently starts from latest whenever the Spark
>>> checkpoint is changed.  Drilling into the code, it does not appear that
>>> re-loading offset data is supported in the Spark Streaming Kafka library.
>>> How is this expected to work?  Is there an example of saving the offsets
>>> to Kafka and then loading them from Kafka?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
