Hi,

You need to capture and store the max of the column you intend to use for identifying new records (e.g. INSERTED_ON) after every successful run of your job. Then use that value in the lowerBound option.

Essentially, you want to build a query like

    select * from table where INSERTED_ON > lowerBound and INSERTED_ON < upperBound

every time you run the job.
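For illustration, a minimal Spark sketch of that pattern could look like the following. The connection details, table and column names are placeholders, and the checkpoint is assumed to live in some external store. Note that Spark's own lowerBound/upperBound JDBC options only control how the read is split into partitions and do not filter rows, so the range predicate is pushed into the query itself here:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().appName("incremental-jdbc-ingest").getOrCreate()

// Lower bound captured after the previous successful run; upper bound frozen at job start.
val lastMax    = "2017-01-01 00:00:00"   // read from your checkpoint store
val upperBound = new java.sql.Timestamp(System.currentTimeMillis()).toString

val df = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")   // placeholder connection
  .option("user", "ingest").option("password", "***")
  // Push the INSERTED_ON range down to the database as a subquery...
  .option("dbtable",
    s"(SELECT * FROM events WHERE INSERTED_ON > '$lastMax' AND INSERTED_ON <= '$upperBound') t")
  // ...and keep the partitioned read for parallelism (partition column must be numeric).
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "10000000")
  .option("numPartitions", "8")
  .load()

// ... transform and write df (HDFS, Kafka, etc.) ...

// Checkpoint for the next run: the max INSERTED_ON actually ingested.
val newMax = df.agg(max("INSERTED_ON")).first().get(0)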
On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
> Hi Ayan,
>
> Thanks a lot for your suggestion. I am currently looking into sqoop.
>
> Concerning your suggestion for Spark, it is indeed parallelized with multiple workers, but the job is one-off and cannot keep streaming. Moreover, I cannot specify any "start row" in the job; it will always ingest the entire table. So I also cannot simulate a streaming process by starting the job at fixed intervals...
>
> Best regards,
> Yang
>
> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>> Hi
>>
>> While the solutions provided by others look promising and I'd like to try out a few of them, our old pal sqoop already "does" the job. It has an incremental mode where you can provide a --check-column and --last-value combination to grab the data - and yes, sqoop essentially does it by running a map-only job which spawns a number of parallel map tasks to grab data from the DB.
>>
>> In Spark, you can use the sqlContext.load function for JDBC and use partitionColumn and numPartitions to define the parallelism of the connection.
>>
>> Best
>> Ayan
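For reference, a sqoop incremental import along those lines might look roughly like this; the connection details, table, column and target directory are only placeholders:

sqoop import \
  --connect jdbc:mysql://dbhost:3306/mydb \
  --username ingest -P \
  --table events \
  --incremental append \
  --check-column id \
  --last-value 1234567 \
  --target-dir /data/raw/events \
  -m 8

The -m flag sets the number of parallel map tasks. When this is saved and run as a sqoop job, the new --last-value is tracked in sqoop's metastore automatically; otherwise it is printed at the end of the run so it can be stored for the next import.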
>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>> Hi Ayan,
>>>
>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>
>>> I think this use case can be generalized, because the data is immutable and append-only. We only need to find one column or timestamp to track the last row consumed in the previous ingestion. This pattern should be common when storing sensor data. If the data is mutable, then the solution will surely be difficult and vendor specific, as you said.
>>>
>>> The workflow you proposed is very useful. The difficult part is how to parallelize the ingestion task. With Spark, when I have multiple workers working on the same job, I don't know whether and how to dynamically change the row range each worker should process in real time...
>>>
>>> I tried to find out if there is any candidate available out of the box, instead of reinventing the wheel. At this moment I have not discovered any existing tool that can parallelize ingestion tasks on one database. Is sqoop a proper candidate, to your knowledge?
>>>
>>> Thank you again and have a nice day.
>>>
>>> Best regards,
>>> Yang
>>>
>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>> "If data ingestion speed is faster than data production speed, then eventually the entire database will be harvested and those workers will start to "tail" the database for new data streams and the processing becomes real time."
>>>>
>>>> This part is really database dependent, so it will be hard to generalize. For example, say you have a batch interval of 10 secs: what happens if you get more than one update on the same row within those 10 secs? You will get a snapshot every 10 secs. Now, different databases provide different mechanisms to expose all DML changes: MySQL has binlogs, Oracle has log shipping, CDC, GoldenGate and so on. Typically it requires a new product or new licenses, and most likely new component installation on the production DB :)
>>>>
>>>> So, if we keep real CDC solutions out of scope, a simple snapshot solution can be achieved fairly easily by
>>>>
>>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>>> 2. Keeping a simple table-level checkpoint (TABLENAME, TS_MAX).
>>>> 3. Running an extraction/load mechanism which takes data from the DB (where INSERTED_ON > TS_MAX or UPDATED_ON > TS_MAX) and puts it to HDFS. This can be sqoop, Spark, or an ETL tool like Informatica, ODI, SAP etc. In addition, you can write directly to Kafka as well; sqoop and Spark support Kafka, and most of the ETL tools would too...
>>>> 4. Finally, updating the checkpoint...
>>>>
>>>> You may "determine" the checkpoint from the data you already have in HDFS if you create a Hive structure on it.
>>>>
>>>> Best
>>>> AYan
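To illustrate that last point, the checkpoint for the next extraction can be derived from a Hive table defined over the data already landed in HDFS, along these lines (assuming a SparkSession with Hive support; the database, table and column names are made up):

// Highest timestamp already ingested, taken from the Hive structure on HDFS.
val tsMax = spark.sql(
  "SELECT greatest(max(INSERTED_ON), max(UPDATED_ON)) FROM raw_db.events"
).first().getTimestamp(0)

// tsMax then plays the role of TS_MAX in the next run's extraction predicate:
//   WHERE INSERTED_ON > TS_MAX OR UPDATED_ON > TS_MAX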
>>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd....@gmail.com> wrote:
>>>>> Why not sync the binlog of MySQL (hopefully the data is immutable and the table is append-only), send the log through Kafka and then consume it with Spark Streaming?
>>>>>
>>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>> We don't support this yet, but I've opened this JIRA as it sounds generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>>
>>>>>> In the meantime you could try implementing your own Source, but that is pretty low level and is not yet a stable API.
>>>>>>
>>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <yyz1...@gmail.com> wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Thanks a lot for your contributions in bringing us new technologies.
>>>>>>>
>>>>>>> I don't want to waste your time, so before writing to you I googled and checked Stack Overflow and the mailing list archive with the keywords "streaming" and "jdbc", but I was not able to find any solution to my use case. I hope I can get some clarification from you.
>>>>>>>
>>>>>>> The use case is quite straightforward: I need to harvest a relational database via JDBC, do something with the data, and store the result in Kafka. I am stuck at the first step, and the difficulty is as follows:
>>>>>>>
>>>>>>> 1. The database is too large to ingest with one thread.
>>>>>>> 2. The database is dynamic, and time series data comes in constantly.
>>>>>>>
>>>>>>> An ideal workflow is then that multiple workers process partitions of data incrementally according to a time window. For example, the processing starts from the earliest data, with each batch containing data for one hour. If data ingestion speed is faster than data production speed, then eventually the entire database will be harvested and those workers will start to "tail" the database for new data streams, and the processing becomes real time.
>>>>>>>
>>>>>>> With Spark SQL I can ingest data from a JDBC source with partitions divided by time windows, but how can I dynamically increment the time windows during execution? Assume there are two workers ingesting data for 2017-01-01 and 2017-01-02; the one which finishes quicker gets the next task, for 2017-01-03. But I am not able to find out how to increment those values during execution.
>>>>>>>
>>>>>>> Then I looked into Structured Streaming. It looks much more promising because window operations based on event time are considered during streaming, which could be the solution to my use case. However, in the documentation and code examples I did not find anything related to streaming data from a growing database. Is there anything I can read to achieve my goal?
>>>>>>>
>>>>>>> Any suggestion is highly appreciated. Thank you very much and have a nice day.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Yang
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>
>> --
>> Best Regards,
>> Ayan Guha

--
Best Regards,
Ayan Guha