Hello.

I am using Spark 2.1 and Kafka 0.10.2.1 with the DStream interface.  Based
on the documentation
(https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
it appears that you can now use Kafka itself to store consumer offsets.

I've set up a simple Kafka DStream:
val kafkaParameters = Map[String, String](
  "metadata.broker.list" -> brokers,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> false.toString,
  "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
  "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
  "bootstrap.servers" -> brokersToUse.mkString(","),
  "group.id" -> applicationName
)

val consumerStrategy =
  ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
val stream = KafkaUtils.createDirectStream(
  ssc,
  locationStrategy = LocationStrategies.PreferConsistent,
  consumerStrategy = consumerStrategy)


I then commit the offsets:

var offsets: Array[OffsetRange] = Array()
stream.foreachRDD(rdd => {
  offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  logger.info(s"Offsets: ${offsets.mkString("|")}")
})

// Future: Move this after we've done processing.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
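
For what it's worth, my reading of the integration guide's example is that
the commit belongs inside foreachRDD, after the batch's output has
completed, roughly like this (just a sketch; processBatch stands in for
whatever output logic we run per batch):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD(rdd => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  processBatch(rdd)  // placeholder for the batch's actual output work

  // Commit to Kafka only once this batch's outputs have finished.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})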

The offsets appear to commit successfully.  However, on restart the
streaming application consistently starts from the latest offset whenever
the Spark checkpoint is changed.  Drilling into the code, it does not appear
that re-loading offset data from Kafka is supported in the Spark Streaming
Kafka library.  How is this expected to work?  Is there an example of saving
the offsets to Kafka and then loading them back from Kafka on restart?
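
In case it helps frame the question, this is roughly what I imagined an
explicit "load from Kafka" would look like: read the group's committed
offsets with a plain KafkaConsumer and seed the Subscribe strategy with
them.  (My understanding is that auto.offset.reset should only apply to
partitions with no committed offset.)  This is only a sketch, reusing
brokersToUse, applicationName, topics and kafkaParameters from above and
skipping error handling; I have not verified it:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Ask Kafka for the offsets previously committed under this group.id.
val props = new Properties()
props.put("bootstrap.servers", brokersToUse.mkString(","))
props.put("group.id", applicationName)
props.put("key.deserializer", classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName)
props.put("value.deserializer", classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName)

val consumer = new KafkaConsumer[String, String](props)
val committedOffsets: Map[TopicPartition, Long] =
  topics.toSeq.flatMap { topic =>
    consumer.partitionsFor(topic).asScala.flatMap { info =>
      val tp = new TopicPartition(topic, info.partition())
      // committed() returns null when nothing has been committed for the partition.
      Option(consumer.committed(tp)).map(om => tp -> om.offset())
    }
  }.toMap
consumer.close()

// Seed the direct stream with those offsets; partitions without a committed
// offset fall back to auto.offset.reset.
val seededStrategy = ConsumerStrategies.Subscribe[String, DecodedData](
  topics.toSeq, kafkaParameters, committedOffsets)
val stream = KafkaUtils.createDirectStream(
  ssc, LocationStrategies.PreferConsistent, seededStrategy)

Is something along those lines necessary, or should the committed offsets be
picked up automatically for the group?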

Regards,

Bryan Jeffrey
