Hey Neil

No worries! Happy to help you write it if you want, just link me to the
repo and we can write it together

Would be fun!


Regards
Sam
On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <neil.v.maheshw...@gmail.com>
wrote:

> Thanks for the advice Sam. I will look into implementing a structured
> streaming connector.
>
> On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> HI Niel,
>
> My advice would be to write a structured streaming connector. The new
> structured streaming APIs were brought in to handle exactly the issues you
> describe
>
> See this blog
> <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html>
>
> There isnt a structured streaming connector as of yet, but you can easily
> write one that uses the underlying batch methods to read/write to Kinesis
>
> Have a look at how I wrote my bigquery connector here
> <http://github.com/samelamin/spark-bigquery>. Plus the best thing is we
> get a new connector to a highly used datasource/sink
>
> Hope that helps
>
> Regards
> Sam
>
> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
> Thanks for your response Ayan.
>
> This could be an option. One complication I see with that approach is that
> I do not want to miss any records that are between the data we have batched
> to the data store and the checkpoint. I would still need a mechanism for
> recording the sequence number of the last time the data was batched, so I
> could start the streaming application after that sequence number.
>
> A similar approach could be to batch our data periodically, recording the
> last sequence number of the batch. Then, fetch data from Kinesis using the
> low level API to read data from the latest sequence number of the batched
> data up until the sequence number of the latest checkpoint from our spark
> app. I could merge batched dataset and the dataset fetched from Kinesis’s
> lower level API, and use that dataset as an RDD to prep the job.
>
> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> AFAIK, Kinesis does not provide any mechanism other than check point to
> restart. That makes sense as it makes it so generic.
>
> Question: why cant you warm up your data from a data store? Say every 30
> mins you run a job to aggregate your data to a data store for that hour.
> When you restart the streaming app it would read from dynamo check point,
> but it would also preps an initial rdd from data store?
>
> Best
> Ayan
> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
> Hello,
>
> I am building a Spark streaming application that ingests data from an
> Amazon Kinesis stream. My application keeps track of the minimum price over
> a window for groups of similar tickets. When I deploy the application, I
> would like it to start processing at the start of the previous hours data.
> This will warm up the state of the application and allow us to deploy our
> application faster. For example, if I start the application at 3 PM, I
> would like to process the data retained by Kinesis from 2PM to 3PM, and
> then continue receiving data going forward. Spark Streaming’s Kinesis
> receiver, which relies on the Amazon Kinesis Client Library, seems to give
> me three options for choosing where to read from the stream:
>
>    - read from the latest checkpointed sequence number in Dynamo
>    - start from the oldest record in the stream (TRIM_HORIZON shard
>    iterator type)
>    - start from the most recent record in the stream (LATEST shard
>    iterator type)
>
>
> Do you have any suggestions on how we could start our application at a
> specific timestamp or sequence number in the Kinesis stream? Some ideas I
> had were:
>
>    - Create a KCL application that fetches the previous hour data and
>    writes it to HDFS. We can create an RDD from that dataset and initialize
>    our Spark Streaming job with it. The spark streaming job’s Kinesis receiver
>    can have the same name as the initial KCL application, and use that
>    applications checkpoint as the starting point. We’re writing our spark jobs
>    in Python, so this would require launching the java MultiLang daemon, or
>    writing that portion of the application in Java/Scala.
>    - Before the Spark streaming application starts, we could fetch a
>    shard iterator using the AT_TIMESTAMP shard iterator type. We could record
>    the sequence number of the first record returned by this iterator, and
>    create an entry in Dynamo for our application for that sequence number. Our
>    Kinesis receiver would pick up from this checkpoint. It makes me a little
>    nervous that we would be faking Kinesis Client Library's protocol by
>    writing a checkpoint into Dynamo
>
>
> Thanks in advance!
>
> Neil
>
> --
> Best Regards,
> Ayan Guha
>
>
>
>

Reply via email to