Why are atomic operations a requirement? I feel like doubling the number of writes (with staging tables) is probably a tradeoff that the end user should make.
On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan <cloud0...@gmail.com> wrote:

> Regardless of the API, using Spark to write data atomically requires:
> 1. Writing data distributedly, with a central coordinator at the Spark driver.
> 2. The distributed writers are not guaranteed to run together at the same time. (This can be relaxed if we can extend the barrier scheduling feature.)
> 3. The new data is visible if and only if all distributed writers succeed.
>
> According to these requirements, I think using a staging table is the most common way, and maybe the only way. I'm not sure how 2PC can help; we don't want users to read partial data, so we need a final step to commit all the data together.
>
> For RDBMS data sources, I think a simple solution is to ask users to coalesce the input RDD/DataFrame into one partition; then we don't need to care about multi-client transactions. Or use a staging table, as Ryan described before.
>
> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time; we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
>>
>> I guess we have already gotten into too much detail here, but if it is based on a client transaction, Spark must assign each "commit" task to the executor whose task finished the "prepare", and if it loses that executor it is not feasible to force the commit. Staging should come into play for that.
>>
>> We should also have a mechanism for "recovery": Spark needs to ensure it finalizes the "commit", even in case of failures, before starting a new batch.
>>
>> So it is not an easy thing to integrate correctly.
>>
>> On Tue, Sep 11, 2018 at 6:00 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>
>>> > Well, almost all relational databases let you move data in a transactional way. That's what transactions are for.
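[Editor's note] The staging-table pattern Wenchen and Ryan describe can be sketched in a few lines. This is a toy illustration using Python's built-in sqlite3 module as a stand-in for an RDBMS (the table names and schema are invented for the example, and a single connection stands in for what would really be multiple writer clients): each task appends only to a staging table, so nothing is visible until the driver publishes everything in one final transaction. It also makes the cost from the first message concrete: every row is written twice.

```python
import sqlite3

# In-memory database standing in for a shared RDBMS (illustrative schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, payload TEXT)")
conn.execute("CREATE TABLE events_staging (id INTEGER, payload TEXT)")

def write_task(rows):
    """Each 'task' appends only to the staging table; nothing is visible yet."""
    conn.executemany("INSERT INTO events_staging VALUES (?, ?)", rows)
    conn.commit()

def driver_commit():
    """The final step: one transaction publishes all staged rows at once."""
    with conn:  # either all rows become visible, or none do
        conn.execute("INSERT INTO events SELECT * FROM events_staging")
        conn.execute("DELETE FROM events_staging")

write_task([(1, "a"), (2, "b")])
write_task([(3, "c")])
assert conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] == 0  # hidden
driver_commit()
assert conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] == 3  # visible
```

The real multi-client coordination problem (separate connections per task, executor loss before the final commit) is exactly what the rest of the thread debates; this sketch only shows the visibility mechanics.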
>>>
>>> It would work, but I suspect in most cases it would involve moving data from temporary tables to the final tables.
>>>
>>> Right now there is no mechanism to let the individual tasks commit in a two-phase manner (I'm not sure whether the CommitCoordinator might help). If such an API were provided, the sources could use it as they wish (e.g. use the XA support provided by MySQL to implement it in a more efficient way than the driver moving data from temp tables to destination tables).
>>>
>>> There are definitely complexities involved, but I am not sure network partitioning comes into play here, since the driver can act as the coordinator and can run in HA mode. And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time; we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
>>>
>>> Most of the sources would not need any of the above and would just need a way to support idempotent writes, and, as Ryan suggested, we can enable this (if there are gaps in the current APIs).
>>>
>>> On Mon, 10 Sep 2018 at 13:43, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> Well, almost all relational databases let you move data in a transactional way. That's what transactions are for.
>>>>
>>>> For just straight HDFS, the move is a pretty fast operation, so while it is not completely transactional, the window of potential failure is pretty short for appends. For writers at the partition level it is fine, because it is just renaming a directory, which is atomic.
>>>>
>>>> On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>
>>>>> When network partitioning happens, it is pretty OK for me to see 2PC not working, since we are dealing with a global transaction. Recovery would be a hard thing to get correct, though. I completely agree it would require massive changes to Spark.
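[Editor's note] Arun's point that most sources only need idempotent writes can be modeled in a few lines: if each task writes under a deterministic key such as (epoch, partition), a retried task overwrites its own earlier attempt instead of duplicating it. The key scheme below is an assumption for illustration, not a Spark API; a Python dict stands in for an upsert-capable store.

```python
# Toy store supporting upserts; a retried task replays the same key.
store = {}

def write(epoch, partition, rows):
    # Deterministic key: re-running the same task for the same epoch
    # overwrites rather than appends, so replays are safe.
    store[(epoch, partition)] = rows

write(epoch=0, partition=0, rows=["a", "b"])
write(epoch=0, partition=0, rows=["a", "b"])  # task retry: no duplication
write(epoch=0, partition=1, rows=["c"])

all_rows = [r for rows in store.values() for r in rows]
assert sorted(all_rows) == ["a", "b", "c"]
```

Note this gives at-least-once writes with deduplication (effectively exactly-once for readers that see completed epochs), but it does not make a multi-partition batch appear atomically.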
>>>>>
>>>>> What I couldn't find for the underlying storages is a way to move data from a staging table to the final table transactionally. I'm not fully sure, but as far as I'm aware, many storages do not support moving data, and even for the HDFS sink it is not strictly transactional, since we move multiple files with multiple operations. If the coordinator crashes it leaves a partial write, and the writers and coordinator need to ensure the data will not end up duplicated.
>>>>>
>>>>> Ryan replied to me that Iceberg and HBase MVCC timestamps can enable us to implement "commit" (his reply didn't hit the dev mailing list, though), but I'm not an expert on either of them, and I still can't see how it would deal with the various crash cases.
>>>>>
>>>>> On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin <r...@databricks.com> wrote:
>>>>>
>>>>>> I don't think two-phase commit would work here at all.
>>>>>>
>>>>>> 1. It'd require massive changes to Spark.
>>>>>>
>>>>>> 2. Unless the underlying data source can provide an API to coordinate commits (and few data sources I know of provide something like that), 2PC wouldn't work in the presence of network partitioning. You can't defy the laws of physics.
>>>>>>
>>>>>> Really, the most common and simple way I've seen this working is through staging tables and a final transaction to move data from the staging table to the final table.
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>
>>>>>>> I guess we are all aware of the limitation of the contract on the DSv2 writer.
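[Editor's note] The distinction Reynold and Jungtaek draw here, between a single atomic directory rename and a non-atomic multi-file move, can be demonstrated on a local filesystem, where renaming a path is one metadata operation: readers see either the old state or the new state, never a half-moved one. (HDFS behaves similarly for a single rename; moving many files with many operations is where atomicity is lost.) A sketch, with invented path names:

```python
import os
import tempfile

# Stage output under a hidden name, then publish it with one rename.
base = tempfile.mkdtemp()
staging = os.path.join(base, "_staging_part=0")  # illustrative layout
final = os.path.join(base, "part=0")

os.mkdir(staging)
with open(os.path.join(staging, "data.txt"), "w") as f:
    f.write("rows\n")

assert not os.path.exists(final)  # nothing visible to readers yet
os.rename(staging, final)         # single atomic metadata operation
assert os.path.exists(os.path.join(final, "data.txt"))
```

A commit that must publish several such directories needs several renames, and a crash between them leaves exactly the partial write Jungtaek describes.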
>>>>>>> Actually it can be achieved only with the HDFS sink (or other filesystem-based sinks); for other external storages it is normally not feasible to implement, because there's no way to couple a transaction across multiple clients, and the coordinator can't take over transactions from the writers to do the final commit.
>>>>>>>
>>>>>>> XA is also not trivial to get correct with the current execution model: Spark doesn't require writer tasks to run at the same time, but to achieve 2PC they would have to run until the end of the transaction (closing the client before the transaction ends normally means aborting the transaction). Spark would also have to integrate 2PC with its checkpointing mechanism to guarantee completeness of a batch. And it might require a different integration for continuous mode.
>>>>>>>
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>>>>>>
>>>>>>>> In some cases the implementations may be OK with eventual consistency (and may not care whether the output is written out atomically).
>>>>>>>>
>>>>>>>> XA can be one option for data sources that support it and require atomicity, but I am not sure how one would implement it with the current API.
>>>>>>>>
>>>>>>>> Maybe we need to discuss improvements at the DataSource V2 API level (e.g. individual tasks would "prepare" for commit, and once the driver receives "prepared" from all the tasks, a "commit" would be invoked at each of the individual tasks). Right now the responsibility of the final "commit" is with the driver, and it may not always be possible for the driver to take over the transactions started by the tasks.
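[Editor's note] The prepare/commit flow Arun proposes can be simulated in miniature: the coordinator invokes commit only if every participant reports a successful prepare, and rolls everything back otherwise. This toy Python sketch deliberately ignores the hard parts raised in the thread (keeping tasks alive until commit, executor loss after prepare, recovery after a coordinator crash):

```python
# Minimal two-phase-commit simulation; participant names are illustrative.
class Participant:
    def __init__(self, name, fail_prepare=False):
        self.name, self.fail_prepare = name, fail_prepare
        self.state = "pending"

    def prepare(self):
        if self.fail_prepare:
            self.state = "aborted"
            return False
        self.state = "prepared"  # e.g. an XA prepared transaction
        return True

    def commit(self):
        assert self.state == "prepared"
        self.state = "committed"

    def rollback(self):
        self.state = "aborted"

def two_phase_commit(participants):
    """Phase 1: all must prepare. Phase 2: commit all, else roll back all."""
    if all(p.prepare() for p in participants):
        for p in participants:
            p.commit()
        return True
    for p in participants:
        p.rollback()
    return False

ok = two_phase_commit([Participant("t0"), Participant("t1")])
assert ok

bad = [Participant("t0"), Participant("t1", fail_prepare=True)]
assert not two_phase_commit(bad)
assert all(p.state == "aborted" for p in bad)
```

The simulation makes Jungtaek's objection easy to see: each `Participant` here is a long-lived object the coordinator can still reach at commit time, which is precisely what Spark's execution model does not guarantee for tasks.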
>>>>>>>>
>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal <dbis...@us.ibm.com> wrote:
>>>>>>>>
>>>>>>>>> This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store.
>>>>>>>>>
>>>>>>>>> DB>> Perhaps we can explore the two-phase commit protocol (aka XA) for this? Not sure how easy it is to implement, though. :-)
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dilip Biswal
>>>>>>>>> Tel: 408-463-4980
>>>>>>>>> dbis...@us.ibm.com
>>>>>>>>>
>>>>>>>>> ----- Original message -----
>>>>>>>>> From: Reynold Xin <r...@databricks.com>
>>>>>>>>> To: Ryan Blue <rb...@netflix.com>
>>>>>>>>> Cc: ross.law...@gmail.com, dev <dev@spark.apache.org>
>>>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>>>
>>>>>>>>> I don't think the problem is just whether we have a starting point for the write. As a matter of fact, there's always a starting point for a write, whether it is explicit or implicit.
>>>>>>>>>
>>>>>>>>> This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>>>
>>>>>>>>> Ross, I think the intent is to create a single transaction on the driver, write as part of it in each task, and then commit the transaction once the tasks complete. Is that possible in your implementation?
>>>>>>>>>
>>>>>>>>> I think part of this is made more difficult by not having a clear starting point for a write, which we are fixing in the redesign of the v2 API. That will have a method that creates a Write to track the operation. That can create your transaction when it is created and commit the transaction when commit is called on it.
>>>>>>>>>
>>>>>>>>> rb
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin <r...@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley <ross.law...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I've been prototyping an implementation of the DataSource V2 writer for the MongoDB Spark Connector, and I have a couple of questions about how it's intended to be used with database systems. According to the Javadoc for DataWriter.commit():
>>>>>>>>>
>>>>>>>>> *"this method should still "hide" the written data and ask the DataSourceWriter at driver side to do the final commit via WriterCommitMessage"*
>>>>>>>>>
>>>>>>>>> Although MongoDB now has transactions, it doesn't have a way to "hide" the data once it has been written. So as soon as the DataWriter has committed the data, it has been inserted/updated in the collection and is discoverable - thereby breaking the documented contract.
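[Editor's note] The DataWriter.commit() contract Ross quotes from the Javadoc can be modeled roughly as follows. The class names mirror the DSv2 interfaces, but this is a plain-Python simulation, not Spark code: each task-side commit only stages its data and returns a commit message, and the driver-side commit is what makes everything visible at once. This is exactly the step a store with per-client transaction boundaries (like the MongoDB case described) cannot provide natively.

```python
# Simulation of the DSv2 write protocol; plain dicts stand in for
# WriterCommitMessage and a list stands in for the target table.
class DataWriter:
    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.buffer = []

    def write(self, row):
        self.buffer.append(row)

    def commit(self):
        # "Hide" the data (think: staging table or temp directory) and
        # tell the driver where to find it via a commit message.
        return {"partition": self.partition_id, "staged": list(self.buffer)}

class DataSourceWriter:
    def __init__(self):
        self.table = []  # the externally visible table

    def commit(self, messages):
        # Final, driver-side commit: publish all staged data at once.
        for msg in sorted(messages, key=lambda m: m["partition"]):
            self.table.extend(msg["staged"])

writers = [DataWriter(0), DataWriter(1)]
writers[0].write("a")
writers[1].write("b")
messages = [w.commit() for w in writers]  # data staged, not yet visible
sink = DataSourceWriter()
sink.commit(messages)
assert sink.table == ["a", "b"]
```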
>>>>>>>>>
>>>>>>>>> I was wondering how other database systems plan to implement this API and meet the contract as per the Javadoc?
>>>>>>>>>
>>>>>>>>> Many thanks
>>>>>>>>>
>>>>>>>>> Ross
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix