Definitely worth to try. And you can sort the record before writing out, and 
then you will get the parquet files without overlapping keys.
Let us know if that helps.

Hao

From: Philip Weaver [mailto:philip.wea...@gmail.com]
Sent: Wednesday, August 12, 2015 4:05 AM
To: Cheng Lian
Cc: user
Subject: Re: Very high latency to initialize a DataFrame from partitioned 
parquet database.

Do you think it might be faster to put all the files in one directory but still 
partitioned the same way? I don't actually need to filter on the values of the 
partition keys, but I need to rely on there be no overlap in the value of the 
keys between any two parquet files.

On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver 
<philip.wea...@gmail.com<mailto:philip.wea...@gmail.com>> wrote:
Thanks, I also confirmed that the partition discovery is slow by writing a 
non-Spark application that uses the parquet library directly to load that 
partitions.

It's so slow that my colleague's Python application can read the entire 
contents of all the parquet data files faster than my application can even 
discover the partitions!

On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian 
<lian.cs....@gmail.com<mailto:lian.cs....@gmail.com>> wrote:
However, it's weird that the partition discovery job only spawns 2 tasks. It 
should use the default parallelism, which is probably 8 according to the logs 
of the next Parquet reading job. Partition discovery is already done in a 
distributed manner via a Spark job. But the parallelism is mysteriously low...

Cheng

On 8/7/15 3:32 PM, Cheng Lian wrote:

Hi Philip,

Thanks for providing the log file. It seems that most of the time are spent on 
partition discovery. The code snippet you provided actually issues two jobs. 
The first one is for listing the input directories to find out all leaf 
directories (and this actually requires listing all leaf files, because we can 
only assert that a directory is a leaf one when it contains no 
sub-directories). Then partition information is extracted from leaf directory 
paths. This process starts at:

10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in 
parallel under: file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
have all completed, from pool

The actual tasks execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
lindevspark5, PROCESS_LOCAL, 3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+ partitions, so there are a 
lot of leaf directories and files out there. My guess is that the local file 
system spent lots of time listing FileStatus-es of all these files.

I also noticed that Mesos job scheduling takes more time then expected. It is 
probably because this is the first Spark job executed in the application, and 
the system is not warmed up yet. For example, there’s a 6s gap between these 
two adjacent lines:

10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now 
TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and this one actually 
finishes pretty quickly, only 3s (note that the Mesos job scheduling latency is 
also included):

10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 
8 output partitions
…
10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 
lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 
lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, 
lindevspark4, PROCESS_LOCAL, 2058 bytes)
…
10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) 
in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) 
in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) 
in 2886 ms on lindevspark5 (8/8)

That might be the reason why you observed that the C parquet library you 
mentioned (is it parquet-cpp?) is an order of magnitude faster?

Cheng

On 8/7/15 2:02 AM, Philip Weaver wrote:
With DEBUG, the log output was over 10MB, so I opted for just INFO output. The 
(sanitized) log is attached.

The driver is essentially this code:

    info("A")

    val t = System.currentTimeMillis
    val df = sqlContext.read.parquet(dir).select(...).cache

    val elapsed = System.currentTimeMillis - t
    info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the parquet 
files. My colleague wrote a PySpark application that gets the list of files, 
parallelizes it, maps across it and reads each file manually using a C parquet 
library, and aggregates manually in the loop. Ignoring the 1-2 minute 
initialization cost, compared to a Spark SQL or DataFrame query in Scala, his 
is an order of magnitude faster. Since he is parallelizing the work through 
Spark, and that isn't causing any performance issues, it seems to be a problem 
with the parquet reader. I may try to do what he did to construct a DataFrame 
manually, and see if I can query it with Spark SQL with reasonable performance.

- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian 
<lian.cs....@gmail.com<mailto:lian.cs....@gmail.com>> wrote:
Would you mind to provide the driver log?

On 8/6/15 3:58 PM, Philip Weaver wrote:
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again.

The initialization time is about 1 minute now, which is still pretty terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver 
<philip.wea...@gmail.com<mailto:philip.wea...@gmail.com>> wrote:
Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian 
<lian.cs....@gmail.com<mailto:lian.cs....@gmail.com>> wrote:
We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your case? We've observed 
~50x performance boost with schema merging turned on.

Cheng

On 8/6/15 8:26 AM, Philip Weaver wrote:
I have a parquet directory that was produced by partitioning by two keys, e.g. 
like this:

df.write.partitionBy("a", "b").parquet("asdf")

There are 35 values of "a", and about 1100-1200 values of "b" for each value of 
"a", for a total of over 40,000 partitions.

Before running any transformations or actions on the DataFrame, just 
initializing it like this takes 2 minutes:

val df = sqlContext.read.parquet("asdf")

Is this normal? Is this because it is doing some bookeeping to discover all the 
partitions? Is it perhaps having to merge the schema from each partition? Would 
you expect it to get better or worse if I subpartition by another key?

- Philip







​



Reply via email to