Ahh I see what you mean....I confused two terminologies....because we were
talking about partitioning and then changed topic to identify changed data
....

For that, you can "construct" a dbtable as an inline view -

viewSQL = "(select * from table where <column> >
'<last_modified_value>')".replace("<column>","inserted_on").replace("<last_modified_value>",checkPointedValue)
dbtable =viewSQL

refer to this
<http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/>
blog...

So, in summary, you have 2 things

1. Identify changed data - my suggestion to use dbtable with inline view
2. parallelism - use numPartition,lowerbound,upper bound to generate number
of partitions

HTH....



On Wed, Jan 4, 2017 at 3:46 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:

> Hi Ayan,
>
> Yeah, I understand your proposal, but according to here
> http://spark.apache.org/docs/latest/sql-programming-
> guide.html#jdbc-to-other-databases, it says
>
> Notice that lowerBound and upperBound are just used to decide the
> partition stride, not for filtering the rows in table. So all rows in the
> table will be partitioned and returned. This option applies only to reading.
>
> So my interpretation is all rows in the table are ingested, and this
> "lowerBound" and "upperBound" is the span of each partition. Well, I am not
> a native English speaker, maybe it means differently?
>
> Best regards,
> Yang
>
> 2017-01-03 17:23 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>
>> Hi
>>
>> You need to store and capture the Max of the column you intend to use for
>> identifying new records (Ex: INSERTED_ON) after every successful run of
>> your job. Then, use the value in lowerBound option.
>>
>> Essentially, you want to create a query like
>>
>> select * from table where INSERTED_ON > lowerBound and
>> INSERTED_ON<upperBound
>>
>> everytime you run the job....
>>
>>
>>
>> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>
>>> Hi Ayan,
>>>
>>> Thanks a lot for your suggestion. I am currently looking into sqoop.
>>>
>>> Concerning your suggestion for Spark, it is indeed parallelized with
>>> multiple workers, but the job is one-off and cannot keep streaming.
>>> Moreover, I cannot specify any "start row" in the job, it will always
>>> ingest the entire table. So I also cannot simulate a streaming process by
>>> starting the job in fix intervals...
>>>
>>> Best regards,
>>> Yang
>>>
>>> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>
>>>> Hi
>>>>
>>>> While the solutions provided by others looks promising and I'd like to
>>>> try out few of them, our old pal sqoop already "does" the job. It has a
>>>> incremental mode where you can provide a --check-column and
>>>> --last-modified-value combination to grab the data - and yes, sqoop
>>>> essentially does it by running a MAP-only job which spawns number of
>>>> parallel map task to grab data from DB.
>>>>
>>>> In Spark, you can use sqlContext.load function for JDBC and use
>>>> partitionColumn and numPartition to define parallelism of connection.
>>>>
>>>> Best
>>>> Ayan
>>>>
>>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ayan,
>>>>>
>>>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>>>
>>>>> I think this use case can be generalized, because the data is
>>>>> immutable and append-only. We only need to find one column or timestamp to
>>>>> track the last row consumed in the previous ingestion. This pattern should
>>>>> be common when storing sensor data. If the data is mutable, then the
>>>>> solution will be surely difficult and vendor specific as you said.
>>>>>
>>>>> The workflow you proposed is very useful. The difficulty part is how
>>>>> to parallelize the ingestion task. With Spark when I have multiple workers
>>>>> working on the same job, I don't know if there is a way and how to
>>>>> dynamically change the row range each worker should process in realtime...
>>>>>
>>>>> I tried to find out if there is any candidate available out of the
>>>>> box, instead of reinventing the wheel. At this moment I have not 
>>>>> discovered
>>>>> any existing tool can parallelize ingestion tasks on one database. Is 
>>>>> Sqoop
>>>>> a proper candidate from your knowledge?
>>>>>
>>>>> Thank you again and have a nice day.
>>>>>
>>>>> Best regards,
>>>>> Yang
>>>>>
>>>>>
>>>>>
>>>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>>>
>>>>>>
>>>>>>
>>>>>> "If data ingestion speed is faster than data production speed, then
>>>>>> eventually the entire database will be harvested and those workers will
>>>>>> start to "tail" the database for new data streams and the processing
>>>>>> becomes real time."
>>>>>>
>>>>>> This part is really database dependent. So it will be hard to
>>>>>> generalize it. For example, say you have a batch interval of 10
>>>>>> secs....what happens if you get more than one updates on the same row
>>>>>> within 10 secs? You will get a snapshot of every 10 secs. Now, different
>>>>>> databases provide different mechanisms to expose all DML changes, MySQL 
>>>>>> has
>>>>>> binlogs, oracle has log shipping, cdc,golden gate and so on....typically 
>>>>>> it
>>>>>> requires new product or new licenses and most likely new component
>>>>>> installation on production db :)
>>>>>>
>>>>>> So, if we keep real CDC solutions out of scope, a simple snapshot
>>>>>> solution can be achieved fairly easily by
>>>>>>
>>>>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>>>>> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
>>>>>> 3. Running an extraction/load mechanism which will take data from DB
>>>>>> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This 
>>>>>> can
>>>>>> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you 
>>>>>> can
>>>>>> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
>>>>>> ETL tools would too...
>>>>>> 4. Finally, update check point...
>>>>>>
>>>>>> You may "determine" checkpoint from the data you already have in HDFS
>>>>>> if you create a Hive structure on it.
>>>>>>
>>>>>> Best
>>>>>> AYan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd....@gmail.com> wrote:
>>>>>>
>>>>>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>>>>>> table is append-only), send the log through kafka and then consume it by
>>>>>>> spark streaming?
>>>>>>>
>>>>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>>>>>> mich...@databricks.com> wrote:
>>>>>>>
>>>>>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>>>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>>>>
>>>>>>>> In the mean time you could try implementing your own Source, but
>>>>>>>> that is pretty low level and is not yet a stable API.
>>>>>>>>
>>>>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
>>>>>>>> yyz1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> Thanks a lot for your contributions to bring us new technologies.
>>>>>>>>>
>>>>>>>>> I don't want to waste your time, so before I write to you, I
>>>>>>>>> googled, checked stackoverflow and mailing list archive with keywords
>>>>>>>>> "streaming" and "jdbc". But I was not able to get any solution to my 
>>>>>>>>> use
>>>>>>>>> case. I hope I can get some clarification from you.
>>>>>>>>>
>>>>>>>>> The use case is quite straightforward, I need to harvest a
>>>>>>>>> relational database via jdbc, do something with data, and store 
>>>>>>>>> result into
>>>>>>>>> Kafka. I am stuck at the first step, and the difficulty is as follows:
>>>>>>>>>
>>>>>>>>> 1. The database is too large to ingest with one thread.
>>>>>>>>> 2. The database is dynamic and time series data comes in
>>>>>>>>> constantly.
>>>>>>>>>
>>>>>>>>> Then an ideal workflow is that multiple workers process partitions
>>>>>>>>> of data incrementally according to a time window. For example, the
>>>>>>>>> processing starts from the earliest data with each batch containing 
>>>>>>>>> data
>>>>>>>>> for one hour. If data ingestion speed is faster than data production 
>>>>>>>>> speed,
>>>>>>>>> then eventually the entire database will be harvested and those 
>>>>>>>>> workers
>>>>>>>>> will start to "tail" the database for new data streams and the 
>>>>>>>>> processing
>>>>>>>>> becomes real time.
>>>>>>>>>
>>>>>>>>> With Spark SQL I can ingest data from a JDBC source with
>>>>>>>>> partitions divided by time windows, but how can I dynamically 
>>>>>>>>> increment the
>>>>>>>>> time windows during execution? Assume that there are two workers 
>>>>>>>>> ingesting
>>>>>>>>> data of 2017-01-01 and 2017-01-02, the one which finishes quicker 
>>>>>>>>> gets next
>>>>>>>>> task for 2017-01-03. But I am not able to find out how to increment 
>>>>>>>>> those
>>>>>>>>> values during execution.
>>>>>>>>>
>>>>>>>>> Then I looked into Structured Streaming. It looks much more
>>>>>>>>> promising because window operations based on event time are considered
>>>>>>>>> during streaming, which could be the solution to my use case. 
>>>>>>>>> However, from
>>>>>>>>> documentation and code example I did not find anything related to 
>>>>>>>>> streaming
>>>>>>>>> data from a growing database. Is there anything I can read to achieve 
>>>>>>>>> my
>>>>>>>>> goal?
>>>>>>>>>
>>>>>>>>> Any suggestion is highly appreciated. Thank you very much and have
>>>>>>>>> a nice day.
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> Yang
>>>>>>>>> ------------------------------------------------------------
>>>>>>>>> ---------
>>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha

Reply via email to