Hi

You need to store and capture the Max of the column you intend to use for
identifying new records (Ex: INSERTED_ON) after every successful run of
your job. Then, use the value in lowerBound option.

Essentially, you want to create a query like

select * from table where INSERTED_ON > lowerBound and
INSERTED_ON<upperBound

everytime you run the job....



On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:

> Hi Ayan,
>
> Thanks a lot for your suggestion. I am currently looking into sqoop.
>
> Concerning your suggestion for Spark, it is indeed parallelized with
> multiple workers, but the job is one-off and cannot keep streaming.
> Moreover, I cannot specify any "start row" in the job, it will always
> ingest the entire table. So I also cannot simulate a streaming process by
> starting the job in fix intervals...
>
> Best regards,
> Yang
>
> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>
>> Hi
>>
>> While the solutions provided by others looks promising and I'd like to
>> try out few of them, our old pal sqoop already "does" the job. It has a
>> incremental mode where you can provide a --check-column and
>> --last-modified-value combination to grab the data - and yes, sqoop
>> essentially does it by running a MAP-only job which spawns number of
>> parallel map task to grab data from DB.
>>
>> In Spark, you can use sqlContext.load function for JDBC and use
>> partitionColumn and numPartition to define parallelism of connection.
>>
>> Best
>> Ayan
>>
>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>
>>> Hi Ayan,
>>>
>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>
>>> I think this use case can be generalized, because the data is immutable
>>> and append-only. We only need to find one column or timestamp to track the
>>> last row consumed in the previous ingestion. This pattern should be common
>>> when storing sensor data. If the data is mutable, then the solution will be
>>> surely difficult and vendor specific as you said.
>>>
>>> The workflow you proposed is very useful. The difficulty part is how to
>>> parallelize the ingestion task. With Spark when I have multiple workers
>>> working on the same job, I don't know if there is a way and how to
>>> dynamically change the row range each worker should process in realtime...
>>>
>>> I tried to find out if there is any candidate available out of the box,
>>> instead of reinventing the wheel. At this moment I have not discovered any
>>> existing tool can parallelize ingestion tasks on one database. Is Sqoop a
>>> proper candidate from your knowledge?
>>>
>>> Thank you again and have a nice day.
>>>
>>> Best regards,
>>> Yang
>>>
>>>
>>>
>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>
>>>>
>>>>
>>>> "If data ingestion speed is faster than data production speed, then
>>>> eventually the entire database will be harvested and those workers will
>>>> start to "tail" the database for new data streams and the processing
>>>> becomes real time."
>>>>
>>>> This part is really database dependent. So it will be hard to
>>>> generalize it. For example, say you have a batch interval of 10
>>>> secs....what happens if you get more than one updates on the same row
>>>> within 10 secs? You will get a snapshot of every 10 secs. Now, different
>>>> databases provide different mechanisms to expose all DML changes, MySQL has
>>>> binlogs, oracle has log shipping, cdc,golden gate and so on....typically it
>>>> requires new product or new licenses and most likely new component
>>>> installation on production db :)
>>>>
>>>> So, if we keep real CDC solutions out of scope, a simple snapshot
>>>> solution can be achieved fairly easily by
>>>>
>>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>>> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
>>>> 3. Running an extraction/load mechanism which will take data from DB
>>>> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
>>>> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
>>>> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
>>>> ETL tools would too...
>>>> 4. Finally, update check point...
>>>>
>>>> You may "determine" checkpoint from the data you already have in HDFS
>>>> if you create a Hive structure on it.
>>>>
>>>> Best
>>>> AYan
>>>>
>>>>
>>>>
>>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd....@gmail.com> wrote:
>>>>
>>>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>>>> table is append-only), send the log through kafka and then consume it by
>>>>> spark streaming?
>>>>>
>>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>>
>>>>>> In the mean time you could try implementing your own Source, but that
>>>>>> is pretty low level and is not yet a stable API.
>>>>>>
>>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
>>>>>> yyz1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Thanks a lot for your contributions to bring us new technologies.
>>>>>>>
>>>>>>> I don't want to waste your time, so before I write to you, I
>>>>>>> googled, checked stackoverflow and mailing list archive with keywords
>>>>>>> "streaming" and "jdbc". But I was not able to get any solution to my use
>>>>>>> case. I hope I can get some clarification from you.
>>>>>>>
>>>>>>> The use case is quite straightforward, I need to harvest a
>>>>>>> relational database via jdbc, do something with data, and store result 
>>>>>>> into
>>>>>>> Kafka. I am stuck at the first step, and the difficulty is as follows:
>>>>>>>
>>>>>>> 1. The database is too large to ingest with one thread.
>>>>>>> 2. The database is dynamic and time series data comes in constantly.
>>>>>>>
>>>>>>> Then an ideal workflow is that multiple workers process partitions
>>>>>>> of data incrementally according to a time window. For example, the
>>>>>>> processing starts from the earliest data with each batch containing data
>>>>>>> for one hour. If data ingestion speed is faster than data production 
>>>>>>> speed,
>>>>>>> then eventually the entire database will be harvested and those workers
>>>>>>> will start to "tail" the database for new data streams and the 
>>>>>>> processing
>>>>>>> becomes real time.
>>>>>>>
>>>>>>> With Spark SQL I can ingest data from a JDBC source with partitions
>>>>>>> divided by time windows, but how can I dynamically increment the time
>>>>>>> windows during execution? Assume that there are two workers ingesting 
>>>>>>> data
>>>>>>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next 
>>>>>>> task
>>>>>>> for 2017-01-03. But I am not able to find out how to increment those 
>>>>>>> values
>>>>>>> during execution.
>>>>>>>
>>>>>>> Then I looked into Structured Streaming. It looks much more
>>>>>>> promising because window operations based on event time are considered
>>>>>>> during streaming, which could be the solution to my use case. However, 
>>>>>>> from
>>>>>>> documentation and code example I did not find anything related to 
>>>>>>> streaming
>>>>>>> data from a growing database. Is there anything I can read to achieve my
>>>>>>> goal?
>>>>>>>
>>>>>>> Any suggestion is highly appreciated. Thank you very much and have a
>>>>>>> nice day.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Yang
>>>>>>> ------------------------------------------------------------
>>>>>>> ---------
>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha

Reply via email to