Why is atomic operations a requirement? I feel like doubling the amount of
writes (with staging tables) is probably a tradeoff that the end user
should make.

On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan <cloud0...@gmail.com> wrote:

> Regardless the API, to use Spark to write data atomically, it requires
> 1. Write data distributedly, with a central coordinator at Spark driver.
> 2. The distributed writers are not guaranteed to run together at the same
> time. (This can be relaxed if we can extend the barrier scheduling feature)
> 3. The new data is visible if and only if all distributed writers success.
>
> According to these requirements, I think using a staging table is the most
> common way and maybe the only way. I'm not sure how 2PC can help, we don't
> want users to read partial data, so we need a final step to commit all the
> data together.
>
> For RDBMS data sources, I think a simple solution is to ask users to
> coalesce the input RDD/DataFrame into one partition, then we don't need to
> care about multi-client transaction. Or using a staging table like Ryan
> described before.
>
>
>
> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require
>> tasks to be running at the same time, we need a mechanism to take down
>> tasks after they have prepared and bring up the tasks during the commit
>> phase.
>>
>> I guess we already got into too much details here, but if it is based on
>> client transaction Spark must assign "commit" tasks to the executor which
>> task was finished "prepare", and if it loses executor it is not feasible to
>> force committing. Staging should come into play for that.
>>
>> We should also have mechanism for "recovery": Spark needs to ensure it
>> finalizes "commit" even in case of failures before starting a new batch.
>>
>> So not an easy thing to integrate correctly.
>>
>> 2018년 9월 11일 (화) 오전 6:00, Arun Mahadevan <ar...@apache.org>님이 작성:
>>
>>> >Well almost all relational databases you can move data in a
>>> transactional way. That’s what transactions are for.
>>>
>>> It would work, but I suspect in most cases it would involve moving data
>>> from temporary tables to the final tables
>>>
>>> Right now theres no mechanisms to let the individual tasks commit in a
>>> two-phase manner (Not sure if the CommitCordinator might help). If such an
>>> API is provided, the sources could use it as they wish (e.g. use XA support
>>> provided by mysql to implement it in a more efficient way than the driver
>>> moving from temp tables to destination tables).
>>>
>>> Definitely there are complexities involved, but I am not sure if the
>>> network partitioning comes into play here since the driver can act as the
>>> co-ordinator and can run in HA mode. And regarding the issue that Jungtaek
>>> brought up, 2PC doesn't require tasks to be running at the same time, we
>>> need a mechanism to take down tasks after they have prepared and bring up
>>> the tasks during the commit phase.
>>>
>>> Most of the sources would not need any of the above and just need a way
>>> to support Idempotent writes and like Ryan suggested we can enable this (if
>>> there are gaps in the current APIs).
>>>
>>>
>>> On Mon, 10 Sep 2018 at 13:43, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> Well almost all relational databases you can move data in a
>>>> transactional way. That’s what transactions are for.
>>>>
>>>> For just straight HDFS, the move is a pretty fast operation so while it
>>>> is not completely transactional, the window of potential failure is pretty
>>>> short for appends. For writers at the partition level it is fine because it
>>>> is just renaming directory, which is atomic.
>>>>
>>>> On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>
>>>>> When network partitioning happens it is pretty OK for me to see 2PC
>>>>> not working, cause we deal with global transaction. Recovery should be 
>>>>> hard
>>>>> thing to get it correctly though. I completely agree it would require
>>>>> massive changes to Spark.
>>>>>
>>>>> What I couldn't find for underlying storages is moving data from
>>>>> staging table to final table in transactional way. I'm not fully sure but
>>>>> as I'm aware of, many storages would not support moving data, and even 
>>>>> HDFS
>>>>> sink it is not strictly done in transactional way since we move multiple
>>>>> files with multiple operations. If coordinator just crashes it leaves
>>>>> partial write, and among writers and coordinator need to deal with 
>>>>> ensuring
>>>>> it will not be going to be duplicated.
>>>>>
>>>>> Ryan replied me as Iceberg and HBase MVCC timestamps can enable us to
>>>>> implement "commit" (his reply didn't hit dev. mailing list though) but I'm
>>>>> not an expert of both twos and I couldn't still imagine it can deal with
>>>>> various crash cases.
>>>>>
>>>>> 2018년 9월 11일 (화) 오전 5:17, Reynold Xin <r...@databricks.com>님이 작성:
>>>>>
>>>>>> I don't think two phase commit would work here at all.
>>>>>>
>>>>>> 1. It'd require massive changes to Spark.
>>>>>>
>>>>>> 2. Unless the underlying data source can provide an API to coordinate
>>>>>> commits (which few data sources I know provide something like that), 2PC
>>>>>> wouldn't work in the presence of network partitioning. You can't defy the
>>>>>> law of physics.
>>>>>>
>>>>>> Really the most common and simple way I've seen this working is
>>>>>> through staging tables and a final transaction to move data from staging
>>>>>> table to final table.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim <kabh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I guess we all are aware of limitation of contract on DSv2 writer.
>>>>>>> Actually it can be achieved only with HDFS sink (or other filesystem 
>>>>>>> based
>>>>>>> sinks) and other external storage are normally not feasible to 
>>>>>>> implement it
>>>>>>> because there's no way to couple a transaction with multiple clients as
>>>>>>> well as coordinator can't take over transactions from writers to do the
>>>>>>> final commit.
>>>>>>>
>>>>>>> XA is also not a trivial one to get it correctly with current
>>>>>>> execution model: Spark doesn't require writer tasks to run at the same 
>>>>>>> time
>>>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>>>> client
>>>>>>> before transaction ends normally means aborting transaction). Spark 
>>>>>>> should
>>>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>>>> completeness of batch. And it might require different integration for
>>>>>>> continuous mode.
>>>>>>>
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan <ar...@apache.org>님이 작성:
>>>>>>>
>>>>>>>> In some cases the implementations may be ok with eventual
>>>>>>>> consistency (and does not care if the output is written out atomically)
>>>>>>>>
>>>>>>>> XA can be one option for datasources that supports it and requires
>>>>>>>> atomicity but I am not sure how would one implement it with the current
>>>>>>>> API.
>>>>>>>>
>>>>>>>> May be we need to discuss improvements at the Datasource V2 API
>>>>>>>> level (e.g. individual tasks would "prepare" for commit and once the 
>>>>>>>> driver
>>>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>>>>> each
>>>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>>>> driver to take over the transactions started by the tasks.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal <dbis...@us.ibm.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>>> coordinating a
>>>>>>>>> single transaction. That's certainly the case for almost all 
>>>>>>>>> relational
>>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>>> (consider
>>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>>
>>>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dilip Biswal
>>>>>>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>>>>>>> dbis...@us.ibm.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ----- Original message -----
>>>>>>>>> From: Reynold Xin <r...@databricks.com>
>>>>>>>>> To: Ryan Blue <rb...@netflix.com>
>>>>>>>>> Cc: ross.law...@gmail.com, dev <dev@spark.apache.org>
>>>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>>>
>>>>>>>>> I don't think the problem is just whether we have a starting point
>>>>>>>>> for write. As a matter of fact there's always a starting point for 
>>>>>>>>> write,
>>>>>>>>> whether it is explicit or implicit.
>>>>>>>>>
>>>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>>> coordinating a
>>>>>>>>> single transaction. That's certainly the case for almost all 
>>>>>>>>> relational
>>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>>> (consider
>>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue <rb...@netflix.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Ross, I think the intent is to create a single transaction on the
>>>>>>>>> driver, write as part of it in each task, and then commit the 
>>>>>>>>> transaction
>>>>>>>>> once the tasks complete. Is that possible in your implementation?
>>>>>>>>>
>>>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>>>> clear starting point for a write, which we are fixing in the redesign 
>>>>>>>>> of
>>>>>>>>> the v2 API. That will have a method that creates a Write to track the
>>>>>>>>> operation. That can create your transaction when it is created and 
>>>>>>>>> commit
>>>>>>>>> the transaction when commit is called on it.
>>>>>>>>>
>>>>>>>>> rb
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin <r...@databricks.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley <ross.law...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I've been prototyping an implementation of the DataSource V2
>>>>>>>>> writer for the MongoDB Spark Connector and I have a couple of 
>>>>>>>>> questions
>>>>>>>>> about how its intended to be used with database systems. According to 
>>>>>>>>> the
>>>>>>>>> Javadoc for DataWriter.commit():
>>>>>>>>>
>>>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>>>> WriterCommitMessage"*
>>>>>>>>>
>>>>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>>>>> "hide" the data once it has been written. So as soon as the 
>>>>>>>>> DataWriter has
>>>>>>>>> committed the data, it has been inserted/updated in the collection 
>>>>>>>>> and is
>>>>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>>>>
>>>>>>>>> I was wondering how other databases systems plan to implement this
>>>>>>>>> API and meet the contract as per the Javadoc?
>>>>>>>>>
>>>>>>>>> Many thanks
>>>>>>>>>
>>>>>>>>> Ross
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>>
>>>>>>>>

Reply via email to