Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread Juho Autio
> Not sure if the dynamic overwrite logic is implemented in Spark or in Hive

AFAIK I'm using the Spark implementation(s). Does the thread dump that I posted
show that? I'd like to remain within the Spark implementation.

What I'm trying to ask is: do you Spark developers see any ways to
optimize this?

Otherwise, I'm not sure what you mean by this:

> There is probably a limit on the number of elements you can pass in the
> list of partitions for the listPartitionsWithAuthInfo API call

That request takes a "max" argument, which is just a limit. The type is
short, so the max size per response is 32767. Anyway, even with this single
request & response it already takes those ~5 minutes.

On Thu, Apr 25, 2019 at 5:46 PM vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> There is probably a limit on the number of elements you can pass in the
> list of partitions for the listPartitionsWithAuthInfo API call. I'm not sure
> if the dynamic overwrite logic is implemented in Spark or in Hive, in which
> case using Hive 1.2.1 is probably the reason for the un-optimized logic but
> also a huge constraint for solving this issue, as upgrading the Hive version
> is a real challenge.
>
> On Thu, Apr 25, 2019 at 3:10 PM, Juho Autio  wrote:
>
>> Ok, I've verified that hive> SHOW PARTITIONS is using get_partition_names,
>> which is always quite fast. Spark's insertInto uses
>> get_partitions_with_auth which is much slower (it also gets location
>> etc. of each partition).
>>
>> I created a test in Java with a local metastore client to measure the
>> time:
>>
>> I used Short.MAX_VALUE (32767) as the max for both (so both responses
>> also returned 32767 partitions). I didn't fetch the next page of results,
>> but this already gives the idea:
>>
>> listPartitionNames completed in: 1540 ms ~= 1.5 seconds
>> listPartitionsWithAuthInfo completed in: 303400 ms ~= 5 minutes
>>
>> I wonder if this can be optimized on metastore side, but at least it
>> doesn't seem to be CPU-bound on the RDS db (we're using Hive metastore,
>> backed by AWS RDS).
>>
>> So my original question remains; does spark need to know about all
>> existing partitions for dynamic overwrite? I don't see why it would.
>>
>> On Thu, Apr 25, 2019 at 10:12 AM vincent gromakowski <
>> vincent.gromakow...@gmail.com> wrote:
>>
>>> Which metastore are you using?
>>>
>>> On Thu, Apr 25, 2019 at 9:02 AM, Juho Autio  wrote:
>>>
 Would anyone be able to answer this question about the non-optimal
 implementation of insertInto?

 On Thu, Apr 18, 2019 at 4:45 PM Juho Autio 
 wrote:

> Hi,
>
> My job is writing ~10 partitions with insertInto. With the same input
> / output data the total duration of the job is very different depending on
> how many partitions the target table has.
>
> Target table with 10 of partitions:
> 1 min 30 s
>
> Target table with ~1 partitions:
> 13 min 0 s
>
> It seems that spark is always fetching the full list of partitions in
> target table. When this happens, the cluster is basically idling while
> driver is listing partitions.
>
> Here's a thread dump for executor driver from such idle time:
> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>
> Is there any way to optimize this currently? Is this a known issue?
> Any plans to improve?
>
> My code is essentially:
>
> spark = SparkSession.builder \
>     .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
>     .config("hive.exec.dynamic.partition", "true") \
>     .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
>     .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>     .enableHiveSupport() \
>     .getOrCreate()
>
> out_df.write \
>     .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
>     .insertInto(target_table_name, overwrite=True)
>
> Table has been originally created from spark with saveAsTable.
>
> Does spark need to know anything about the existing partitions though?
> As a manual workaround I would write the files directly to the partition
> locations, delete existing files first if there's anything in that
> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
> PARTITION. This doesn't require previous knowledge on existing partitions.
>
> Thanks.
>

>>
>> --
>> *Juho Autio*
>> Senior Data Engineer
>>
>> Data Engineering, Games
>> Rovio Entertainment Corporation
>> Mobile: + 358 (0)45 313 0122
>> juho.au...@rovio.com
>> www.rovio.com
>>
>> *This message and its attachments may contain confidential information
>> and is intended solely for the attention and use of the named addressee(s).
>> If you are not the intended recipient and / or you have received this
>> message in error, please contact the sender immediately and delete all
>> material you have received in this message. You are hereby notified that
>> any use of the information, which you have received in error in whatsoever
>> form, is strictly prohibited. Thank you for your co-operation.*

Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread vincent gromakowski
There is probably a limit on the number of elements you can pass in the
list of partitions for the listPartitionsWithAuthInfo API call. I'm not sure
if the dynamic overwrite logic is implemented in Spark or in Hive, in which
case using Hive 1.2.1 is probably the reason for the un-optimized logic but
also a huge constraint for solving this issue, as upgrading the Hive version
is a real challenge.

On Thu, Apr 25, 2019 at 3:10 PM, Juho Autio  wrote:

> Ok, I've verified that hive> SHOW PARTITIONS is using get_partition_names,
> which is always quite fast. Spark's insertInto uses
> get_partitions_with_auth which is much slower (it also gets location etc.
> of each partition).
>
> I created a test in Java with a local metastore client to measure the
> time:
>
> I used Short.MAX_VALUE (32767) as the max for both (so both responses
> also returned 32767 partitions). I didn't fetch the next page of results,
> but this already gives the idea:
>
> listPartitionNames completed in: 1540 ms ~= 1.5 seconds
> listPartitionsWithAuthInfo completed in: 303400 ms ~= 5 minutes
>
> I wonder if this can be optimized on metastore side, but at least it
> doesn't seem to be CPU-bound on the RDS db (we're using Hive metastore,
> backed by AWS RDS).
>
> So my original question remains; does spark need to know about all
> existing partitions for dynamic overwrite? I don't see why it would.
>
> On Thu, Apr 25, 2019 at 10:12 AM vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Which metastore are you using?
>>
>> On Thu, Apr 25, 2019 at 9:02 AM, Juho Autio  wrote:
>>
>>> Would anyone be able to answer this question about the non-optimal
>>> implementation of insertInto?
>>>
>>> On Thu, Apr 18, 2019 at 4:45 PM Juho Autio  wrote:
>>>
 Hi,

 My job is writing ~10 partitions with insertInto. With the same input /
 output data the total duration of the job is very different depending on
 how many partitions the target table has.

 Target table with 10 of partitions:
 1 min 30 s

 Target table with ~1 partitions:
 13 min 0 s

 It seems that spark is always fetching the full list of partitions in
 target table. When this happens, the cluster is basically idling while
 driver is listing partitions.

 Here's a thread dump for executor driver from such idle time:
 https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20

 Is there any way to optimize this currently? Is this a known issue? Any
 plans to improve?

 My code is essentially:

 spark = SparkSession.builder \
     .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
     .config("hive.exec.dynamic.partition", "true") \
     .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
     .config("hive.exec.dynamic.partition.mode", "nonstrict") \
     .enableHiveSupport() \
     .getOrCreate()

 out_df.write \
     .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
     .insertInto(target_table_name, overwrite=True)

 Table has been originally created from spark with saveAsTable.

 Does spark need to know anything about the existing partitions though?
 As a manual workaround I would write the files directly to the partition
 locations, delete existing files first if there's anything in that
 partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
 PARTITION. This doesn't require previous knowledge on existing partitions.

 Thanks.

>>>
>
> --
> *Juho Autio*
> Senior Data Engineer
>
> Data Engineering, Games
> Rovio Entertainment Corporation
> Mobile: + 358 (0)45 313 0122
> juho.au...@rovio.com
> www.rovio.com
>
> *This message and its attachments may contain confidential information and
> is intended solely for the attention and use of the named addressee(s). If
> you are not the intended recipient and / or you have received this message
> in error, please contact the sender immediately and delete all material you
> have received in this message. You are hereby notified that any use of the
> information, which you have received in error in whatsoever form, is
> strictly prohibited. Thank you for your co-operation.*
>


Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread Juho Autio
Ok, I've verified that hive> SHOW PARTITIONS is using get_partition_names,
which is always quite fast. Spark's insertInto uses
get_partitions_with_auth which
is much slower (it also gets location etc. of each partition).

I created a test in Java with a local metastore client to measure the
time:

I used Short.MAX_VALUE (32767) as the max for both (so both responses
also returned 32767 partitions). I didn't fetch the next page of results,
but this already gives the idea:

listPartitionNames completed in: 1540 ms ~= 1.5 seconds
listPartitionsWithAuthInfo completed in: 303400 ms ~= 5 minutes

I wonder if this can be optimized on the metastore side, but at least it
doesn't seem to be CPU-bound on the RDS DB (we're using a Hive metastore
backed by AWS RDS).

So my original question remains: does Spark need to know about all existing
partitions for dynamic overwrite? I don't see why it would.
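
For reference, a rough sketch of that comparison in Python (my actual test was
in Java). This assumes the third-party hmsclient package, which wraps the
generated Hive Metastore Thrift client; the host, port and db/table names are
placeholders:

    import time
    # Assumption: the hmsclient package (pip install hmsclient) exposes the raw
    # Thrift metastore calls such as get_partition_names / get_partitions_with_auth.
    from hmsclient import hmsclient

    DB, TABLE = 'my_db', 'my_table'  # hypothetical names
    MAX_PARTS = 32767                # Short.MAX_VALUE, the i16 limit per response

    client = hmsclient.HMSClient(host='my-metastore-host', port=9083)
    with client as c:
        t0 = time.time()
        names = c.get_partition_names(DB, TABLE, MAX_PARTS)
        print('get_partition_names: %d rows in %d ms'
              % (len(names), int((time.time() - t0) * 1000)))

        t0 = time.time()
        parts = c.get_partitions_with_auth(DB, TABLE, MAX_PARTS, 'hive', [])
        print('get_partitions_with_auth: %d rows in %d ms'
              % (len(parts), int((time.time() - t0) * 1000)))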

On Thu, Apr 25, 2019 at 10:12 AM vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Which metastore are you using?
>
> On Thu, Apr 25, 2019 at 9:02 AM, Juho Autio  wrote:
>
>> Would anyone be able to answer this question about the non-optimal
>> implementation of insertInto?
>>
>> On Thu, Apr 18, 2019 at 4:45 PM Juho Autio  wrote:
>>
>>> Hi,
>>>
>>> My job is writing ~10 partitions with insertInto. With the same input /
>>> output data the total duration of the job is very different depending on
>>> how many partitions the target table has.
>>>
>>> Target table with 10 of partitions:
>>> 1 min 30 s
>>>
>>> Target table with ~1 partitions:
>>> 13 min 0 s
>>>
>>> It seems that spark is always fetching the full list of partitions in
>>> target table. When this happens, the cluster is basically idling while
>>> driver is listing partitions.
>>>
>>> Here's a thread dump for executor driver from such idle time:
>>> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>>>
>>> Is there any way to optimize this currently? Is this a known issue? Any
>>> plans to improve?
>>>
>>> My code is essentially:
>>>
>>> spark = SparkSession.builder \
>>>     .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
>>>     .config("hive.exec.dynamic.partition", "true") \
>>>     .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
>>>     .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>>>     .enableHiveSupport() \
>>>     .getOrCreate()
>>>
>>> out_df.write \
>>>     .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
>>>     .insertInto(target_table_name, overwrite=True)
>>>
>>> Table has been originally created from spark with saveAsTable.
>>>
>>> Does spark need to know anything about the existing partitions though?
>>> As a manual workaround I would write the files directly to the partition
>>> locations, delete existing files first if there's anything in that
>>> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
>>> PARTITION. This doesn't require previous knowledge on existing partitions.
>>>
>>> Thanks.
>>>
>>

-- 
*Juho Autio*
Senior Data Engineer

Data Engineering, Games
Rovio Entertainment Corporation
Mobile: + 358 (0)45 313 0122
juho.au...@rovio.com
www.rovio.com

*This message and its attachments may contain confidential information and
is intended solely for the attention and use of the named addressee(s). If
you are not the intended recipient and / or you have received this message
in error, please contact the sender immediately and delete all material you
have received in this message. You are hereby notified that any use of the
information, which you have received in error in whatsoever form, is
strictly prohibited. Thank you for your co-operation.*


Re: Different query result between spark thrift server and spark-shell

2019-04-25 Thread Jun Zhu
Never mind, I got the point: Spark replaces Hive's Parquet support with its own.
Setting spark.sql.hive.convertMetastoreParquet=false makes it use Hive's.
Thanks
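
For completeness, a minimal PySpark sketch of setting that flag (assuming it
applies to any Spark SQL session; for the Thrift Server the same value can be
passed at startup, e.g. --conf spark.sql.hive.convertMetastoreParquet=false):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName('hudi-inputformat-check')  # hypothetical app name
             .config('spark.sql.hive.convertMetastoreParquet', 'false')
             .enableHiveSupport()
             .getOrCreate())

    # With the conversion disabled, Spark should plan a HiveTableScan and go
    # through the table's declared INPUTFORMAT instead of its native FileScan.
    spark.sql('SELECT * FROM default.hoodie_test_as_reportads_new LIMIT 10').show()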

On Thu, Apr 25, 2019 at 5:00 PM Jun Zhu  wrote:

> Hi,
> We are using plugins from Apache Hudi, which define a custom Hive external
> table input format:
>
> ROW FORMAT SERDE
> 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
>
> WITH SERDEPROPERTIES (
>
>   'serialization.format' = '1'
>
> )
>
> STORED AS
>
>   INPUTFORMAT 'com.uber.hoodie.hadoop.HoodieInputFormat'
>
>   OUTPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
>
> LOCATION 's3a://vungle2-dataeng/jun-test/stage20190424new'
>
> It works when queried in spark-shell, but not in the Spark Thrift Server
> with the same config.
> After debugging I found that the spark-shell execution plan differs from
> the Spark Thrift Server's:
> 1. in spark-shell
> |== Physical Plan ==
> TakeOrderedAndProject(limit=10, orderBy=[datestr#130 ASC NULLS
> FIRST,event_id#81 DESC NULLS LAST], output=[event_id#81,datestr#130,c#74L])
> +- *(2) Filter (c#74L > 1)
>+- *(2) HashAggregate(keys=[event_id#81, datestr#130],
> functions=[count(1)])
>   +- Exchange hashpartitioning(event_id#81, datestr#130, 200)
>  +- *(1) HashAggregate(keys=[event_id#81, datestr#130],
> functions=[partial_count(1)])
> +- *HiveTableScan* [event_id#81, datestr#130],
> *HiveTableRelation* `default`.`hoodie_test_as_reportads_new`,
> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,
> [_hoodie_record_key#78, _hoodie_commit_time#79, _hoodie_commit_seqno#8...
>
> 2. in spark thrift server
>
> | == Physical Plan ==
> TakeOrderedAndProject(limit=10, orderBy=[datestr#63 ASC NULLS
> FIRST,event_id#14 DESC NULLS LAST], output=[event_id#14,datestr#63,c#7L])
> +- *(2) Filter (c#7L > 1)
>+- *(2) HashAggregate(keys=[event_id#14, datestr#63],
> functions=[count(1)])
>   +- Exchange hashpartitioning(event_id#14, datestr#63, 200)
>  +- *(1) HashAggregate(keys=[event_id#14, datestr#63],
> functions=[partial_count(1)])
> +- *(1) *FileScan* *parquet*
> default.hoodie_test_as_reportads_new[event_id#14,datestr#63] Batched: true,
> Format: *Parquet*, Location:
> PrunedInMemoryFileIndex[s3a://vungle2-dataeng/jun-test/stage20190424new/2019-04-24_08,
> s3
>
> Looks like the Thrift Server failed to recognize the self-defined input format.
> Any thoughts? Or can I configure the FileScan to be a HiveTableScan? Thanks~
> Best,
>
> --
> *Jun Zhu*
> Sr. Engineer I, Data
> +86 18565739171
>
> Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China
>
>

-- 
*Jun Zhu*
Sr. Engineer I, Data
+86 18565739171

Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China


Re: [GraphX] Preserving Partitions when reading from HDFS

2019-04-25 Thread M Bilal
If I understand correctly, this would set the split size in the Hadoop
configuration when reading a file. I can see that being useful when you want
to create more partitions than what the block size in HDFS might dictate.
Instead, what I want to do is create a single partition for each file
written by a task (from, say, a previous job), i.e. data in part-0 forms
partition 1, part-1 forms partition 2, and so on.
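
In case a concrete example helps, here is a hedged PySpark sketch of one way to
get exactly one partition per part file when reading back (the same Hadoop
settings apply on the Scala/GraphX side; the path and format are placeholders,
and this only preserves the physical grouping, not a Partitioner object):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # Forcing a huge minimum split size makes each part file a single split
    # (splits never cross file boundaries), hence a single partition per file.
    edges = sc.newAPIHadoopFile(
        'hdfs:///path/to/partitioned-edges',  # hypothetical output folder
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
        conf={'mapreduce.input.fileinputformat.split.minsize': str(2 ** 40)},
    ).values()

    print(edges.getNumPartitions())  # expected: the number of part files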

- Bilal

On Tue, Apr 16, 2019, 6:00 AM Manu Zhang  wrote:

> You may try
> `sparkContext.hadoopConfiguration().set("mapred.max.split.size",
> "33554432")` to tune the partition size when reading from HDFS.
>
> Thanks,
> Manu Zhang
>
> On Mon, Apr 15, 2019 at 11:28 PM M Bilal  wrote:
>
>> Hi,
>>
>> I have implemented a custom partitioning algorithm to partition graphs in
>> GraphX. Saving the partitioned graph (the edges) to HDFS creates separate
>> files in the output folder with the number of files equal to the number of
>> Partitions.
>>
>> However, reading back the edges creates number of partitions that are
>> equal to the number of blocks in the HDFS folder. Is there a way to instead
>> create the same number of partitions as the number of files written to HDFS
>> while preserving the original partitioning?
>>
>> I would like to avoid repartitioning.
>>
>> Thanks.
>> - Bilal
>>
>


Different query result between spark thrift server and spark-shell

2019-04-25 Thread Jun Zhu
Hi,
We are using plugins from Apache Hudi, which define a custom Hive external
table input format:

ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'

WITH SERDEPROPERTIES (

  'serialization.format' = '1'

)

STORED AS

  INPUTFORMAT 'com.uber.hoodie.hadoop.HoodieInputFormat'

  OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'

LOCATION 's3a://vungle2-dataeng/jun-test/stage20190424new'

It works when queried in spark-shell, but not in the Spark Thrift Server with
the same config.
After debugging I found that the spark-shell execution plan differs from the
Spark Thrift Server's:
1. in spark-shell
|== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[datestr#130 ASC NULLS
FIRST,event_id#81 DESC NULLS LAST], output=[event_id#81,datestr#130,c#74L])
+- *(2) Filter (c#74L > 1)
   +- *(2) HashAggregate(keys=[event_id#81, datestr#130],
functions=[count(1)])
  +- Exchange hashpartitioning(event_id#81, datestr#130, 200)
 +- *(1) HashAggregate(keys=[event_id#81, datestr#130],
functions=[partial_count(1)])
+- *HiveTableScan* [event_id#81, datestr#130],
*HiveTableRelation* `default`.`hoodie_test_as_reportads_new`,
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,
[_hoodie_record_key#78, _hoodie_commit_time#79, _hoodie_commit_seqno#8...

2. in spark thrift server

| == Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[datestr#63 ASC NULLS
FIRST,event_id#14 DESC NULLS LAST], output=[event_id#14,datestr#63,c#7L])
+- *(2) Filter (c#7L > 1)
   +- *(2) HashAggregate(keys=[event_id#14, datestr#63],
functions=[count(1)])
  +- Exchange hashpartitioning(event_id#14, datestr#63, 200)
 +- *(1) HashAggregate(keys=[event_id#14, datestr#63],
functions=[partial_count(1)])
+- *(1) *FileScan* *parquet*
default.hoodie_test_as_reportads_new[event_id#14,datestr#63] Batched: true,
Format: *Parquet*, Location:
PrunedInMemoryFileIndex[s3a://vungle2-dataeng/jun-test/stage20190424new/2019-04-24_08,
s3

Looks like the Thrift Server failed to recognize the self-defined input format.
Any thoughts? Or can I configure the FileScan to be a HiveTableScan? Thanks~
Best,

-- 
*Jun Zhu*
Sr. Engineer I, Data
+86 18565739171

Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China


Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread Khare, Ankit
Why do you need 1 partition when 10 partitions are doing the job?

 Thanks
Ankit

From: vincent gromakowski 
Date: Thursday, 25. April 2019 at 09:12
To: Juho Autio 
Cc: user 
Subject: Re: [Spark SQL]: Slow insertInto overwrite if target table has many 
partitions

Which metastore are you using?

On Thu, Apr 25, 2019 at 9:02 AM, Juho Autio <juho.au...@rovio.com> wrote:
Would anyone be able to answer this question about the non-optimal 
implementation of insertInto?

On Thu, Apr 18, 2019 at 4:45 PM Juho Autio <juho.au...@rovio.com> wrote:
Hi,

My job is writing ~10 partitions with insertInto. With the same input / output 
data the total duration of the job is very different depending on how many 
partitions the target table has.

Target table with 10 of partitions:
1 min 30 s

Target table with ~1 partitions:
13 min 0 s

It seems that spark is always fetching the full list of partitions in target 
table. When this happens, the cluster is basically idling while driver is 
listing partitions.

Here's a thread dump for executor driver from such idle time:
https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20

Is there any way to optimize this currently? Is this a known issue? Any plans 
to improve?

My code is essentially:

spark = SparkSession.builder \
    .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
    .config("hive.exec.dynamic.partition", "true") \
    .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

out_df.write \
    .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
    .insertInto(target_table_name, overwrite=True)

Table has been originally created from spark with saveAsTable.

Does spark need to know anything about the existing partitions though? As a 
manual workaround I would write the files directly to the partition locations, 
delete existing files first if there's anything in that partition, and then 
call metastore to ALTER TABLE IF NOT EXISTS ADD PARTITION. This doesn't require 
previous knowledge on existing partitions.

Thanks.


Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread vincent gromakowski
Which metastore are you using?

On Thu, Apr 25, 2019 at 9:02 AM, Juho Autio  wrote:

> Would anyone be able to answer this question about the non-optimal
> implementation of insertInto?
>
> On Thu, Apr 18, 2019 at 4:45 PM Juho Autio  wrote:
>
>> Hi,
>>
>> My job is writing ~10 partitions with insertInto. With the same input /
>> output data the total duration of the job is very different depending on
>> how many partitions the target table has.
>>
>> Target table with 10 of partitions:
>> 1 min 30 s
>>
>> Target table with ~1 partitions:
>> 13 min 0 s
>>
>> It seems that spark is always fetching the full list of partitions in
>> target table. When this happens, the cluster is basically idling while
>> driver is listing partitions.
>>
>> Here's a thread dump for executor driver from such idle time:
>> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>>
>> Is there any way to optimize this currently? Is this a known issue? Any
>> plans to improve?
>>
>> My code is essentially:
>>
>> spark = SparkSession.builder \
>>     .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
>>     .config("hive.exec.dynamic.partition", "true") \
>>     .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
>>     .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>>     .enableHiveSupport() \
>>     .getOrCreate()
>>
>> out_df.write \
>>     .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
>>     .insertInto(target_table_name, overwrite=True)
>>
>> Table has been originally created from spark with saveAsTable.
>>
>> Does spark need to know anything about the existing partitions though? As
>> a manual workaround I would write the files directly to the partition
>> locations, delete existing files first if there's anything in that
>> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
>> PARTITION. This doesn't require previous knowledge on existing partitions.
>>
>> Thanks.
>>
>


Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread Juho Autio
Would anyone be able to answer this question about the non-optimal
implementation of insertInto?

On Thu, Apr 18, 2019 at 4:45 PM Juho Autio  wrote:

> Hi,
>
> My job is writing ~10 partitions with insertInto. With the same input /
> output data the total duration of the job is very different depending on
> how many partitions the target table has.
>
> Target table with 10 of partitions:
> 1 min 30 s
>
> Target table with ~1 partitions:
> 13 min 0 s
>
> It seems that spark is always fetching the full list of partitions in
> target table. When this happens, the cluster is basically idling while
> driver is listing partitions.
>
> Here's a thread dump for executor driver from such idle time:
> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>
> Is there any way to optimize this currently? Is this a known issue? Any
> plans to improve?
>
> My code is essentially:
>
> spark = SparkSession.builder \
>     .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
>     .config("hive.exec.dynamic.partition", "true") \
>     .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
>     .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>     .enableHiveSupport() \
>     .getOrCreate()
>
> out_df.write \
>     .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
>     .insertInto(target_table_name, overwrite=True)
>
> Table has been originally created from spark with saveAsTable.
>
> Does spark need to know anything about the existing partitions though? As
> a manual workaround I would write the files directly to the partition
> locations, delete existing files first if there's anything in that
> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
> PARTITION. This doesn't require previous knowledge on existing partitions.
>
> Thanks.
>
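
For what it's worth, here is a minimal PySpark sketch of the manual workaround
described in the quoted message above: write straight to each partition
location and then register the partition, without ever listing the table's
existing partitions. out_df is the DataFrame from the quoted snippet; the table
name, base location and the dt partition column are placeholders, and error
handling is omitted:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    target_table = 'db.events'                       # hypothetical table, partitioned by dt
    base_location = 's3://bucket/warehouse/events'   # hypothetical table location

    for dt in ['2019-04-24', '2019-04-25']:          # partitions produced by this run
        part_path = '%s/dt=%s' % (base_location, dt)

        # mode('overwrite') replaces files under this partition location only,
        # so no listing of the table's other partitions is needed.
        (out_df.where(out_df.dt == dt)
               .drop('dt')
               .write.mode('overwrite')
               .parquet(part_path))

        # Register the partition if the metastore doesn't know it yet.
        spark.sql(
            "ALTER TABLE %s ADD IF NOT EXISTS PARTITION (dt='%s') LOCATION '%s'"
            % (target_table, dt, part_path))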