Re: DataSourceV2 : Transactional Write support

2019-08-05 Thread Reynold Xin
We can also just write using one partition, which will be sufficient for
most use cases.
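
For illustration, a minimal Scala sketch of the one-partition approach
(assuming the built-in JDBC writer; jdbcUrl and props are placeholders).
The stock writer commits each partition in its own transaction when the
isolation level supports it, so collapsing to one partition makes the whole
write ride on a single connection and, effectively, a single transaction:

import org.apache.spark.sql.DataFrame

// One partition => one task => one JDBC connection for the entire write.
def writeInOneTransaction(df: DataFrame, jdbcUrl: String,
                          props: java.util.Properties): Unit = {
  df.coalesce(1)          // collapse the DataFrame to a single partition
    .write
    .mode("append")
    .jdbc(jdbcUrl, "target_table", props)
}

The obvious trade-off is that the write is no longer parallel, which is fine
for small-to-medium result sets.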



Re: DataSourceV2 : Transactional Write support

2019-08-05 Thread Matt Cheah
There might be some help from the staging table catalog as well.
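
For reference, the staging API's shape is roughly the following (a loose
Scala paraphrase of the interfaces from PR#24798, not the verbatim
definitions; the exact package and signatures may differ):

// A staged table stays invisible until the driver commits it, which gives
// the catalog a natural all-or-nothing hook for the whole write job.
trait StagedTable {
  def commitStagedChanges(): Unit  // driver-side: make the staged data visible
  def abortStagedChanges(): Unit   // driver-side: discard everything staged
}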

 

-Matt Cheah

 



Re: DataSourceV2 : Transactional Write support

2019-08-05 Thread Wenchen Fan
I agree with the temp table approach. One idea: maybe we only need one temp
table, and each task writes to it. At the end, we read the data from the
temp table and write it to the target table. AFAIK JDBC databases handle
concurrent writes to a table very well, and this is better than creating
thousands of temp tables for one write job (assuming the input RDD has
thousands of partitions).
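
A rough sketch of that flow (hypothetical helper names and a two-column
staging_tbl; not code from any PR): every task appends to the one shared
staging table over its own connection, and the driver performs the final
move in a single transaction.

import java.sql.DriverManager

// Executor side: each task appends its partition to the ONE shared staging
// table; JDBC databases cope well with these concurrent inserts.
def writePartition(rows: Iterator[(Long, String)], jdbcUrl: String): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    val stmt = conn.prepareStatement("INSERT INTO staging_tbl VALUES (?, ?)")
    rows.foreach { case (id, value) =>
      stmt.setLong(1, id); stmt.setString(2, value); stmt.addBatch()
    }
    stmt.executeBatch()
  } finally conn.close()
}

// Driver side, after every task has reported success: one atomic move.
// Assumes a database with transactional DDL (e.g. Postgres), so the DROP
// participates in the same transaction.
def commitJob(jdbcUrl: String): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val stmt = conn.createStatement()
    stmt.executeUpdate("INSERT INTO target_tbl SELECT * FROM staging_tbl")
    stmt.executeUpdate("DROP TABLE staging_tbl")
    conn.commit()
  } finally conn.close()
}

If the job fails midway, the target table is untouched and the staging table
can simply be dropped.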



Re: DataSourceV2 : Transactional Write support

2019-08-05 Thread Shiv Prashant Sood
Thanks all for the clarification.

Regards,
Shiv



Re: DataSourceV2 : Transactional Write support

2019-08-03 Thread Ryan Blue
> What you could try instead is intermediate output: insert into a temporary
table in the executors, and move the inserted records to the final table in
the driver (the move must be atomic)

I think this is the approach that other systems (maybe Sqoop?) have taken:
insert into independent temporary tables, which can be done quickly, then
for the final commit operation, union and insert into the final table. In a
lot of cases, JDBC databases can do that quickly as well, because the data
is already on disk and just needs to be added to the final table.
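
As a sketch of that final commit step (hypothetical names; assumes each task
wrote its rows to its own temp table, e.g. tmp_<taskId>, and reported the
table name back to the driver):

// Driver-side commit: fold all per-task temp tables into the final table in
// one transaction, then clean them up. `tempTables` would be collected from
// the tasks' commit messages.
def unionCommit(conn: java.sql.Connection, tempTables: Seq[String]): Unit = {
  conn.setAutoCommit(false)
  try {
    val union = tempTables.map(t => s"SELECT * FROM $t").mkString(" UNION ALL ")
    val stmt = conn.createStatement()
    stmt.executeUpdate(s"INSERT INTO final_table $union")
    tempTables.foreach(t => stmt.executeUpdate(s"DROP TABLE $t"))
    conn.commit()
  } catch {
    case e: Throwable => conn.rollback(); throw e
  } finally conn.close()
}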



-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 : Transactional Write support

2019-08-02 Thread Jungtaek Lim
I asked a similar question about end-to-end exactly-once with Kafka, and
you're correct: distributed transactions are not supported. Introducing a
distributed transaction protocol like two-phase commit would require huge
changes to the Spark codebase, and the feedback was not positive.

What you could try instead is intermediate output: insert into a temporary
table in the executors, and move the inserted records to the final table in
the driver (the move must be atomic).
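
One way to wire that into the V2 protocol (a hypothetical sketch against the
Spark 2.4-era API): the commit message is the only thing that crosses the
executor/driver boundary, so let it carry the staging table's name instead
of the connection.

// The executor commits its own local transaction, then ships only the
// staging table's NAME (a plain, serializable String) to the driver, which
// later performs the final atomic move.
case class StagedTableName(table: String)
  extends org.apache.spark.sql.sources.v2.writer.WriterCommitMessage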

Thanks,
Jungtaek Lim (HeartSaVioR)



-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Re: DataSourceV2 : Transactional Write support

2019-08-02 Thread Matt Cheah
Can we check that the latest staging APIs work for the JDBC use case in a 
single transactional write? See 
https://github.com/apache/spark/pull/24798/files#diff-c9d2f9c9d20452939b7c28ebdae0503dR53

 

But we should also acknowledge that transactions in the traditional RDBMS 
sense tend to have pretty specific semantics that we don't support in the V2 
API. For example, one cannot commit multiple write operations in a single 
transaction right now. That would require changes to the DDL and a pretty 
substantial change to the design of Spark SQL more broadly.
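
To make that concrete for JDBC, a hedged sketch (a hypothetical class, not
from either PR) of how the staged-table hooks could map onto the temp-table
trick discussed earlier in the thread:

// Hypothetical JDBC staged table: tasks write into a hidden staging table,
// and the driver's commitStagedChanges() promotes its contents in a single
// database transaction (assumes transactional DDL, e.g. Postgres).
class JdbcStagedTable(jdbcUrl: String, staging: String, target: String) {

  def commitStagedChanges(): Unit = {
    val conn = java.sql.DriverManager.getConnection(jdbcUrl)
    try {
      conn.setAutoCommit(false)
      val stmt = conn.createStatement()
      stmt.executeUpdate(s"INSERT INTO $target SELECT * FROM $staging")
      stmt.executeUpdate(s"DROP TABLE $staging")
      conn.commit()
    } finally conn.close()
  }

  def abortStagedChanges(): Unit = {
    val conn = java.sql.DriverManager.getConnection(jdbcUrl)
    try conn.createStatement().executeUpdate(s"DROP TABLE IF EXISTS $staging")
    finally conn.close()
  }
}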

 

-Matt Cheah

 



DataSourceV2 : Transactional Write support

2019-08-02 Thread Shiv Prashant Sood
All,

I understood that DataSourceV2 supports transactional writes and wanted to
implement that in the JDBC DataSourceV2 connector (PR#25211
<https://github.com/apache/spark/pull/25211>).

I don't see how this is feasible for a JDBC-based connector. The framework
suggests that each EXECUTOR sends a commit message to the DRIVER, and the
actual commit should only be done by the DRIVER after receiving all commit
confirmations. This will not work for JDBC, as commits have to happen on the
JDBC Connection, which is maintained by the EXECUTORS, and a JDBCConnection
is not serializable, so it cannot be sent to the DRIVER.
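
For context, the protocol in question looks roughly like this (a loose Scala
paraphrase of the Spark 2.4-era V2 write interfaces, not the verbatim
definitions). Only the WriterCommitMessage travels from executor to driver,
and it must be serializable, which is exactly why a live Connection cannot
make the trip:

// Paraphrase of the DataSourceV2 write protocol.
trait WriterCommitMessage extends Serializable  // the only thing sent to the driver

trait DataWriter[T] {
  def write(record: T): Unit          // runs on an EXECUTOR
  def commit(): WriterCommitMessage   // task-level commit, still on the EXECUTOR
  def abort(): Unit
}

trait DataSourceWriter {
  def commit(messages: Array[WriterCommitMessage]): Unit  // job-level commit, on the DRIVER
  def abort(messages: Array[WriterCommitMessage]): Unit
}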

Am I right in thinking that this cannot be supported for JDBC? My goal is
to either fully write or roll back the DataFrame write operation.

Thanks in advance for your help.

Regards,
Shiv