Re: Spark File Output Committer algorithm for GCS

2023-07-21 Thread Mich Talebzadeh
This link might help:

https://stackoverflow.com/questions/46929351/spark-reading-orc-file-in-driver-not-in-executors

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: Spark File Output Committer algorithm for GCS

2023-07-21 Thread Dipayan Dev
I used the following config and performance improved a lot:
.config("spark.sql.orc.splits.include.file.footer", true)

I am not able to find the default value of this config anywhere. Can
someone please share what the default is? Is it false?
I am also curious what it actually does.


With Best Regards,

Dipayan Dev
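
One way to check what a session actually holds for this key is to ask Spark SQL directly. This is a minimal sketch, assuming the spark-sql CLI is on the PATH. `SET <key>` prints the current value (or `<undefined>` when nothing has set it), and `SET -v` lists the SQL configurations Spark registers, with their defaults and descriptions. Whether this particular key appears in that list is not confirmed anywhere in this thread. For comparison, the similarly named Hive property hive.orc.splits.include.file.footer is documented by Hive with a default of false; it controls whether ORC splits carry the file footer metadata along to the tasks.

```shell
# Print the value the current session holds for the key discussed above.
spark-sql -e "SET spark.sql.orc.splits.include.file.footer;"

# List the registered SQL configs (key, default, meaning), filtered to ORC.
spark-sql -e "SET -v;" | grep -i "orc"
```

If the key is missing from the `SET -v` output, that suggests Spark does not register it as a SQL configuration of its own, and any effect would have to come from somewhere else.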



Re: Spark File Output Committer algorithm for GCS

2023-07-19 Thread Dipayan Dev
Thank you. Will try out these options.



With Best Regards,




Re: Spark File Output Committer algorithm for GCS

2023-07-19 Thread Mich Talebzadeh
It sounds like the mv itself is inherently slow, in which case there is
little that can be done.

The only suggestion I can make is to create the staging table compressed, to
reduce its size and hence the time the mv takes. Is that feasible? The managed
table can also be created with SNAPPY compression:

STORED AS ORC
TBLPROPERTIES (
"orc.create.index"="true",
"orc.bloom.filter.columns"="KEY",
"orc.bloom.filter.fpp"="0.05",
"orc.compress"="SNAPPY",
"orc.stripe.size"="16777216",
"orc.row.index.stride"="1" )

HTH


Re: Spark File Output Committer algorithm for GCS

2023-07-18 Thread Dipayan Dev
Hi Mich,
OK, my use case is a bit different.
I have a Hive table partitioned by date and need to do dynamic partition
updates (insert overwrite) daily for the last 30 days (partitions).
The ETL inside the staging directories completes in barely 5 minutes, but the
renaming then takes a lot of time because it deletes and copies the partitions.
My issue is related to this thread:
https://groups.google.com/g/cloud-dataproc-discuss/c/neMyhytlfyg?pli=1



With Best Regards,

Dipayan Dev




Re: Spark File Output Committer algorithm for GCS

2023-07-18 Thread Mich Talebzadeh
Spark has no role in creating that Hive staging directory. That directory
belongs to Hive, and Spark simply does its ETL there, loading into the Hive
managed table in your case, which ends up in the staging directory.

I suggest that you review your design and use an external Hive table with an
explicit location on GCS that carries the date the data was loaded. Then push
that data into the Hive managed table for today's partition.

This was written in bash for Hive HQL itself, but you can easily adapt it
for Spark:

TODAY="`date +%Y-%m-%d`"
DateStamp="${TODAY}"
CREATE EXTERNAL TABLE IF NOT EXISTS EXTERNALMARKETDATA (
 KEY string
   , TICKER string
   , TIMECREATED string
   , PRICE float
)
COMMENT 'From prices using Kafka delivered by Flume location by day'
ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'gs://etcbucket/cloud_data_fusion/hive.../';

--Keep track of daily ingestion into the external table.
ALTER TABLE EXTERNALMARKETDATA set location
'gs://etcbucket/cloud_data_fusion/hive.../${TODAY}';

-- create your managed table here and populate it from the Hive external table
CREATE TABLE IF NOT EXISTS MARKETDATA (
 KEY string
   , TICKER string
   , TIMECREATED string
   , PRICE float
   , op_type int
   , op_time timestamp
)
PARTITIONED BY (DateStamp  string)
STORED AS ORC
TBLPROPERTIES (
"orc.create.index"="true",
"orc.bloom.filter.columns"="KEY",
"orc.bloom.filter.fpp"="0.05",
"orc.compress"="SNAPPY",
"orc.stripe.size"="16777216",
"orc.row.index.stride"="1" )
;

--Populate target table
INSERT OVERWRITE TABLE MARKETDATA PARTITION (DateStamp = "${TODAY}")
SELECT
  KEY
, TICKER
, TIMECREATED
, PRICE
, 1
, CAST(from_unixtime(unix_timestamp()) AS timestamp)
FROM EXTERNALMARKETDATA;

ANALYZE TABLE MARKETDATA PARTITION (DateStamp) COMPUTE STATISTICS;

HTH
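
The listing above mixes two bash assignments with HQL statements, so it may help to see how the two could be joined. This is a minimal sketch, not the author's actual wrapper: the function name render_hql and the /tmp/daily.hql path are made up for illustration, and the elided "hive..." path segment is copied from the message as-is. It only prints the statement; running it against Hive is left as a commented step that assumes the hive CLI is installed.

```shell
#!/usr/bin/env bash
# Sketch: substitute today's date into an HQL statement from bash.

TODAY="$(date +%Y-%m-%d)"

# render_hql is a hypothetical helper name. The heredoc delimiter is unquoted,
# so the shell expands ${TODAY} before Hive ever sees the text.
render_hql() {
  cat <<EOF
ALTER TABLE EXTERNALMARKETDATA SET LOCATION
'gs://etcbucket/cloud_data_fusion/hive.../${TODAY}';
EOF
}

# Print the rendered statement so it can be inspected first.
render_hql

# To execute it (assumes the hive CLI is available; the file path is arbitrary):
#   render_hql > /tmp/daily.hql && hive -f /tmp/daily.hql
```

Printing the statement before executing it makes it easy to confirm that the date landed in the location path as intended.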


Re: Spark File Output Committer algorithm for GCS

2023-07-18 Thread Dipayan Dev
It does help performance, but not significantly.

I am just wondering: once Spark creates that staging directory along with
the SUCCESS file, can we just run a gsutil rsync command and move these
files to the original directory? Has anyone tried this approach, or does
anyone foresee any concerns?
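
For concreteness, the command being asked about could look roughly like the sketch below. Nobody in this thread reports having tried it, and both bucket paths are hypothetical placeholders. The snippet only prints the command rather than running it, and it includes -n so that even when run, gsutil would only list what it would copy. Two caveats worth stating: between two gs:// locations rsync still copies objects, so it does not by itself remove the copy cost, and partitions that are new to the table would presumably still need to be registered with the Hive metastore.

```shell
# Hypothetical locations; replace with the real staging and table paths.
STAGING="gs://example-bucket/staging/run-001"
TARGET="gs://example-bucket/warehouse/marketdata"

# -m runs the transfer in parallel, -r recurses into subdirectories,
# -n is a dry run that only reports what would be copied.
sync_cmd() {
  echo gsutil -m rsync -r -n "$STAGING" "$TARGET"
}

# Show the command; drop the echo above (and later the -n) to actually run it.
sync_cmd
```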



>>> On Mon, 17 Jul 2023 at 15:29, Dipayan Dev 
>>> wrote:
>>>
 It is supported; at least it doesn't error out for me. But the job took
 around 4 hours to finish.

 Interestingly, it took only 10 minutes to write the output to the
 staging directory, and the rest of the time went on renaming the objects.
 That's the concern.

 This looks like a known issue with how Spark behaves on GCS, but I have not
 found any workaround for it.


 On Mon, 17 Jul 2023 at 7:55 PM, Yeachan Park 
 wrote:

> Did you check if mapreduce.fileoutputcommitter.algorithm.version 2 is
> supported on GCS? IIRC it wasn't, but you could check with GCP support
>
>
> On Mon, Jul 17, 2023 at 3:54 PM Dipayan Dev 
> wrote:
>
>> Thanks Jay,
>>
>> I will try that option.
>>
>> Any insight on the file committer algorithms?
>>
>> I tried the v2 algorithm but it's not improving the runtime. What's the
>> best practice in Dataproc for dynamic updates in Spark?
>>
>>
>> On Mon, 17 Jul 2023 at 7:05 PM, Jay 
>> wrote:
>>
>>> You can try increasing fs.gs.batch.threads and
>>> fs.gs.max.requests.per.batch.
>>>
>>> The definitions for these flags are available here -
>>> https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
>>>
>>> On Mon, 17 Jul 2023 at 14:59, Dipayan Dev 
>>> wrote:
>>>
 No, I am using Spark 2.4 to update the GCS partitions. I have a
 managed Hive table on top of this.
 [image: image.png]
 When I do a dynamic partition update from Spark, it creates the new
 files in a staging area, as shown here.
 But the GCS blob renaming takes a lot of time. The table is partitioned
 by date and I need to update around 3 years of data. It usually
 takes 3 hours to finish the process. Is there any way to speed this up?
 With Best Regards,

 Dipayan Dev

 On Mon, Jul 17, 2023 at 1:53 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> So you are using GCP and your Hive is installed on Dataproc which
> happens to run your Spark as well. Is that correct?
>
> What version of Hive are you using?
>
> HTH
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility
> for any loss, damage or destruction of data or any other property 
> which may
> arise from relying on this email's technical content is explicitly
> 

Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Dipayan Dev
Thanks Jay. Is there any guidance on how far I can increase those
parameters?


Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Jay
FileOutputCommitter v2 is supported on GCS, but a rename in GCS is a
metadata copy followed by a delete, so with a large number of files this
step takes a long time. One workaround is to write a smaller number of
larger files, if that is possible from Spark. If it is not, the
fs.gs.batch.threads and fs.gs.max.requests.per.batch settings let you
configure the thread pool that performs the metadata copy.

You can go through this table

to understand GCS performance implications.
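
To make the point about file counts concrete, here is a rough
back-of-the-envelope sketch. It assumes each renamed object costs one copy
request plus one delete request, and that the requests spread evenly over a
fixed number of threads. The function name, the per-request latency, the file
counts and the thread count are all made-up illustrative values, not
measurements from this thread or from GCS.

```python
# Rough model of the rename step on an object store where a "rename" is
# really a copy followed by a delete of every object (as described above).
# All numbers below are illustrative assumptions, not measurements.

def estimate_rename_seconds(num_files: int,
                            seconds_per_request: float,
                            parallel_threads: int) -> float:
    """Estimate wall-clock time to rename num_files objects.

    Assumes two requests per object (copy + delete) and perfect
    parallelism across parallel_threads workers.
    """
    if num_files < 0 or seconds_per_request < 0:
        raise ValueError("num_files and seconds_per_request must be >= 0")
    if parallel_threads < 1:
        raise ValueError("parallel_threads must be >= 1")
    requests = 2 * num_files
    return requests * seconds_per_request / parallel_threads


if __name__ == "__main__":
    # Hypothetical job: ~3 years of daily partitions, 200 small files each.
    many_small = estimate_rename_seconds(1095 * 200, 0.05, 15)
    # Same data written as 10 larger files per partition.
    fewer_large = estimate_rename_seconds(1095 * 10, 0.05, 15)
    print(f"many small files : {many_small / 60:.1f} min")
    print(f"fewer large files: {fewer_large / 60:.1f} min")
```

In this simplified model the rename time scales linearly with the number of
objects and inversely with the number of threads, which is why both
suggestions (fewer files, bigger thread pool) attack the same step.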




Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Mich Talebzadeh
You said this Hive table was a managed table partitioned by date -->${TODAY}

How do you define your Hive managed table?

HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom







Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Dipayan Dev
It is supported, or at least it doesn't error out for me. But the job took
around 4 hours to finish.

Interestingly, writing the output to the staging directory took only 10
minutes; the rest of the time went into renaming the objects. That's the
concern.

This looks like a known issue with how Spark behaves on GCS, but I have not
found any workaround for it.
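
Putting the two numbers from this message side by side, as a small sketch. It
treats "around 4 hours" as exactly 240 minutes, which is an approximation,
and the function name is just for illustration.

```python
# Share of the job spent writing vs. renaming, from the figures above.
# "Around 4 hours" is approximated as exactly 240 minutes.

def time_shares(total_minutes: float, write_minutes: float) -> tuple:
    """Return (write_share, rename_share) as fractions of the total."""
    if total_minutes <= 0 or not 0 <= write_minutes <= total_minutes:
        raise ValueError("need 0 <= write_minutes <= total_minutes, total > 0")
    write_share = write_minutes / total_minutes
    return write_share, 1.0 - write_share


if __name__ == "__main__":
    write_share, rename_share = time_shares(4 * 60, 10)
    print(f"write : {write_share:.1%}")   # 4.2%
    print(f"rename: {rename_share:.1%}")  # 95.8%
```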





With Best Regards,

Dipayan Dev
Author of *Deep Learning with Hadoop*
M.Tech (AI), IISc, Bangalore


Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Yeachan Park
Did you check whether mapreduce.fileoutputcommitter.algorithm.version = 2 is
supported on GCS? IIRC it wasn't, but you could check with GCP support.




Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Dipayan Dev
Thanks Jay,

I will try that option.

Any insight on the file committer algorithms?

I tried the v2 algorithm, but it's not improving the runtime. What's the
best practice on Dataproc for dynamic partition updates in Spark?





With Best Regards,

Dipayan Dev
Author of *Deep Learning with Hadoop*
M.Tech (AI), IISc, Bangalore


Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Jay
You can try increasing fs.gs.batch.threads and fs.gs.max.requests.per.batch.

The definitions for these flags are available here -
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
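
As a minimal sketch of how these two GCS connector settings could be passed
to a Spark job: Hadoop filesystem properties are normally forwarded through
Spark by prefixing them with spark.hadoop. The values below are placeholders
chosen purely for illustration (no specific numbers are suggested here), and
the helper name is hypothetical.

```python
# Build `spark-submit --conf` flags for the two GCS connector settings
# mentioned above. Hadoop properties are passed through Spark when
# prefixed with "spark.hadoop.". The values are placeholders, not tuning
# advice.

GCS_RENAME_TUNING = {
    "fs.gs.batch.threads": 30,           # assumed value, for illustration
    "fs.gs.max.requests.per.batch": 30,  # assumed value, for illustration
}


def to_spark_submit_flags(hadoop_props: dict) -> list:
    """Return a flat list like ['--conf', 'spark.hadoop.key=value', ...]."""
    flags = []
    for key, value in sorted(hadoop_props.items()):
        flags += ["--conf", f"spark.hadoop.{key}={value}"]
    return flags


if __name__ == "__main__":
    print(" ".join(to_spark_submit_flags(GCS_RENAME_TUNING)))
```

The same keys, with the same spark.hadoop. prefix, can also be set on the
session builder with .config(key, value) instead of on the command line.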



Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Dipayan Dev
No, I am using Spark 2.4 to update the GCS partitions. I have a managed
Hive table on top of this.
[image: image.png]
When I do a dynamic partition update from Spark, it creates the new files in
a staging area, as shown here.
But the GCS blob renaming takes a lot of time. The table is partitioned by
date, and I need to update around 3 years of data. It usually takes 3 hours
to finish the process. Is there any way to speed this up?
With Best Regards,

Dipayan Dev



Re: Spark File Output Committer algorithm for GCS

2023-07-17 Thread Mich Talebzadeh
So you are using GCP and your Hive is installed on Dataproc which happens
to run your Spark as well. Is that correct?

What version of Hive are you using?

HTH


Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom








Spark File Output Committer algorithm for GCS

2023-07-17 Thread Dipayan Dev
Hi All,

Lately I have had to overwrite a large number of partitions of a Hive table
through Spark. Writing to the hive_staging_directory takes about 25% of the
total time, while 75% or more goes into moving the ORC files from the
staging directory to the final partitioned directory structure.

I found a reference that suggests using this config during the Spark write:
*mapreduce.fileoutputcommitter.algorithm.version = 2*

However, it is also said to be unsafe, because a partial job failure might
cause data loss.

Is there any suggestion on the pros and cons of using this version? Or any
ongoing Spark feature development to address this issue?
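
For anyone weighing the trade-off, here is a toy in-memory simulation of the
difference as it is usually described: with algorithm version 1, a committed
task's files are first moved to a job-level temporary location and only
published to the destination at job commit, whereas with version 2 each task
commit moves its files straight into the destination. This is a simplified
sketch, not Hadoop's actual code; the function name, file names and the
set-based "filesystem" are invented for illustration.

```python
# Toy simulation of FileOutputCommitter v1 vs v2 visibility semantics.
# Not Hadoop code: names are hypothetical, and a "filesystem" location is
# just a set of file names.

def run_job(algorithm_version: int, tasks: list,
            fail_before_job_commit: bool) -> set:
    """Return the set of files visible in the destination after the job.

    tasks is a list of output file names, one per task; every task commits.
    If fail_before_job_commit is True, the job dies before job commit.
    """
    if algorithm_version not in (1, 2):
        raise ValueError("algorithm_version must be 1 or 2")
    destination = set()  # what readers of the table can see
    job_temp = set()     # stand-in for the job's temporary area

    for name in tasks:
        if algorithm_version == 1:
            # v1 task commit: move task output under the job temp area.
            job_temp.add(name)
        else:
            # v2 task commit: move task output directly to the destination.
            destination.add(name)

    if fail_before_job_commit:
        return destination  # job commit never runs

    # Job commit: v1 publishes everything now; v2 has nothing left to move.
    destination |= job_temp
    return destination


if __name__ == "__main__":
    files = ["part-0000.orc", "part-0001.orc"]
    for version in (1, 2):
        visible = run_job(version, files, fail_before_job_commit=True)
        print(f"v{version} after failed job: {sorted(visible)}")
```

In this simplified picture, v2 skips the second round of moves at job
commit, which is why it is suggested when renames are slow, but a job that
fails part-way can leave some task output already visible in the
destination.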



With Best Regards,

Dipayan Dev