Hi Neil,

My advice would be to write a structured streaming connector. The new
structured streaming APIs were brought in to handle exactly the issues you
describe.

See this blog
<https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html>

There isn't a structured streaming connector for Kinesis as of yet, but you
can easily write one that uses the underlying batch methods to read from and
write to it.
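
As a rough sketch of the kind of batch read such a connector could wrap, the
snippet below reads one shard starting at a timestamp with the AT_TIMESTAMP
shard iterator type. It assumes a boto3-style Kinesis client exposing
get_shard_iterator and get_records; the function names, the page limit and the
stop condition are illustrative choices, not part of any existing connector.

```python
from datetime import datetime, timedelta, timezone


def read_shard_from_timestamp(client, stream_name, shard_id, start_time, limit=1000):
    """Return the records in one shard that arrived at or after start_time.

    `client` is assumed to behave like boto3.client("kinesis").
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="AT_TIMESTAMP",
        Timestamp=start_time,
    )["ShardIterator"]

    records = []
    while iterator:
        response = client.get_records(ShardIterator=iterator, Limit=limit)
        records.extend(response["Records"])
        # MillisBehindLatest == 0 means we have caught up with the tip of the shard.
        if response.get("MillisBehindLatest", 0) == 0:
            break
        iterator = response.get("NextShardIterator")
    return records


def one_hour_ago():
    """Timezone-aware timestamp for 'start of the previous hour of data'."""
    return datetime.now(timezone.utc) - timedelta(hours=1)
```

With a real client this would be called per shard, for example
read_shard_from_timestamp(boto3.client("kinesis"), "my-stream", shard_id,
one_hour_ago()), where "my-stream" is a placeholder. A production version
would also need to respect the per-shard read limits and handle resharding.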

Have a look at how I wrote my BigQuery connector here
<http://github.com/samelamin/spark-bigquery>. Plus, the best thing is we get
a new connector to a highly used data source/sink.

Hope that helps

Regards
Sam

On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
neil.v.maheshw...@gmail.com> wrote:

> Thanks for your response Ayan.
>
> This could be an option. One complication I see with that approach is that
> I do not want to miss any records that are between the data we have batched
> to the data store and the checkpoint. I would still need a mechanism for
> recording the sequence number of the last time the data was batched, so I
> could start the streaming application after that sequence number.
>
> A similar approach could be to batch our data periodically, recording the
> last sequence number of the batch. Then, fetch data from Kinesis using the
> low level API to read data from the latest sequence number of the batched
> data up until the sequence number of the latest checkpoint from our spark
> app. I could merge the batched dataset and the dataset fetched from Kinesis’s
> lower level API, and use that dataset as an RDD to prep the job.
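
The gap read described above could be sketched roughly as follows. This
assumes a boto3-style Kinesis client, and that sequence numbers within a shard
are decimal strings that can be compared as integers; the function and
parameter names are hypothetical.

```python
def read_between_sequence_numbers(client, stream_name, shard_id,
                                  after_seq, until_seq, limit=1000):
    """Return records after `after_seq` up to and including `until_seq`.

    `after_seq` is the last sequence number already covered by the batch;
    `until_seq` is the sequence number of the latest streaming checkpoint.
    `client` is assumed to behave like boto3.client("kinesis").
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="AFTER_SEQUENCE_NUMBER",
        StartingSequenceNumber=after_seq,
    )["ShardIterator"]

    upper = int(until_seq)
    gap = []
    while iterator:
        response = client.get_records(ShardIterator=iterator, Limit=limit)
        for record in response["Records"]:
            # Stop at the first record past the checkpoint.
            if int(record["SequenceNumber"]) > upper:
                return gap
            gap.append(record)
        # Nothing more to read once we reach the tip of the shard.
        if response.get("MillisBehindLatest", 0) == 0:
            break
        iterator = response.get("NextShardIterator")
    return gap
```

The returned records would then be merged with the batched dataset before
building the initial RDD. Both sequence numbers have to come from the same
shard for the comparison to be meaningful.
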
>
> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> AFAIK, Kinesis does not provide any mechanism other than the checkpoint to
> restart. That makes sense, as it keeps the mechanism generic.
>
> Question: why can't you warm up your data from a data store? Say every 30
> mins you run a job to aggregate your data to a data store for that hour.
> When you restart the streaming app, it would read from the Dynamo checkpoint,
> but it would also prep an initial RDD from the data store?
>
> Best
> Ayan
> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
>> Hello,
>>
>> I am building a Spark Streaming application that ingests data from an
>> Amazon Kinesis stream. My application keeps track of the minimum price over
>> a window for groups of similar tickets. When I deploy the application, I
>> would like it to start processing at the start of the previous hour's data.
>> This will warm up the state of the application and allow us to deploy our
>> application faster. For example, if I start the application at 3 PM, I
>> would like to process the data retained by Kinesis from 2 PM to 3 PM, and
>> then continue receiving data going forward. Spark Streaming’s Kinesis
>> receiver, which relies on the Amazon Kinesis Client Library, seems to give
>> me three options for choosing where to read from the stream:
>>
>>    - read from the latest checkpointed sequence number in Dynamo
>>    - start from the oldest record in the stream (TRIM_HORIZON shard
>>    iterator type)
>>    - start from the most recent record in the stream (LATEST shard
>>    iterator type)
>>
>>
>> Do you have any suggestions on how we could start our application at a
>> specific timestamp or sequence number in the Kinesis stream? Some ideas I
>> had were:
>>
>>    - Create a KCL application that fetches the previous hour's data and
>>    writes it to HDFS. We can create an RDD from that dataset and initialize
>>    our Spark Streaming job with it. The Spark Streaming job’s Kinesis
>>    receiver can have the same name as the initial KCL application, and use
>>    that application’s checkpoint as the starting point. We’re writing our
>>    Spark jobs in Python, so this would require launching the Java MultiLang
>>    daemon, or writing that portion of the application in Java/Scala.
>>    - Before the Spark Streaming application starts, we could fetch a
>>    shard iterator using the AT_TIMESTAMP shard iterator type. We could record
>>    the sequence number of the first record returned by this iterator, and
>>    create an entry in Dynamo for our application for that sequence number.
>>    Our Kinesis receiver would pick up from this checkpoint. It makes me a
>>    little nervous that we would be faking Kinesis Client Library's protocol
>>    by writing a checkpoint into Dynamo.
>>
>>
>> Thanks in advance!
>>
>> Neil
>>
> --
> Best Regards,
> Ayan Guha
>
>
>
