Re: Zero Data Loss in Spark with Kafka

2016-10-26 Thread Cody Koeninger
Honestly, I would stay far away from saving offsets in Zookeeper if at
all possible. It's better to store them alongside your results.
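For example, a minimal sketch of that idea (not code from this thread; saveWithOffsets and the write callback are made-up names):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// The cast only works on the RDD handed directly to foreachRDD, before any
// transformations, because only the underlying KafkaRDD carries the ranges.
def saveWithOffsets[T](rdd: RDD[T], outputRoot: String)(write: String => Unit): Unit = {
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Bake the consumed ranges into the output directory name so results and
  // offsets are committed together; on restart, the newest directory name
  // tells you where to resume.
  val suffix = ranges
    .map(r => s"${r.topic}-${r.partition}-${r.fromOffset}-${r.untilOffset}")
    .mkString("_")
  write(outputRoot + "/" + suffix)
}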

Re: Zero Data Loss in Spark with Kafka

2016-10-26 Thread Sunita Arvind
This is enough to get it to work:

df.save(conf.getString("ParquetOutputPath") + offsetSaved, "parquet", SaveMode.Overwrite)

And tests so far (in local env) seem good with the edits. Yet to test
on the cluster. Cody, appreciate your thoughts on the edits.

Just want to make sure I am not overdoing it or overlooking a potential issue.

regards

Sunita

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
The error in the file I just shared is here:

val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition._2(0)

This was previously just "partition", and since iterating the map returned by ZkUtils.getPartitionsForTopics yields (topic, Seq[partitionId]) entries, the path was built from the whole entry and fetching the offset failed.

Still testing. Somehow, Cody, your code never led to "file already exists" sorts of errors. (I am saving the output of the DStream as a Parquet file after converting it to a DataFrame; the batch interval will be 2 hrs.)

The code in the main is here:

val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
  conf.getString("groupId"), conf.getString("topics"))
val storedOffsets = offsetsStore.readOffsets()
LogHandler.log.info("Fetched the offsets from ZooKeeper")

val kafkaArr = storedOffsets match {
  case None =>
    // no saved offsets yet: start from the initial offsets
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaProps, Set(topics))

  case Some(fromOffsets) =>
    // start from the previously saved offsets
    val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
      (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,
      (String, Array[Byte])](ssc, kafkaProps, fromOffsets, messageHandler)

    // KafkaUtils.createRDD[String, Row, StringDecoder, ProtobufMessage,
    //   (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
}

kafkaArr.foreachRDD { (rdd, time) =>
  // convert each Avro payload to a Row using the Avro-derived schema
  val schema = SchemaConverters.toSqlType(BeaconAvroData.getClassSchema)
    .dataType.asInstanceOf[StructType]
  val ardd: RDD[Row] = rdd.mapPartitions { itr =>
    itr.map(r => Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
  }
  val df = sql.createDataFrame(ardd, schema)
  LogHandler.log.info("Created dataframe")
  // save the offsets to ZooKeeper, then write the batch as a Parquet file
  // whose name embeds the saved offset range
  val offsetSaved = offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
  LogHandler.log.info("Saved offsets to ZooKeeper")
  df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetSaved)
  LogHandler.log.info("Created the parquet file")
}
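One thing worth flagging in this flow, as an observation rather than a tested fix: the offsets are committed to ZooKeeper before the Parquet write, so if saveAsParquetFile fails, that batch is skipped on restart (at-most-once rather than zero loss). A reordered sketch, where toDataFrame and suffixFor stand in for the Avro conversion and offset-string logic above:

kafkaArr.foreachRDD { (rdd, time) =>
  val df = toDataFrame(rdd) // Avro -> Row -> DataFrame, as above
  // write the results first; the offset-derived directory name plus
  // SaveMode.Overwrite (adopted later in this thread) keeps a re-run of
  // the same batch idempotent
  df.save(conf.getString("ParquetOutputPath") + suffixFor(rdd), "parquet", SaveMode.Overwrite)
  // only record progress once the data is safely written; a crash between
  // the two steps re-processes the batch instead of losing it
  offsetsStore.saveOffsets(topics, rdd)
}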

Thanks

Sunita

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Attached is the edited code. Am I heading in the right direction? Also, I am
missing something due to which it seems to work well while the application is
running and the files are created correctly, but as soon as I restart the
application it goes back to a fromOffset of 0. Any thoughts?
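A couple of things worth checking, going only by the getOffsets and readOffsets code later in this thread (a guess, not a tested diagnosis): getOffsets silently substitutes 0L whenever the ZooKeeper node it reads is missing, and it returns offsets.mkString(","), which renders map entries as "key -> value" rather than the "partition:offset" pairs readOffsets parses. Either would make a restart fall back to 0. A quick sketch to inspect what is actually stored at the paths being read, reusing the zkClient, groupID, and topic from that code:

val topicDirs = new ZKGroupTopicDirs(groupID, topic)
for ((_, partitionIds) <- ZkUtils.getPartitionsForTopics(zkClient, List(topic));
     partitionId <- partitionIds) {
  val path = topicDirs.consumerOffsetDir + "/" + partitionId
  // prints None if saveOffsets never wrote here, i.e. the read and write
  // paths disagree
  println(path + " -> " + ZkUtils.readDataMaybeNull(zkClient, path)._1)
}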

regards
Sunita

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Thanks for confirming, Cody.
To get the library to work, I had to do:

val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
  "/consumers/topics/" + topics + "/0")

It worked well. However, I had to specify the partitionId in the zkPath.
If I want the library to pick up all the partitions for a topic without my
specifying the path, is that possible out of the box, or do I need to tweak it?

regards
Sunita

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Cody Koeninger
You are correct that you shouldn't have to worry about broker id.

I'm honestly not sure specifically what else you are asking at this point.

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Just re-read the Kafka architecture. Something that slipped my mind is that it
is leader based, so a topic/partitionId pair will be the same across all the
brokers. So we do not need to consider brokerId while storing offsets. Still
exploring the rest of the items.
regards
Sunita

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Hello Experts,

I am trying to use the saving-to-ZK design. I just saw Sudhir's comments that
it is an old approach. Any reasons for that? Any issues observed with saving
to ZK? The way we are planning to use it is:
1. Following http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
2. Saving to the same file with the offsetRange as a part of the file name. We hope
that there are no partial writes / overwriting is possible and offsetRanges

However, I have the below doubts, which I couldn't figure out from the code here:
https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
1. The brokerId is not part of the OffsetRange. How will just
partitionId:fromOffset stay unique in a cluster with multiple brokers and
multiple partitions per topic?
2. Do we have to specify the zkPath to include the partitionId? I tried using
the ZooKeeperOffsetsStore as is, and it required me to specify the
partitionId:

val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
  "/consumers/topics/" + topics + "/0")

For our use cases it is too limiting to include the partitionId in the path.
To get it to work by automatically detecting the existing partitions for a
given topic, I changed it as below (inspired by
http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):

/**
  * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
  * groupID consumer group to get offsets for
  * topic topic to get offsets for
  * return - mapping of (topic and) partition to offset
  */
private def getOffsets(groupID: String, topic: String): Option[String] = {
  val topicDirs = new ZKGroupTopicDirs(groupID, topic)
  val offsets = new mutable.HashMap[TopicAndPartition, Long]()
  val topicSeq = List(topic).toSeq
  // getPartitionsForTopics returns a map of topic -> Seq[partitionId]
  val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
  for (partition <- partitions) {
    // NOTE: "partition" is a (topic, Seq[partitionId]) entry here, not a
    // partition id -- this is the line corrected to partition._2(0) upthread
    val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition
    val maybeOffset: Option[String] = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
    // default to offset 0 when the node does not exist
    val offset: Long = if (maybeOffset.isDefined) maybeOffset.get.toLong else 0L
    val topicAndPartition: TopicAndPartition =
      new TopicAndPartition(topic, Integer.parseInt(partition.toString))
    offsets.put(topicAndPartition, offset)
  }
  Option(offsets.mkString(","))
}
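For what it is worth, a sketch of a loop that walks every partition id instead of stringifying the map entry (untested; it assumes the same zkClient, topicDirs, and offsets map as above). ZkUtils.getPartitionsForTopics returns topic -> Seq[partitionId], so destructuring each entry gives the ids directly:

for ((_, partitionIds) <- ZkUtils.getPartitionsForTopics(zkClient, List(topic));
     partitionId <- partitionIds) {
  val partitionOffsetPath = topicDirs.consumerOffsetDir + "/" + partitionId
  val maybeOffset = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
  // default to 0 only when no offset node exists for this partition
  offsets.put(TopicAndPartition(topic, partitionId),
    maybeOffset.map(_.toLong).getOrElse(0L))
}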

// Read the previously saved offsets from Zookeeper
override def readOffsets: Option[Map[TopicAndPartition, Long]] = {

  LogHandler.log.info("Reading offsets from ZooKeeper")

  val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
  val start = System.currentTimeMillis()
  offsetsRangesStrOpt match {
    case Some(offsetsRangesStr) =>
      LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")

      val offsets = offsetsRangesStr.split(",")
        .map(s => s.split(":"))
        .map { case Array(partitionStr, offsetStr) =>
          (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
        .toMap

      LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
        (System.currentTimeMillis() - start))

      Some(offsets)
    case None =>
      LogHandler.log.info("No offsets found in ZooKeeper. Took " +
        (System.currentTimeMillis() - start))
      None
  }

}
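One caveat in the two snippets above (my reading, not a tested claim): readOffsets expects comma-separated "partition:offset" pairs, but a Scala map's mkString renders entries as "key -> value", so the string getOffsets returns will not round-trip through this parser. A sketch of emitting the expected format instead, using the same offsets map:

Option(offsets.map { case (tp, offset) => s"${tp.partition}:$offset" }.mkString(","))
  .filter(_.nonEmpty) // avoid Some("") when no offsets were found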

However, I am concerned whether saveOffsets will work well with this approach.
That's when I realized we are not considering brokerIds when storing offsets,
and the OffsetRanges probably do not have them either: they can only provide
the topic, partition, and from/until offsets.

I am probably missing something very basic. Probably the library works well
by itself. Can someone / Cody explain?

Cody, thanks a lot for sharing your work.

regards
Sunita

Re: Zero Data Loss in Spark with Kafka

2016-08-23 Thread Cody Koeninger
See
https://github.com/koeninger/kafka-exactly-once

Re: Zero Data Loss in Spark with Kafka

2016-08-23 Thread Sudhir Babu Pothineni
Saving offsets to ZooKeeper is the old approach; checkpointing internally
saves the offsets to HDFS (the checkpoint location).

More details here:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
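A minimal sketch of the checkpoint-based approach (checkpointDir and sparkConf are placeholders): the whole streaming context, including the offsets in flight, is rebuilt from the checkpoint on restart, so nothing is managed by hand.

import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/app-checkpoint" // placeholder path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // build the Kafka direct stream and the rest of the job here
  ssc
}

// a clean start calls createContext; after a failure the context (and its
// offsets) is recovered from the checkpoint data instead
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

The usual caveat is that checkpoints generally do not survive an upgrade of the application code, which is one reason people still persist offsets themselves.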

Zero Data Loss in Spark with Kafka

2016-08-23 Thread KhajaAsmath Mohammed
Hi Experts,

I am looking for some information on how to achieve zero data loss while
working with Kafka and Spark. I have searched online, and the blogs have
different answers. Please let me know if anyone has an idea on this.

Blog 1:
https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html


Blog 2:
http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html


Blog one simply says a configuration change with a checkpoint directory, and
blog 2 gives details on how to save offsets to ZooKeeper. Can you please help
me out with the right approach?

Thanks,
Asmath