Do you think it might be faster to put all the files in one directory but
still partition them the same way? I don't actually need to filter on the
values of the partition keys, but I do need to rely on there being no
overlap in the key values between any two parquet files.

On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver <philip.wea...@gmail.com>
wrote:

> Thanks, I also confirmed that the partition discovery is slow by writing a
> non-Spark application that uses the parquet library directly to load the
> partitions.
>
> It's so slow that my colleague's Python application can read the entire
> contents of all the parquet data files faster than my application can even
> discover the partitions!
>
> On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>> However, it's weird that the partition discovery job only spawns 2 tasks.
>> It should use the default parallelism, which is probably 8 according to the
>> logs of the next Parquet reading job. Partition discovery is already done
>> in a distributed manner via a Spark job. But the parallelism is
>> mysteriously low...
>>
>> Cheng
>>
>>
>> On 8/7/15 3:32 PM, Cheng Lian wrote:
>>
>> Hi Philip,
>>
>> Thanks for providing the log file. It seems that most of the time is
>> spent on partition discovery. The code snippet you provided actually issues
>> two jobs. The first one is for listing the input directories to find out
>> all leaf directories (and this actually requires listing all leaf files,
>> because we can only assert that a directory is a leaf one when it contains
>> no sub-directories). Then partition information is extracted from leaf
>> directory paths. This process starts at:
>>
>> 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
>> directories in parallel under:
>> file:/home/pweaver/work/parquet/day=20150225, …
>>
>> and ends at:
>>
>> 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
>> tasks have all completed, from pool
>>
>> The actual task execution time is about 36s:
>>
>> 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0
>> (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
>> …
>> 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0
>> (TID 0) in 36107 ms on lindevspark5 (1/2)
>>
>> You mentioned that your dataset has about 40,000+ partitions, so there
>> are a lot of leaf directories and files out there. My guess is that the
>> local file system spent lots of time listing FileStatus-es of all these
>> files.
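>>
>> Just to illustrate the extraction step mentioned above (this is only a
>> minimal sketch, not Spark's actual code): once the leaf directories are
>> known, the partition values are essentially parsed out of path segments of
>> the form key=value, e.g. .../day=20150225/...:
>>
>>     // Hypothetical sketch: recover partition values from a leaf directory path
>>     def parsePartitionValues(leafDir: String): Map[String, String] =
>>       leafDir.split("/").collect {
>>         case seg if seg.contains("=") =>
>>           val Array(key, value) = seg.split("=", 2)
>>           key -> value
>>       }.toMap
>>
>>     parsePartitionValues("/home/pweaver/work/parquet/day=20150225")
>>     // Map(day -> 20150225)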
>>
>> I also noticed that Mesos job scheduling takes more time than expected.
>> It is probably because this is the first Spark job executed in the
>> application, and the system is not warmed up yet. For example, there’s a 6s
>> gap between these two adjacent lines:
>>
>> 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2
>> tasks
>> 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now
>> TASK_RUNNING
>>
>> The 2nd Spark job is the real Parquet reading job, and this one actually
>> finishes pretty quickly, only 3s (note that the Mesos job scheduling
>> latency is also included):
>>
>> 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
>> App.scala:182) with 8 output partitions
>> …
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0
>> (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0
>> (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0
>> (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
>> …
>> 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0
>> (TID 8) in 1527 ms on lindevspark4 (6/8)
>> 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0
>> (TID 6) in 1533 ms on lindevspark4 (7/8)
>> 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0
>> (TID 9) in 2886 ms on lindevspark5 (8/8)
>>
>> That might be the reason why the C parquet library you mentioned (is it
>> parquet-cpp?) appears to be an order of magnitude faster.
>>
>> Cheng
>>
>> On 8/7/15 2:02 AM, Philip Weaver wrote:
>>
>> With DEBUG, the log output was over 10MB, so I opted for just INFO
>> output. The (sanitized) log is attached.
>>
>> The driver is essentially this code:
>>
>>     info("A")
>>
>>     val t = System.currentTimeMillis
>>     val df = sqlContext.read.parquet(dir).select(...).cache
>>
>>     val elapsed = System.currentTimeMillis - t
>>     info(s"Init time: ${elapsed} ms")
>>
>> We've also observed that it is very slow to read the contents of the
>> parquet files. My colleague wrote a PySpark application that gets the list
>> of files, parallelizes it, maps across it and reads each file manually
>> using a C parquet library, and aggregates manually in the loop. Ignoring
>> the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame
>> query in Scala, his approach is an order of magnitude faster. Since he is
>> parallelizing the work through Spark, and that isn't causing any
>> performance issues, it seems to be a problem with the parquet reader. I may
>> try to do what he did to construct a DataFrame manually, and see if I can
>> query it with Spark SQL with reasonable performance.
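>>
>> If I do try that, I imagine the Scala side would look roughly like the
>> sketch below, where readRows() is a hypothetical wrapper around the
>> low-level parquet reader and the schema is assumed to be known up front:
>>
>>     import org.apache.spark.sql.Row
>>     import org.apache.spark.sql.types.StructType
>>
>>     def readRows(path: String): Iterator[Row] = ???  // hypothetical reader
>>     val schema: StructType = ???                     // known schema
>>     val files: Seq[String] = ???                     // list of leaf parquet files
>>
>>     // Parallelize the file list, read each file manually, then build a
>>     // DataFrame from the resulting rows so it can be queried with Spark SQL.
>>     val rows = sc.parallelize(files).flatMap(readRows)
>>     val df = sqlContext.createDataFrame(rows, schema)
>>     df.registerTempTable("data")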
>>
>> - Philip
>>
>>
>> On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
>>
>>> Would you mind providing the driver log?
>>>
>>>
>>> On 8/6/15 3:58 PM, Philip Weaver wrote:
>>>
>>> I built spark from the v1.5.0-snapshot-20150803 tag in the repo and
>>> tried again.
>>>
>>> The initialization time is about 1 minute now, which is still pretty
>>> terrible.
>>>
>>> On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver <philip.wea...@gmail.com> wrote:
>>>
>>>> Absolutely, thanks!
>>>>
>>>> On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian <lian.cs....@gmail.com> wrote:
>>>>
>>>>> We've fixed this issue in 1.5: https://github.com/apache/spark/pull/7396
>>>>>
>>>>> Could you give it a shot to see whether it helps in your case? We've
>>>>> observed ~50x performance boost with schema merging turned on.
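>>>>>
>>>>> For reference, schema merging can be toggled per read via the mergeSchema
>>>>> data source option (option name per the 1.5 docs; please double-check it
>>>>> against your build), e.g.:
>>>>>
>>>>>     val df = sqlContext.read
>>>>>       .option("mergeSchema", "true")
>>>>>       .parquet("asdf")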
>>>>>
>>>>> Cheng
>>>>>
>>>>>
>>>>> On 8/6/15 8:26 AM, Philip Weaver wrote:
>>>>>
>>>>> I have a parquet directory that was produced by partitioning by two
>>>>> keys, e.g. like this:
>>>>>
>>>>> df.write.partitionBy("a", "b").parquet("asdf")
>>>>>
>>>>>
>>>>> There are 35 values of "a", and about 1100-1200 values of "b" for each
>>>>> value of "a", for a total of over 40,000 partitions.
>>>>>
>>>>> Before running any transformations or actions on the DataFrame, just
>>>>> initializing it like this takes *2 minutes*:
>>>>>
>>>>> val df = sqlContext.read.parquet("asdf")
>>>>>
>>>>>
>>>>> Is this normal? Is this because it is doing some bookkeeping to
>>>>> discover all the partitions? Is it perhaps having to merge the schema from
>>>>> each partition? Would you expect it to get better or worse if I
>>>>> subpartition by another key?
>>>>>
>>>>> - Philip
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>>
>
