Thanks for the advice, Sam. I will look into implementing a structured streaming connector.

> On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> Hi Neil,
>
> My advice would be to write a structured streaming connector. The new
> structured streaming APIs were brought in to handle exactly the issues you
> describe.
>
> See this blog
>
> There isn't a structured streaming connector as of yet, but you can easily
> write one that uses the underlying batch methods to read/write to Kinesis.
>
> Have a look at how I wrote my BigQuery connector here. Plus the best thing is
> we get a new connector to a highly used datasource/sink.
>
> Hope that helps
>
> Regards
> Sam
>
>> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari
>> <neil.v.maheshw...@gmail.com> wrote:
>> Thanks for your response, Ayan.
>>
>> This could be an option. One complication I see with that approach is that I
>> do not want to miss any records that fall between the data we have batched to
>> the data store and the checkpoint. I would still need a mechanism for
>> recording the sequence number of the last record that was batched, so I
>> could start the streaming application after that sequence number.
>>
>> A similar approach could be to batch our data periodically, recording the
>> last sequence number of the batch. Then, fetch data from Kinesis using the
>> low level API to read data from the latest sequence number of the batched
>> data up until the sequence number of the latest checkpoint from our Spark
>> app. I could merge the batched dataset and the dataset fetched from Kinesis’s
>> lower level API, and use that dataset as an RDD to prep the job.
>>
>>> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>> Hi
>>>
>>> AFAIK, Kinesis does not provide any mechanism other than the checkpoint to
>>> restart. That makes sense as it makes it so generic.
>>>
>>> Question: why can't you warm up your data from a data store? Say every 30
>>> mins you run a job to aggregate your data to a data store for that hour.
>>> When you restart the streaming app it would read from the Dynamo checkpoint,
>>> but it would also prep an initial RDD from the data store?
>>>
>>> Best
>>> Ayan
>>>> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari
>>>> <neil.v.maheshw...@gmail.com> wrote:
>>>> Hello,
>>>>
>>>> I am building a Spark Streaming application that ingests data from an
>>>> Amazon Kinesis stream. My application keeps track of the minimum price
>>>> over a window for groups of similar tickets. When I deploy the
>>>> application, I would like it to start processing at the start of the
>>>> previous hour's data. This will warm up the state of the application and
>>>> allow us to deploy our application faster. For example, if I start the
>>>> application at 3 PM, I would like to process the data retained by Kinesis
>>>> from 2 PM to 3 PM, and then continue receiving data going forward. Spark
>>>> Streaming’s Kinesis receiver, which relies on the Amazon Kinesis Client
>>>> Library, seems to give me three options for choosing where to read from
>>>> the stream:
>>>> - read from the latest checkpointed sequence number in Dynamo
>>>> - start from the oldest record in the stream (TRIM_HORIZON shard iterator
>>>>   type)
>>>> - start from the most recent record in the stream (LATEST shard iterator
>>>>   type)
>>>>
>>>> Do you have any suggestions on how we could start our application at a
>>>> specific timestamp or sequence number in the Kinesis stream? Some ideas I
>>>> had were:
>>>> - Create a KCL application that fetches the previous hour's data and
>>>>   writes it to HDFS. We can create an RDD from that dataset and initialize
>>>>   our Spark Streaming job with it. The Spark Streaming job’s Kinesis
>>>>   receiver can have the same name as the initial KCL application, and use
>>>>   that application's checkpoint as the starting point. We’re writing our
>>>>   Spark jobs in Python, so this would require launching the Java MultiLang
>>>>   daemon, or writing that portion of the application in Java/Scala.
>>>> - Before the Spark Streaming application starts, we could fetch a shard
>>>>   iterator using the AT_TIMESTAMP shard iterator type. We could record the
>>>>   sequence number of the first record returned by this iterator, and
>>>>   create an entry in Dynamo for our application for that sequence number.
>>>>   Our Kinesis receiver would pick up from this checkpoint. It makes me a
>>>>   little nervous that we would be faking the Kinesis Client Library's
>>>>   protocol by writing a checkpoint into Dynamo.
>>>>
>>>> Thanks in advance!
>>>>
>>>> Neil
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>
>
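
For reference, the AT_TIMESTAMP idea from the original message could be sketched roughly as below. This is a minimal sketch, not a tested recipe: it assumes a client that behaves like boto3's Kinesis client (list_shards, get_shard_iterator, get_records), and the names first_sequence_numbers_at, one_hour_before and max_polls are made up for illustration. It only looks up the first sequence number per shard at or after a timestamp. It deliberately does not write anything into the KCL's Dynamo table, since that step is the part the thread is unsure about.

```python
from datetime import datetime, timedelta, timezone


def one_hour_before(now=None):
    """Return the timestamp one hour before `now` (defaults to current UTC time)."""
    now = now or datetime.now(timezone.utc)
    return now - timedelta(hours=1)


def first_sequence_numbers_at(kinesis_client, stream_name, start_time, max_polls=20):
    """Map each shard ID to the sequence number of its first record at or after start_time.

    kinesis_client is assumed to behave like boto3.client("kinesis").
    Shards where no record is found within max_polls GetRecords calls map to None.
    """
    # Collect all shard IDs, following pagination tokens.
    # (StreamName must not be passed together with NextToken.)
    shard_ids = []
    response = kinesis_client.list_shards(StreamName=stream_name)
    while True:
        shard_ids.extend(shard["ShardId"] for shard in response["Shards"])
        token = response.get("NextToken")
        if not token:
            break
        response = kinesis_client.list_shards(NextToken=token)

    result = {}
    for shard_id in shard_ids:
        iterator = kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="AT_TIMESTAMP",
            Timestamp=start_time,
        )["ShardIterator"]

        sequence_number = None
        for _ in range(max_polls):
            if iterator is None:
                break  # shard is closed and has been read to its end
            batch = kinesis_client.get_records(ShardIterator=iterator, Limit=1)
            if batch["Records"]:
                sequence_number = batch["Records"][0]["SequenceNumber"]
                break
            if batch.get("MillisBehindLatest", 0) == 0:
                break  # caught up with the tip of the shard, nothing to record yet
            # An empty batch does not mean the shard is empty, so keep polling.
            iterator = batch.get("NextShardIterator")
        result[shard_id] = sequence_number
    return result
```

With a real client this would be called along the lines of first_sequence_numbers_at(boto3.client("kinesis"), "my-stream", one_hour_before()), where "my-stream" is a placeholder stream name. Passing the client in as an argument keeps the lookup easy to exercise against a stub before pointing it at a live stream.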