However, it's weird that the partition discovery job only spawns 2
tasks. It should use the default parallelism, which is probably 8
according to the logs of the next Parquet reading job. Partition
discovery is already done in a distributed manner via a Spark job. But
the parallelism is mysteriously low...
Cheng
On 8/7/15 3:32 PM, Cheng Lian wrote:
Hi Philip,
Thanks for providing the log file. It seems that most of the time are
spent on partition discovery. The code snippet you provided actually
issues two jobs. The first one is for listing the input directories to
find out all leaf directories (and this actually requires listing all
leaf files, because we can only assert that a directory is a leaf one
when it contains no sub-directories). Then partition information is
extracted from leaf directory paths. This process starts at:
10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
directories in parallel under:
file:/home/pweaver/work/parquet/day=20150225, …
and ends at:
10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
whose tasks have all completed, from pool
The actual tasks execution time is about 36s:
10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)
You mentioned that your dataset has about 40,000+ partitions, so there
are a lot of leaf directories and files out there. My guess is that
the local file system spent lots of time listing FileStatus-es of all
these files.
I also noticed that Mesos job scheduling takes more time then
expected. It is probably because this is the first Spark job executed
in the application, and the system is not warmed up yet. For example,
there’s a 6s gap between these two adjacent lines:
10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is
now TASK_RUNNING
The 2nd Spark job is the real Parquet reading job, and this one
actually finishes pretty quickly, only 3s (note that the Mesos job
scheduling latency is also included):
10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
App.scala:182) with 8 output partitions
…
10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
…
10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage
1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage
1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage
1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)
That might be the reason why you observed that the C parquet library
you mentioned (is it parquet-cpp?) is an order of magnitude faster?
Cheng
On 8/7/15 2:02 AM, Philip Weaver wrote:
With DEBUG, the log output was over 10MB, so I opted for just INFO
output. The (sanitized) log is attached.
The driver is essentially this code:
info("A")
val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache
val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")
We've also observed that it is very slow to read the contents of the
parquet files. My colleague wrote a PySpark application that gets the
list of files, parallelizes it, maps across it and reads each file
manually using a C parquet library, and aggregates manually in the
loop. Ignoring the 1-2 minute initialization cost, compared to a
Spark SQL or DataFrame query in Scala, his is an order of magnitude
faster. Since he is parallelizing the work through Spark, and that
isn't causing any performance issues, it seems to be a problem with
the parquet reader. I may try to do what he did to construct a
DataFrame manually, and see if I can query it with Spark SQL with
reasonable performance.
- Philip
On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian <lian.cs....@gmail.com
<mailto:lian.cs....@gmail.com>> wrote:
Would you mind to provide the driver log?
On 8/6/15 3:58 PM, Philip Weaver wrote:
I built spark from the v1.5.0-snapshot-20150803 tag in the repo
and tried again.
The initialization time is about 1 minute now, which is still
pretty terrible.
On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver
<philip.wea...@gmail.com> wrote:
Absolutely, thanks!
On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian
<lian.cs....@gmail.com> wrote:
We've fixed this issue in 1.5
https://github.com/apache/spark/pull/7396
Could you give it a shot to see whether it helps in your
case? We've observed ~50x performance boost with schema
merging turned on.
Cheng
On 8/6/15 8:26 AM, Philip Weaver wrote:
I have a parquet directory that was produced by
partitioning by two keys, e.g. like this:
df.write.partitionBy("a", "b").parquet("asdf")
There are 35 values of "a", and about 1100-1200 values
of "b" for each value of "a", for a total of over
40,000 partitions.
Before running any transformations or actions on the
DataFrame, just initializing it like this takes *2
minutes*:
val df = sqlContext.read.parquet("asdf")
Is this normal? Is this because it is doing some
bookeeping to discover all the partitions? Is it
perhaps having to merge the schema from each partition?
Would you expect it to get better or worse if I
subpartition by another key?
- Philip