I'm still not sure how a staging table helps for databases that lack such atomicity guarantees. For example, in Cassandra, if you wrote all of the data temporarily to a staging table, we would still face the same problem when moving the data from the staging table into the real table: a similar chance of failure, and still no way to make the entire staged set visible simultaneously.
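To make that concern concrete, here is a toy sketch (plain Java, no real Cassandra client; all names hypothetical) of why a row-at-a-time move from a staging table to the final table can never be atomic: each write is individually visible, so a crash partway through leaves readers seeing a partial set.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StagingMoveSketch {
    // Toy stand-ins for a staging table and the final table.
    static Map<String, String> staging = new LinkedHashMap<>();
    static Map<String, String> finalTable = new LinkedHashMap<>();

    // Moves rows one by one, simulating a crash after `crashAfter` rows.
    // Each put is individually visible to readers -- there is no point
    // at which the whole staged set becomes visible at once.
    static void move(int crashAfter) {
        int moved = 0;
        for (Map.Entry<String, String> row : staging.entrySet()) {
            if (moved == crashAfter) return; // simulated crash mid-move
            finalTable.put(row.getKey(), row.getValue());
            moved++;
        }
    }

    public static void main(String[] args) {
        staging.put("k1", "v1");
        staging.put("k2", "v2");
        staging.put("k3", "v3");
        move(2); // crash after two of three rows
        // A reader now observes a partial write: 2 of 3 staged rows.
        System.out.println(finalTable.size());
    }
}
```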
On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan <ar...@apache.org> wrote:

> > Some say it is exactly-once when the output is eventually exactly-once, whereas others say there should be no side effects, i.e. a consumer shouldn't see a partial write. I guess 2PC is the former, since some partitions can commit earlier while other partitions fail to commit for some time.
>
> Yes, it's more about guaranteeing atomicity: either all partitions eventually commit or none commits. The visibility of the data to readers is orthogonal (e.g. setting isolation levels like serializable for XA), and in general it's difficult to guarantee that data across partitions becomes visible at once. An approach like a staging table plus a global commit works in a centralized setup, but can be difficult to do in a distributed manner across partitions (e.g. when each partition's output goes to a different database).
>
> On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> IMHO that's up to how strict we would like to be about "exactly-once".
>>
>> Some say it is exactly-once when the output is eventually exactly-once, whereas others say there should be no side effects, i.e. a consumer shouldn't see a partial write. I guess 2PC is the former, since some partitions can commit earlier while other partitions fail to commit for some time.
>>
>> That said, there may be a couple of alternatives beyond the contract Spark provides/requires, and I'd like to see how the Spark community wants to deal with them. Would we want to disallow alternatives like "replay + deduplicated writes (per batch/partition)", which ensure "eventually" exactly-once but cannot satisfy the contract?
>>
>> Btw, unless achieving exactly-once is light enough for a given sink, I think the sink should provide both at-least-once (also optimized for that semantic) and exactly-once, and let end users pick one.
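A minimal sketch of the "staging plus global commit" idea in a centralized setup (plain Java, all names hypothetical): each partition stages its rows independently, and a coordinator makes the whole batch visible to readers in a single atomic swap. The difficulty noted above is precisely that this swap has no equivalent when partitions write to different stores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicPublishSketch {
    // Readers only ever see the map this reference currently points at.
    static final AtomicReference<Map<Integer, List<String>>> visible =
            new AtomicReference<>(new HashMap<>());
    // Staged-but-uncommitted output, keyed by partition id.
    static final Map<Integer, List<String>> staged = new HashMap<>();

    static void stage(int partition, String row) {
        staged.computeIfAbsent(partition, p -> new ArrayList<>()).add(row);
    }

    // Global commit: one atomic pointer swap makes every partition's output
    // visible at the same instant. This only works because all staged data
    // lives behind a single reference -- i.e. a centralized setup.
    static void commitAll() {
        Map<Integer, List<String>> next = new HashMap<>(visible.get());
        next.putAll(staged);
        visible.set(next);
        staged.clear();
    }

    public static void main(String[] args) {
        stage(0, "a");
        stage(1, "b");
        System.out.println(visible.get().size()); // 0: nothing visible yet
        commitAll();
        System.out.println(visible.get().size()); // 2: both partitions at once
    }
}
```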
>>
>> On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:
>>
>>> Why are atomic operations a requirement? I feel like doubling the amount of writes (with staging tables) is probably a tradeoff that the end user should make.
>>>
>>> On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>>> Regardless of the API, using Spark to write data atomically requires:
>>>> 1. Writing data distributedly, with a central coordinator at the Spark driver.
>>>> 2. The distributed writers are not guaranteed to run at the same time. (This can be relaxed if we extend the barrier scheduling feature.)
>>>> 3. The new data is visible if and only if all distributed writers succeed.
>>>>
>>>> Given these requirements, I think using a staging table is the most common way, and maybe the only way. I'm not sure how 2PC can help; we don't want users to read partial data, so we need a final step to commit all the data together.
>>>>
>>>> For RDBMS data sources, a simple solution is to ask users to coalesce the input RDD/DataFrame into one partition; then we don't need to care about multi-client transactions. Or use a staging table as Ryan described before.
>>>>
>>>> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>
>>>>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time; we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
>>>>>
>>>>> I guess we have already gotten into too much detail here, but if it is based on a client transaction, Spark must assign the "commit" task to the executor whose task finished the "prepare", and if it loses that executor it is not feasible to force the commit. Staging should come into play for that.
>>>>>
>>>>> We should also have a mechanism for "recovery": Spark needs to ensure it finalizes the "commit" even in case of failures, before starting a new batch.
>>>>>
>>>>> So this is not an easy thing to integrate correctly.
>>>>>
>>>>> On Tue, Sep 11, 2018 at 6:00 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>>>>
>>>>>> > Well, in almost all relational databases you can move data in a transactional way. That's what transactions are for.
>>>>>>
>>>>>> It would work, but I suspect in most cases it would involve moving data from temporary tables to the final tables.
>>>>>>
>>>>>> Right now there are no mechanisms to let individual tasks commit in a two-phase manner (not sure if the CommitCoordinator might help). If such an API were provided, the sources could use it as they wish (e.g. use the XA support provided by MySQL to implement it in a more efficient way than the driver moving data from temp tables to destination tables).
>>>>>>
>>>>>> There are definitely complexities involved, but I am not sure the network partitioning comes into play here, since the driver can act as the coordinator and can run in HA mode. And regarding the issue that Jungtaek brought up, 2PC doesn't require tasks to be running at the same time; we need a mechanism to take down tasks after they have prepared and bring up the tasks during the commit phase.
>>>>>>
>>>>>> Most of the sources would not need any of the above and just need a way to support idempotent writes; as Ryan suggested, we can enable this (if there are gaps in the current APIs).
>>>>>>
>>>>>> On Mon, 10 Sep 2018 at 13:43, Reynold Xin <r...@databricks.com> wrote:
>>>>>>
>>>>>>> Well, in almost all relational databases you can move data in a transactional way. That's what transactions are for.
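For reference, the prepare/commit flow being discussed can be sketched with a toy coordinator (plain Java, no XA, all names hypothetical): phase one asks every task-side writer to prepare, and phase two commits only if every vote was yes, otherwise aborts everywhere. Note this toy deliberately ignores the recovery problem raised above: a real coordinator must durably log its decision before phase two.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSketch {
    // A toy participant: a task-side writer that can prepare, commit, or abort.
    interface Writer {
        boolean prepare(); // durably stage output; vote yes/no
        void commit();     // make staged output final
        void abort();      // discard staged output
    }

    // Driver-side coordinator: phase 1 collects votes, phase 2 decides.
    static boolean run(List<Writer> writers) {
        boolean allPrepared = true;
        for (Writer w : writers) {
            if (!w.prepare()) { allPrepared = false; break; }
        }
        for (Writer w : writers) {
            if (allPrepared) w.commit(); else w.abort();
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Writer ok = new Writer() {
            public boolean prepare() { return true; }
            public void commit() { log.add("commit"); }
            public void abort() { log.add("abort"); }
        };
        Writer failing = new Writer() {
            public boolean prepare() { return false; }
            public void commit() { log.add("commit"); }
            public void abort() { log.add("abort"); }
        };
        System.out.println(run(List.of(ok, ok)));      // true: both commit
        System.out.println(run(List.of(ok, failing))); // false: both abort
    }
}
```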
>>>>>>>
>>>>>>> For just straight HDFS, the move is a pretty fast operation, so while it is not completely transactional, the window of potential failure is pretty short for appends. For writers at the partition level it is fine, because it is just renaming a directory, which is atomic.
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> When network partitioning happens, it is pretty OK with me for 2PC not to work, since we are dealing with a global transaction. Recovery would be the hard thing to get right, though. I completely agree it would require massive changes to Spark.
>>>>>>>>
>>>>>>>> What I couldn't find for the underlying storages is a way to move data from a staging table to the final table transactionally. I'm not fully sure, but as far as I'm aware, many storages do not support moving data, and even for the HDFS sink it is not strictly done in a transactional way, since we move multiple files with multiple operations. If the coordinator crashes it leaves a partial write, and the writers and coordinator need to ensure it will not end up duplicated.
>>>>>>>>
>>>>>>>> Ryan replied to me that Iceberg and HBase MVCC timestamps can enable us to implement "commit" (his reply didn't hit the dev mailing list, though), but I'm not an expert on either of those, and I still can't see how they would deal with the various crash cases.
>>>>>>>>
>>>>>>>> On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> I don't think two-phase commit would work here at all.
>>>>>>>>>
>>>>>>>>> 1. It'd require massive changes to Spark.
>>>>>>>>>
>>>>>>>>> 2. Unless the underlying data source can provide an API to coordinate commits (and few data sources I know of provide something like that), 2PC wouldn't work in the presence of network partitioning. You can't defy the laws of physics.
>>>>>>>>>
>>>>>>>>> Really, the most common and simple way I've seen this work is through staging tables and a final transaction to move data from the staging table to the final table.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I guess we are all aware of the limitations of the contract on the DSv2 writer. In practice it can be achieved only with the HDFS sink (or other filesystem-based sinks); for other external storage it is normally not feasible to implement, because there's no way to couple one transaction with multiple clients, and the coordinator can't take over transactions from the writers to do the final commit.
>>>>>>>>>>
>>>>>>>>>> XA is also not trivial to get right with the current execution model: Spark doesn't require writer tasks to run at the same time, but to achieve 2PC they would have to run until the end of the transaction (closing a client before the transaction ends normally means aborting the transaction). Spark would also have to integrate 2PC with its checkpointing mechanism to guarantee completeness of a batch. And it might require a different integration for continuous mode.
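The "replay + deduplicated writes" / idempotent-write alternative mentioned above can be sketched as follows (plain Java, all names hypothetical): the sink records which (batchId, partitionId) pairs it has already applied, so a replayed task is a no-op. This gives "eventually" exactly-once output without any atomic global commit, at the cost of readers possibly seeing some partitions before others.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IdempotentWriteSketch {
    // The final store, plus a ledger of (batchId, partitionId) pairs applied.
    static final Map<String, String> store = new HashMap<>();
    static final Set<String> applied = new HashSet<>();

    // Applies a partition's output keyed by (batchId, partitionId); replaying
    // the same partition for the same batch is skipped, so retries after a
    // failure never duplicate rows in the store.
    static boolean write(long batchId, int partitionId, Map<String, String> rows) {
        String key = batchId + "/" + partitionId;
        if (!applied.add(key)) return false; // already applied: skip the replay
        store.putAll(rows);
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> rows = Map.of("k1", "v1");
        System.out.println(write(7, 0, rows)); // true: first attempt applies
        System.out.println(write(7, 0, rows)); // false: replay is deduplicated
        System.out.println(store.size());      // 1: no duplicate rows
    }
}
```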
>>>>>>>>>>
>>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> In some cases the implementations may be OK with eventual consistency (and not care whether the output is written out atomically).
>>>>>>>>>>>
>>>>>>>>>>> XA can be one option for data sources that support it and require atomicity, but I am not sure how one would implement it with the current API.
>>>>>>>>>>>
>>>>>>>>>>> Maybe we need to discuss improvements at the DataSource V2 API level (e.g. individual tasks would "prepare" for commit, and once the driver receives "prepared" from all the tasks, a "commit" would be invoked at each of the individual tasks). Right now the responsibility for the final "commit" is with the driver, and it may not always be possible for the driver to take over the transactions started by the tasks.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal <dbis...@us.ibm.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store.
>>>>>>>>>>>>
>>>>>>>>>>>> DB>> Perhaps we can explore the two-phase commit protocol (aka XA) for this?
>>>>>>>>>>>> Not sure how easy it is to implement this, though :-)
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Dilip Biswal
>>>>>>>>>>>> Tel: 408-463-4980
>>>>>>>>>>>> dbis...@us.ibm.com
>>>>>>>>>>>>
>>>>>>>>>>>> ----- Original message -----
>>>>>>>>>>>> From: Reynold Xin <r...@databricks.com>
>>>>>>>>>>>> To: Ryan Blue <rb...@netflix.com>
>>>>>>>>>>>> Cc: ross.law...@gmail.com, dev <dev@spark.apache.org>
>>>>>>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>>>>>>
>>>>>>>>>>>> I don't think the problem is just whether we have a starting point for a write. As a matter of fact, there's always a starting point for a write, whether it is explicit or implicit.
>>>>>>>>>>>>
>>>>>>>>>>>> This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Ross, I think the intent is to create a single transaction on the driver, write as part of it in each task, and then commit the transaction once the tasks complete. Is that possible in your implementation?
>>>>>>>>>>>>
>>>>>>>>>>>> I think that part of this is made more difficult by not having a clear starting point for a write, which we are fixing in the redesign of the v2 API.
>>>>>>>>>>>> That will have a method that creates a Write to track the operation. That can create your transaction when it is created, and commit the transaction when commit is called on it.
>>>>>>>>>>>>
>>>>>>>>>>>> rb
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin <r...@databricks.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley <ross.law...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I've been prototyping an implementation of the DataSource V2 writer for the MongoDB Spark Connector, and I have a couple of questions about how it is intended to be used with database systems. According to the Javadoc for DataWriter.commit():
>>>>>>>>>>>>
>>>>>>>>>>>> *"this method should still "hide" the written data and ask the DataSourceWriter at driver side to do the final commit via WriterCommitMessage"*
>>>>>>>>>>>>
>>>>>>>>>>>> Although MongoDB now has transactions, it doesn't have a way to "hide" the data once it has been written. So as soon as the DataWriter has committed the data, it has been inserted/updated in the collection and is discoverable -- thereby breaking the documented contract.
>>>>>>>>>>>>
>>>>>>>>>>>> I was wondering how other database systems plan to implement this API and meet the contract as per the Javadoc?
>>>>>>>>>>>>
>>>>>>>>>>>> Many thanks
>>>>>>>>>>>>
>>>>>>>>>>>> Ross
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>> Netflix
>>>>>>>>>>>>
>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
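For comparison with the filesystem-based sinks discussed in this thread, here is a sketch of the hide-then-commit pattern the DataWriter Javadoc describes, using plain Java NIO on a local filesystem as a stand-in for HDFS (all paths and names hypothetical): tasks write under a hidden staging directory, and the final commit is a single directory rename, which is atomic on a local POSIX filesystem (as an HDFS directory rename is).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class RenameCommitSketch {
    // Writes a part file into a hidden staging dir, then "commits" by renaming
    // the whole staging directory onto the final path in one operation.
    static Path writeAndCommit(Path base) {
        try {
            Path staging = Files.createDirectory(base.resolve("_staging"));
            Path committed = base.resolve("data");
            // Task side: output lands under the hidden staging directory,
            // invisible at the final path until commit.
            Files.writeString(staging.resolve("part-00000"), "rows...");
            // Driver side: the final commit is one directory rename, so
            // readers see either no data or all of it -- never a partial set.
            Files.move(staging, committed, StandardCopyOption.ATOMIC_MOVE);
            return committed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Path newBase() {
        try {
            return Files.createTempDirectory("sink");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path committed = writeAndCommit(newBase());
        System.out.println(Files.exists(committed.resolve("part-00000"))); // true
    }
}
```

As the thread notes, this only covers a single rename; committing many files with multiple operations reopens the partial-write window.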