Definitely worth a try. You can also sort the records before writing
them out; then you will get parquet files without overlapping keys.
Let us know if that helps.
Hao
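Hao's suggestion can be illustrated without Spark: if the records are globally sorted before being chunked into output files, the per-file key ranges are guaranteed disjoint. A minimal plain-Scala sketch (integer keys and fixed-size chunks are assumptions for illustration only):

```scala
// Plain-Scala sketch (no Spark) of why sorting before writing yields
// files with non-overlapping key ranges.
object SortedChunks {
  // Sort the keys, split them into fixed-size chunks (one chunk per
  // output file), and report each file's (min, max) key range.
  def keyRanges(keys: Seq[Int], chunkSize: Int): Seq[(Int, Int)] =
    keys.sorted.grouped(chunkSize).map(g => (g.head, g.last)).toSeq

  // With distinct sorted keys, every file's max is below the next file's min.
  def nonOverlapping(ranges: Seq[(Int, Int)]): Boolean =
    ranges.sliding(2).forall {
      case Seq((_, max1), (min2, _)) => max1 < min2
      case _                         => true
    }
}
```

This is exactly the property Philip asks for below: no key value appears in more than one file.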
*From:*Philip Weaver [mailto:philip.wea...@gmail.com]
*Sent:* Wednesday, August 12, 2015 4:05 AM
*To:* Cheng Lian
*Cc:* user
*Subject:* Re: Very high latency to initialize a DataFrame from
partitioned parquet database.
Do you think it might be faster to put all the files in one directory
but still partitioned the same way? I don't actually need to filter on
the values of the partition keys, but I need to rely on there being no
overlap in the key values between any two parquet files.
On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver <philip.wea...@gmail.com
<mailto:philip.wea...@gmail.com>> wrote:
Thanks, I also confirmed that the partition discovery is slow by
writing a non-Spark application that uses the parquet library
directly to load the partitions.
It's so slow that my colleague's Python application can read the
entire contents of all the parquet data files faster than my
application can even discover the partitions!
On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian <lian.cs....@gmail.com
<mailto:lian.cs....@gmail.com>> wrote:
However, it's weird that the partition discovery job only
spawns 2 tasks. It should use the default parallelism, which
is probably 8 according to the logs of the next Parquet
reading job. Partition discovery is already done in a
distributed manner via a Spark job. But the parallelism is
mysteriously low...
Cheng
On 8/7/15 3:32 PM, Cheng Lian wrote:
Hi Philip,
Thanks for providing the log file. It seems that most of
the time is spent on partition discovery. The code
snippet you provided actually issues two jobs. The first
one is for listing the input directories to find out all
leaf directories (and this actually requires listing all
leaf files, because we can only assert that a directory is
a leaf one when it contains no sub-directories). Then
partition information is extracted from leaf directory
paths. This process starts at:
10:51:44 INFO sources.HadoopFsRelation: Listing leaf
files and directories in parallel under:
file:/home/pweaver/work/parquet/day=20150225, …
and ends at:
10:52:31 INFO scheduler.TaskSchedulerImpl: Removed
TaskSet 0.0, whose tasks have all completed, from pool
The actual tasks execution time is about 36s:
10:51:54 INFO scheduler.TaskSetManager: Starting task
0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL,
3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task
0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)
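The partition-extraction step described above (pulling partition columns out of leaf directory paths such as `day=20150225`) can be sketched in plain Scala. This shows the general idea only, not Spark's actual implementation:

```scala
// Sketch: partition columns are recovered from "key=value" segments
// in each leaf directory path (e.g. ".../day=20150225").
object PartitionPath {
  def parse(path: String): Map[String, String] =
    path.split('/').collect {
      case seg if seg.contains('=') =>
        val Array(k, v) = seg.split("=", 2)
        k -> v
    }.toMap
}
```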
You mentioned that your dataset has about 40,000+
partitions, so there are a lot of leaf directories and
files out there. My guess is that the local file system
spent lots of time listing FileStatus-es of all these files.
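The recursive listing that makes this expensive can be sketched in plain Scala with `java.io.File` (Spark's actual code uses Hadoop's `FileStatus` API, but the shape of the work is the same): a directory only proves itself a leaf once listing shows it has no sub-directories, so every entry gets listed along the way.

```scala
import java.io.File

// Sketch of leaf-directory discovery via recursive listing.
object LeafDirs {
  def leafDirs(dir: File): Seq[File] = {
    val children = Option(dir.listFiles()).getOrElse(Array.empty[File])
    val subDirs  = children.filter(_.isDirectory)
    if (subDirs.isEmpty) Seq(dir)            // no sub-directories: a leaf
    else subDirs.toSeq.flatMap(leafDirs)     // otherwise recurse
  }
}
```

With 40,000+ leaf directories, this amounts to tens of thousands of file-system metadata calls, which is consistent with the 36s task time above.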
I also noticed that Mesos job scheduling takes more time
than expected. It is probably because this is the first
Spark job executed in the application, and the system is
not warmed up yet. For example, there’s a 6s gap between
these two adjacent lines:
10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task
set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos
task 0 is now TASK_RUNNING
The 2nd Spark job is the real Parquet reading job, and
this one actually finishes pretty quickly, only 3s (note
that the Mesos job scheduling latency is also included):
10:52:32 INFO scheduler.DAGScheduler: Got job 1
(parquet at App.scala:182) with 8 output partitions
…
10:52:32 INFO scheduler.TaskSetManager: Starting task
0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL,
2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task
1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL,
2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task
2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL,
2058 bytes)
…
10:52:34 INFO scheduler.TaskSetManager: Finished task
6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task
4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task
7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)
That might be why the C parquet library you
mentioned (is it parquet-cpp?) appears to be an
order of magnitude faster.
Cheng
On 8/7/15 2:02 AM, Philip Weaver wrote:
With DEBUG, the log output was over 10MB, so I opted
for just INFO output. The (sanitized) log is attached.
The driver is essentially this code:
info("A")
val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache
val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")
We've also observed that it is very slow to read the
contents of the parquet files. My colleague wrote a
PySpark application that gets the list of files,
parallelizes it, maps across it and reads each file
manually using a C parquet library, and aggregates
manually in the loop. Ignoring the 1-2 minute
initialization cost, his application is an order of
magnitude faster than the equivalent Spark SQL or
DataFrame query in Scala. Since he is parallelizing
the work through
Spark, and that isn't causing any performance issues,
it seems to be a problem with the parquet reader. I
may try to do what he did to construct a DataFrame
manually, and see if I can query it with Spark SQL
with reasonable performance.
- Philip
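The pattern Philip describes can be sketched in plain Scala, minus Spark and the real C parquet reader. `readFile` below is a hypothetical stand-in for the per-file parquet read; in the real application, Spark's parallelize/map distributes this same loop across executors:

```scala
// Sketch of the manual-scan pattern: take a list of files, read each
// one independently, and fold the per-file results into one aggregate.
object ManualScan {
  def aggregate(files: Seq[String], readFile: String => Seq[Long]): Long =
    files.map(f => readFile(f).sum).sum   // per-file read, then overall sum
}
```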
On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian
<lian.cs....@gmail.com <mailto:lian.cs....@gmail.com>>
wrote:
Would you mind providing the driver log?
On 8/6/15 3:58 PM, Philip Weaver wrote:
I built Spark from the
v1.5.0-snapshot-20150803 tag in the repo and
tried again.
The initialization time is about 1 minute now,
which is still pretty terrible.
On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver
<philip.wea...@gmail.com
<mailto:philip.wea...@gmail.com>> wrote:
Absolutely, thanks!
On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian
<lian.cs....@gmail.com
<mailto:lian.cs....@gmail.com>> wrote:
We've fixed this issue in 1.5:
https://github.com/apache/spark/pull/7396
Could you give it a shot to see
whether it helps in your case? We've
observed ~50x performance boost with
schema merging turned on.
Cheng
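For reference, schema merging can also be controlled explicitly; in Spark 1.5 the relevant setting is `spark.sql.parquet.mergeSchema`. A `spark-defaults.conf` sketch (verify the default against your Spark version):

```
spark.sql.parquet.mergeSchema    false
```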
On 8/6/15 8:26 AM, Philip Weaver wrote:
I have a parquet directory that
was produced by partitioning by
two keys, e.g. like this:
df.write.partitionBy("a", "b").parquet("asdf")
There are 35 values of "a", and
about 1100-1200 values of "b" for
each value of "a", for a total of
over 40,000 partitions.
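Back-of-the-envelope, the directory layout produced by `partitionBy("a", "b")` looks like the sketch below. The concrete key values are hypothetical; only the counts (35 values of "a", roughly 1100-1200 values of "b" per "a") come from this email:

```scala
// Rough check on the "over 40,000 partitions" figure: 35 values of "a"
// times ~1150 values of "b" per "a" yields one a=.../b=... leaf
// directory per combination.
object PartitionCount {
  val dirs: Seq[String] =
    for (a <- 1 to 35; b <- 1 to 1150) yield s"a=$a/b=$b"
}
```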
Before running any transformations
or actions on the DataFrame, just
initializing it like this takes *2
minutes*:
val df = sqlContext.read.parquet("asdf")
Is this normal? Is this because it
is doing some bookkeeping to
discover all the partitions? Is it
perhaps having to merge the schema
from each partition? Would you
expect it to get better or worse
if I subpartition by another key?
- Philip