Hi Ayan,

This "inline view" idea is really awesome and very enlightening! Finally I have a plan to move forward. I greatly appreciate your help!

Best regards,
Yang
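P.S. For the archive, below is roughly the plan I will try, combining your two points. It is an untested sketch: the connection details, table name, "inserted_on" filter column, numeric "id" partition column and the checkpoint handling are all placeholders for my own setup.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("incremental-jdbc-ingest").getOrCreate()

# Point 1: identify changed data - push the checkpoint filter into an inline view.
checkPointedValue = "2017-01-01 00:00:00"   # persisted after the previous run
viewSQL = ("(select id, payload, inserted_on from sensor_data "
           "where inserted_on > '{0}') as t").format(checkPointedValue)

# Point 2: parallelism - numPartitions/lowerBound/upperBound split the view into
# stride queries on a numeric column; they do not filter any rows.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")
      .option("dbtable", viewSQL)
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "10000000")
      .option("numPartitions", "8")
      .option("user", "user")
      .option("password", "password")
      .load())

# After processing, store max(inserted_on) as the checkpoint for the next run.
newCheckpoint = df.agg({"inserted_on": "max"}).collect()[0][0]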
2017-01-03 18:14 GMT+01:00 ayan guha <guha.a...@gmail.com>:

> Ahh, I see what you mean... I confused two terminologies, because we were talking about partitioning and then changed topic to identifying changed data.
>
> For that, you can construct dbtable as an inline view:
>
> viewSQL = ("(select * from table where <column> > '<last_modified_value>') tmp"  # alias required by some databases, e.g. MySQL
>            .replace("<column>", "inserted_on")
>            .replace("<last_modified_value>", checkPointedValue))
> dbtable = viewSQL
>
> Refer to this blog:
> http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/
>
> So, in summary, you have 2 things:
>
> 1. Identify changed data - my suggestion is to use dbtable with an inline view.
> 2. Parallelism - use numPartitions, lowerBound and upperBound to generate the number of partitions.
>
> HTH...
>
> On Wed, Jan 4, 2017 at 3:46 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
>> Hi Ayan,
>>
>> Yeah, I understand your proposal, but according to http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases, it says:
>>
>> "Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading."
>>
>> So my interpretation is that all rows in the table are ingested, and "lowerBound" and "upperBound" only define the span of each partition. Well, I am not a native English speaker; maybe it means something different?
>>
>> Best regards,
>> Yang
>>
>> 2017-01-03 17:23 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>
>>> Hi
>>>
>>> You need to store and capture the max of the column you intend to use for identifying new records (e.g. INSERTED_ON) after every successful run of your job. Then, use that value in the lowerBound option.
>>>
>>> Essentially, you want to create a query like
>>>
>>> select * from table where INSERTED_ON > lowerBound and INSERTED_ON < upperBound
>>>
>>> every time you run the job...
>>>
>>> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>>
>>>> Hi Ayan,
>>>>
>>>> Thanks a lot for your suggestion. I am currently looking into Sqoop.
>>>>
>>>> Concerning your suggestion for Spark, it is indeed parallelized with multiple workers, but the job is one-off and cannot keep streaming. Moreover, I cannot specify any "start row" in the job; it will always ingest the entire table. So I also cannot simulate a streaming process by starting the job at fixed intervals...
>>>>
>>>> Best regards,
>>>> Yang
>>>>
>>>> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>>
>>>>> Hi
>>>>>
>>>>> While the solutions provided by others look promising and I'd like to try out a few of them, our old pal Sqoop already "does" the job. It has an incremental mode where you can provide a --check-column and --last-value combination to grab the data - and yes, Sqoop essentially does it by running a map-only job which spawns a number of parallel map tasks to grab data from the DB.
>>>>>
>>>>> In Spark, you can use the sqlContext.read.jdbc function and use partitionColumn and numPartitions to define the parallelism of the connection.
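>>>>>
>>>>> For example, something like this (a rough, untested sketch; the URL, table, column and bounds are made up and need to fit your schema, and the partition column must be numeric):
>>>>>
>>>>> df = sqlContext.read.jdbc(
>>>>>     url="jdbc:mysql://dbhost:3306/mydb",
>>>>>     table="sensor_data",
>>>>>     column="id",                 # partition column
>>>>>     lowerBound=1,
>>>>>     upperBound=10000000,
>>>>>     numPartitions=8,             # 8 concurrent JDBC connections
>>>>>     properties={"user": "user", "password": "password"})
>>>>>
>>>>> Each of the 8 partitions then reads one stride of the id range; the bounds only decide the stride, they do not filter any rows (the first and last partitions take everything below and above the bounds).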
>>>>>
>>>>> Best
>>>>> Ayan
>>>>>
>>>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ayan,
>>>>>>
>>>>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>>>>
>>>>>> I think this use case can be generalized, because the data is immutable and append-only. We only need to find one column or timestamp to track the last row consumed in the previous ingestion. This pattern should be common when storing sensor data. If the data is mutable, then the solution will surely be difficult and vendor-specific, as you said.
>>>>>>
>>>>>> The workflow you proposed is very useful. The difficult part is how to parallelize the ingestion task. With Spark, when I have multiple workers working on the same job, I don't know if there is a way to dynamically change the row range each worker should process at runtime...
>>>>>>
>>>>>> I tried to find out if there is any candidate available out of the box, instead of reinventing the wheel. At this moment I have not discovered any existing tool that can parallelize ingestion tasks on one database. Is Sqoop a proper candidate, from your knowledge?
>>>>>>
>>>>>> Thank you again and have a nice day.
>>>>>>
>>>>>> Best regards,
>>>>>> Yang
>>>>>>
>>>>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>>>>
>>>>>>> "If data ingestion speed is faster than data production speed, then eventually the entire database will be harvested and those workers will start to 'tail' the database for new data streams and the processing becomes real time."
>>>>>>>
>>>>>>> This part is really database-dependent, so it will be hard to generalize. For example, say you have a batch interval of 10 secs: what happens if you get more than one update on the same row within those 10 secs? You will only get a snapshot every 10 secs. Now, different databases provide different mechanisms to expose all DML changes: MySQL has binlogs; Oracle has log shipping, CDC, GoldenGate and so on. Typically that requires a new product or new licenses, and most likely new component installation on the production DB :)
>>>>>>>
>>>>>>> So, if we keep real CDC solutions out of scope, a simple snapshot solution can be achieved fairly easily by:
>>>>>>>
>>>>>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>>>>>> 2. Keeping a simple table-level checkpoint (TABLENAME, TS_MAX).
>>>>>>> 3. Running an extraction/load mechanism which takes data from the DB (where INSERTED_ON > TS_MAX or UPDATED_ON > TS_MAX) and puts it to HDFS. This can be Sqoop, Spark, or an ETL tool like Informatica, ODI, SAP etc. In addition, you can write directly to Kafka as well; Sqoop and Spark support Kafka, and most of the ETL tools would too...
>>>>>>> 4. Finally, updating the checkpoint...
>>>>>>>
>>>>>>> You may "determine" the checkpoint from the data you already have in HDFS if you create a Hive structure on it.
>>>>>>>
>>>>>>> Best
>>>>>>> Ayan
>>>>>>>
>>>>>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Why not sync the binlog of MySQL (hopefully the data is immutable and the table is append-only), send the log through Kafka and then consume it with Spark Streaming?
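>>>>>>>>
>>>>>>>> On the consuming side it could look roughly like this (just an untested sketch using the spark-streaming-kafka-0-8 integration; the topic and broker names are made up):
>>>>>>>>
>>>>>>>> from pyspark import SparkContext
>>>>>>>> from pyspark.streaming import StreamingContext
>>>>>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>>>>>
>>>>>>>> sc = SparkContext(appName="binlog-consumer")
>>>>>>>> ssc = StreamingContext(sc, 10)  # 10-second micro-batches
>>>>>>>>
>>>>>>>> # each record is a (key, value) pair; the value carries one binlog event
>>>>>>>> stream = KafkaUtils.createDirectStream(
>>>>>>>>     ssc, ["mysql-binlog"], {"metadata.broker.list": "broker1:9092"})
>>>>>>>> stream.map(lambda kv: kv[1]).pprint()
>>>>>>>>
>>>>>>>> ssc.start()
>>>>>>>> ssc.awaitTermination()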
>>>>>>>>
>>>>>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> We don't support this yet, but I've opened this JIRA as it sounds generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>>>>>
>>>>>>>>> In the meantime you could try implementing your own Source, but that is pretty low-level and is not yet a stable API.
>>>>>>>>>
>>>>>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <yyz1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for your contributions in bringing us new technologies.
>>>>>>>>>>
>>>>>>>>>> I don't want to waste your time, so before writing to you I googled and checked Stack Overflow and the mailing list archive with the keywords "streaming" and "jdbc", but I was not able to find any solution to my use case. I hope I can get some clarification from you.
>>>>>>>>>>
>>>>>>>>>> The use case is quite straightforward: I need to harvest a relational database via JDBC, do something with the data, and store the result in Kafka. I am stuck at the first step, and the difficulty is as follows:
>>>>>>>>>>
>>>>>>>>>> 1. The database is too large to ingest with one thread.
>>>>>>>>>> 2. The database is dynamic, and time series data comes in constantly.
>>>>>>>>>>
>>>>>>>>>> An ideal workflow would be that multiple workers process partitions of the data incrementally according to a time window. For example, the processing starts from the earliest data, with each batch containing data for one hour. If the data ingestion speed is faster than the data production speed, then eventually the entire database will be harvested and those workers will start to "tail" the database for new data streams, and the processing becomes real time.
>>>>>>>>>>
>>>>>>>>>> With Spark SQL I can ingest data from a JDBC source with partitions divided by time windows, but how can I dynamically increment the time windows during execution? Assume that two workers are ingesting data for 2017-01-01 and 2017-01-02; the one which finishes sooner should get the next task, for 2017-01-03. But I am not able to find out how to increment those values during execution.
>>>>>>>>>>
>>>>>>>>>> Then I looked into Structured Streaming. It looks much more promising, because window operations based on event time are supported in streaming, which could be the solution to my use case. However, in the documentation and code examples I did not find anything related to streaming data from a growing database. Is there anything I can read to achieve my goal?
>>>>>>>>>>
>>>>>>>>>> Any suggestion is highly appreciated. Thank you very much and have a nice day.
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> Yang
>>>>>>>>>>
>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards,
>>>>>>> Ayan Guha
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>
> --
> Best Regards,
> Ayan Guha