Ahh, I see what you mean... I confused two terminologies, because we were talking about partitioning and then the topic changed to identifying changed data.

For that, you can "construct" dbtable as an inline view:

    viewSQL = "(select * from table where <column> > '<last_modified_value>') t"
      .replace("<column>", "inserted_on")
      .replace("<last_modified_value>", checkPointedValue)
    dbtable = viewSQL

(the trailing "t" gives the derived table the alias most databases require). Refer to this blog:
http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/

So, in summary, you have two things:

1. Identifying changed data - my suggestion is to use dbtable with an inline view.
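A rough, untested sketch of that (Scala, Spark 2.x, assuming a SparkSession named spark); the table name sensor_data, the inserted_on column, the connection details and the checkpoint value are all placeholders for illustration:

    // Persisted high-water mark from the previous run (made-up value;
    // load it from wherever you keep your checkpoint).
    val checkPointedValue = "2017-01-01 00:00:00"

    // The inline view: Spark treats this string as the "table", so only
    // rows newer than the checkpoint are shipped over JDBC.
    val viewSQL =
      s"(select * from sensor_data where inserted_on > '$checkPointedValue') t"

    val changed = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")  // placeholder connection details
      .option("dbtable", viewSQL)
      .option("user", "...")
      .option("password", "...")
      .load()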
2. Parallelism - use numPartitions, lowerBound and upperBound to generate the number of partitions. As the documentation you quoted says, these bounds only decide the partition stride and do not filter rows, so the filtering still has to come from the inline view in point 1.
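Again a rough sketch, not tested, reusing the same placeholder names and assuming the table also exposes a numeric id column whose approximate min and max you know:

    // lowerBound/upperBound only decide how the id range is split into
    // strides; they do NOT filter rows. Filtering comes from the view itself.
    val changedInParallel = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")
      .option("dbtable", viewSQL)          // the inline view from the first sketch
      .option("partitionColumn", "id")     // numeric column exposed by the view
      .option("lowerBound", "1")           // approximate min id
      .option("upperBound", "10000000")    // approximate max id
      .option("numPartitions", "8")        // 8 concurrent JDBC connections
      .option("user", "...")
      .option("password", "...")
      .load()

Spark then issues eight queries in parallel, each running the view restricted to one stride of the id range.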
HTH....

On Wed, Jan 4, 2017 at 3:46 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:

> Hi Ayan,
>
> Yeah, I understand your proposal, but according to
> http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases, it says:
>
> Notice that lowerBound and upperBound are just used to decide the
> partition stride, not for filtering the rows in table. So all rows in the
> table will be partitioned and returned. This option applies only to reading.
>
> So my interpretation is that all rows in the table are ingested, and
> "lowerBound" and "upperBound" define the span of each partition. Well, I am
> not a native English speaker; maybe it means something different?
>
> Best regards,
> Yang
>
> 2017-01-03 17:23 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>
>> Hi
>>
>> You need to store and capture the max of the column you intend to use
>> for identifying new records (e.g. INSERTED_ON) after every successful run
>> of your job. Then use that value in the lowerBound option.
>>
>> Essentially, you want to create a query like
>>
>> select * from table where INSERTED_ON > lowerBound and INSERTED_ON < upperBound
>>
>> every time you run the job....
>>
>> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>
>>> Hi Ayan,
>>>
>>> Thanks a lot for your suggestion. I am currently looking into Sqoop.
>>>
>>> Concerning your suggestion for Spark: it is indeed parallelized with
>>> multiple workers, but the job is one-off and cannot keep streaming.
>>> Moreover, I cannot specify any "start row" in the job; it will always
>>> ingest the entire table. So I also cannot simulate a streaming process
>>> by starting the job at fixed intervals...
>>>
>>> Best regards,
>>> Yang
>>>
>>> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>
>>>> Hi
>>>>
>>>> While the solutions provided by others look promising and I'd like to
>>>> try a few of them, our old pal Sqoop already "does" the job. It has an
>>>> incremental mode where you can provide a --check-column and
>>>> --last-value combination to grab the data, and yes, Sqoop essentially
>>>> does it by running a map-only job which spawns a number of parallel
>>>> map tasks to grab data from the DB.
>>>>
>>>> In Spark, you can use the sqlContext.load function for JDBC and use
>>>> partitionColumn and numPartitions to define the parallelism of the
>>>> connection.
>>>>
>>>> Best
>>>> Ayan
>>>>
>>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>>>
>>>>> Hi Ayan,
>>>>>
>>>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>>>
>>>>> I think this use case can be generalized, because the data is
>>>>> immutable and append-only. We only need to find one column or
>>>>> timestamp to track the last row consumed in the previous ingestion.
>>>>> This pattern should be common when storing sensor data. If the data
>>>>> is mutable, then the solution will surely be difficult and
>>>>> vendor-specific, as you said.
>>>>>
>>>>> The workflow you proposed is very useful. The difficult part is how
>>>>> to parallelize the ingestion task. With Spark, when I have multiple
>>>>> workers working on the same job, I don't know if there is a way to
>>>>> dynamically change the row range each worker should process in real
>>>>> time...
>>>>>
>>>>> I tried to find out if there is any candidate available out of the
>>>>> box, instead of reinventing the wheel. At this moment I have not
>>>>> discovered any existing tool that can parallelize ingestion tasks on
>>>>> one database. Is Sqoop a proper candidate, to your knowledge?
>>>>>
>>>>> Thank you again and have a nice day.
>>>>>
>>>>> Best regards,
>>>>> Yang
>>>>>
>>>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>>>
>>>>>> "If data ingestion speed is faster than data production speed, then
>>>>>> eventually the entire database will be harvested and those workers
>>>>>> will start to 'tail' the database for new data streams and the
>>>>>> processing becomes real time."
>>>>>>
>>>>>> This part is really database-dependent, so it will be hard to
>>>>>> generalize. For example, say you have a batch interval of 10
>>>>>> seconds: what happens if you get more than one update on the same
>>>>>> row within those 10 seconds? You will get a snapshot every 10
>>>>>> seconds. Now, different databases provide different mechanisms to
>>>>>> expose all DML changes: MySQL has binlogs, Oracle has log shipping,
>>>>>> CDC, GoldenGate and so on. Typically it requires a new product or
>>>>>> new licenses, and most likely new component installation on the
>>>>>> production DB :)
>>>>>>
>>>>>> So, if we keep real CDC solutions out of scope, a simple snapshot
>>>>>> solution can be achieved fairly easily by:
>>>>>>
>>>>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>>>>> 2. Keeping a simple table-level checkpoint (TABLENAME, TS_MAX).
>>>>>> 3. Running an extraction/load mechanism which takes data from the DB
>>>>>> (where INSERTED_ON > TS_MAX or UPDATED_ON > TS_MAX) and puts it into
>>>>>> HDFS. This can be Sqoop, Spark, or an ETL tool like Informatica,
>>>>>> ODI, SAP etc. In addition, you can write directly to Kafka as well;
>>>>>> Sqoop and Spark support Kafka, and most of the ETL tools would too...
>>>>>> 4. Finally, updating the checkpoint...
>>>>>>
>>>>>> You may "determine" the checkpoint from the data you already have in
>>>>>> HDFS if you create a Hive structure on it.
>>>>>>
>>>>>> Best
>>>>>> Ayan
>>>>>>
>>>>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd....@gmail.com> wrote:
>>>>>>
>>>>>>> Why not sync the binlog of MySQL (hopefully the data is immutable
>>>>>>> and the table is append-only), send the log through Kafka and then
>>>>>>> consume it with Spark Streaming?
>>>>>>>
>>>>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust
>>>>>>> <mich...@databricks.com> wrote:
>>>>>>>
>>>>>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>>>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>>>>
>>>>>>>> In the meantime you could try implementing your own Source, but
>>>>>>>> that is pretty low level and is not yet a stable API.
>>>>>>>>
>>>>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)"
>>>>>>>> <yyz1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> Thanks a lot for your contributions to bringing us new technologies.
>>>>>>>>>
>>>>>>>>> I don't want to waste your time, so before writing to you I
>>>>>>>>> googled and checked Stack Overflow and the mailing list archive
>>>>>>>>> with the keywords "streaming" and "jdbc", but I was not able to
>>>>>>>>> find any solution to my use case. I hope I can get some
>>>>>>>>> clarification from you.
>>>>>>>>>
>>>>>>>>> The use case is quite straightforward: I need to harvest a
>>>>>>>>> relational database via JDBC, do something with the data, and
>>>>>>>>> store the result in Kafka. I am stuck at the first step, and the
>>>>>>>>> difficulty is as follows:
>>>>>>>>>
>>>>>>>>> 1. The database is too large to ingest with one thread.
>>>>>>>>> 2. The database is dynamic, and time series data comes in
>>>>>>>>> constantly.
>>>>>>>>>
>>>>>>>>> An ideal workflow, then, is that multiple workers process
>>>>>>>>> partitions of data incrementally according to a time window. For
>>>>>>>>> example, the processing starts from the earliest data, with each
>>>>>>>>> batch containing data for one hour. If the data ingestion speed is
>>>>>>>>> faster than the data production speed, then eventually the entire
>>>>>>>>> database will be harvested and those workers will start to "tail"
>>>>>>>>> the database for new data streams, and the processing becomes
>>>>>>>>> real time.
>>>>>>>>>
>>>>>>>>> With Spark SQL I can ingest data from a JDBC source with
>>>>>>>>> partitions divided by time windows, but how can I dynamically
>>>>>>>>> increment the time windows during execution? Assume there are two
>>>>>>>>> workers ingesting data for 2017-01-01 and 2017-01-02; the one
>>>>>>>>> which finishes more quickly gets the next task, for 2017-01-03.
>>>>>>>>> But I am not able to find out how to increment those values during
>>>>>>>>> execution.
>>>>>>>>>
>>>>>>>>> Then I looked into Structured Streaming. It looks much more
>>>>>>>>> promising, because window operations based on event time are
>>>>>>>>> supported in streaming, which could be the solution to my use
>>>>>>>>> case. However, in the documentation and code examples I did not
>>>>>>>>> find anything related to streaming data from a growing database.
>>>>>>>>> Is there anything I can read to achieve my goal?
>>>>>>>>>
>>>>>>>>> Any suggestion is highly appreciated. Thank you very much and have
>>>>>>>>> a nice day.
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> Yang
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>
>> --
>> Best Regards,
>> Ayan Guha

--
Best Regards,
Ayan Guha