Just doing a bit of research, and it seems we've been beaten to the punch.
There's already a connector you can use here:
<https://github.com/maropu/spark-kinesis-sql-asl/issues/4>

Give it a go and feel free to give the committer feedback or, better yet,
send some PRs if it needs them :)

On Sun, Feb 19, 2017 at 9:23 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Hey Neil
>
> No worries! Happy to help you write it if you want, just link me to the
> repo and we can write it together
>
> Would be fun!
>
>
> Regards
> Sam
> On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <neil.v.maheshw...@gmail.com>
> wrote:
>
>> Thanks for the advice Sam. I will look into implementing a structured
>> streaming connector.
>>
>> On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>
>> Hi Neil,
>>
>> My advice would be to write a Structured Streaming connector. The new
>> Structured Streaming APIs were brought in to handle exactly the issues you
>> describe.
>>
>> See this blog
>> <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html>
>>
>> There isn't a Structured Streaming connector as of yet, but you can easily
>> write one that uses the underlying batch methods to read/write to Kinesis.
>>
>> Have a look at how I wrote my BigQuery connector here
>> <http://github.com/samelamin/spark-bigquery>. Plus, the best thing is we
>> get a new connector to a highly used data source/sink.
>>
>> Hope that helps
>>
>> Regards
>> Sam
>>
>> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
>> neil.v.maheshw...@gmail.com> wrote:
>>
>> Thanks for your response Ayan.
>>
>> This could be an option. One complication I see with that approach is
>> that I do not want to miss any records that are between the data we have
>> batched to the data store and the checkpoint. I would still need a
>> mechanism for recording the sequence number of the last time the data was
>> batched, so I could start the streaming application after that sequence
>> number.
>>
>> A similar approach could be to batch our data periodically, recording the
>> last sequence number of the batch. Then, fetch data from Kinesis using the
>> low level API to read data from the latest sequence number of the batched
>> data up until the sequence number of the latest checkpoint from our spark
>> app. I could merge the batched dataset and the dataset fetched from
>> Kinesis’s lower level API, and use that dataset as an RDD to prep the job.
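
A rough sketch of the "read from the last batched sequence number up to the
checkpoint" step described above. It assumes `client` is a boto3 Kinesis
client (`boto3.client("kinesis")`) and that sequence numbers within a single
shard can be compared as increasing integers. The name `read_between` and its
parameters are made up for illustration; it handles one shard only and does no
throttling or retries.

```python
def read_between(client, stream_name, shard_id, after_seq, up_to_seq, batch_size=1000):
    """Yield records with after_seq < SequenceNumber <= up_to_seq from one shard.

    `client` is assumed to be a boto3 Kinesis client; the function name and
    parameters are hypothetical.
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="AFTER_SEQUENCE_NUMBER",
        StartingSequenceNumber=after_seq,
    )["ShardIterator"]
    # Sequence numbers arrive as decimal strings; compare them as integers.
    upper = int(up_to_seq)

    while iterator is not None:
        response = client.get_records(ShardIterator=iterator, Limit=batch_size)
        for record in response["Records"]:
            if int(record["SequenceNumber"]) > upper:
                return  # passed the checkpointed sequence number
            yield record
        # An empty batch is not necessarily the end; stop only once we have
        # caught up with the tip of the shard.
        if not response["Records"] and response.get("MillisBehindLatest", 0) == 0:
            return
        iterator = response.get("NextShardIterator")
```

The records it yields could then be combined with the batched dataset before
building the initial RDD. A real version would also need to pace its calls to
stay within the GetRecords limits and repeat this for every shard.
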
>>
>> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>> Hi
>>
>> AFAIK, Kinesis does not provide any mechanism other than the checkpoint to
>> restart. That makes sense, as it keeps it so generic.
>>
>> Question: why can't you warm up your data from a data store? Say every 30
>> mins you run a job to aggregate your data to a data store for that hour.
>> When you restart the streaming app, it would read from the Dynamo
>> checkpoint, but it would also prep an initial RDD from the data store.
>>
>> Best
>> Ayan
>> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
>> neil.v.maheshw...@gmail.com> wrote:
>>
>> Hello,
>>
>> I am building a Spark streaming application that ingests data from an
>> Amazon Kinesis stream. My application keeps track of the minimum price over
>> a window for groups of similar tickets. When I deploy the application, I
>> would like it to start processing at the start of the previous hour's data.
>> This will warm up the state of the application and allow us to deploy our
>> application faster. For example, if I start the application at 3 PM, I
>> would like to process the data retained by Kinesis from 2PM to 3PM, and
>> then continue receiving data going forward. Spark Streaming’s Kinesis
>> receiver, which relies on the Amazon Kinesis Client Library, seems to give
>> me three options for choosing where to read from the stream:
>>
>>    - read from the latest checkpointed sequence number in Dynamo
>>    - start from the oldest record in the stream (TRIM_HORIZON shard
>>    iterator type)
>>    - start from the most recent record in the stream (LATEST shard
>>    iterator type)
>>
>>
>> Do you have any suggestions on how we could start our application at a
>> specific timestamp or sequence number in the Kinesis stream? Some ideas I
>> had were:
>>
>>    - Create a KCL application that fetches the previous hour's data and
>>    writes it to HDFS. We can create an RDD from that dataset and initialize
>>    our Spark Streaming job with it. The Spark Streaming job’s Kinesis
>>    receiver can have the same name as the initial KCL application, and use
>>    that application's checkpoint as the starting point. We’re writing our
>>    Spark jobs in Python, so this would require launching the Java MultiLang
>>    daemon, or writing that portion of the application in Java/Scala.
>>    - Before the Spark Streaming application starts, we could fetch a
>>    shard iterator using the AT_TIMESTAMP shard iterator type. We could record
>>    the sequence number of the first record returned by this iterator, and
>>    create an entry in Dynamo for our application for that sequence number.
>>    Our Kinesis receiver would pick up from this checkpoint. It makes me a
>>    little nervous that we would be faking Kinesis Client Library's protocol
>>    by writing a checkpoint into Dynamo.
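
For the AT_TIMESTAMP idea in the last bullet, here is a minimal sketch of the
"find the first sequence number at a given time" part. It assumes `client` is
a boto3 Kinesis client (`boto3.client("kinesis")`); the function name
`first_sequence_number_at` and the `max_polls` parameter are made up for
illustration. It deliberately stops short of writing anything into the KCL's
Dynamo table, since that table's layout is internal to the KCL.

```python
def first_sequence_number_at(client, stream_name, shard_id, start_time, max_polls=20):
    """Return the sequence number of the first record at or after start_time.

    Returns None if no such record is found. `client` is assumed to be a
    boto3 Kinesis client; the function name and max_polls are hypothetical.
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="AT_TIMESTAMP",
        Timestamp=start_time,
    )["ShardIterator"]

    # GetRecords can return an empty batch even when more data follows, so
    # keep polling until a record shows up or we reach the tip of the shard.
    for _ in range(max_polls):
        if iterator is None:  # shard was closed and fully read
            return None
        response = client.get_records(ShardIterator=iterator, Limit=1)
        if response["Records"]:
            return response["Records"][0]["SequenceNumber"]
        if response.get("MillisBehindLatest", 0) == 0:
            return None
        iterator = response.get("NextShardIterator")
    return None
```

With a real client this would be called once per shard, passing a `datetime`
one hour in the past as `start_time`. What to do with the resulting sequence
numbers (for example, seeding a checkpoint) is the risky part and is left out
here.
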
>>
>>
>> Thanks in advance!
>>
>> Neil
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>>
