Do you think it might be faster to put all the files in one directory but still partitioned the same way? I don't actually need to filter on the values of the partition keys, but I need to rely on there being no overlap in the values of the keys between any two parquet files.
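For example, something like this (an untested sketch: hash-partitioning by the key pair at the RDD level guarantees that all rows for a given (a, b) land in exactly one output file; the partition count of 64 and the "asdf_flat" path are placeholders):

import org.apache.spark.HashPartitioner

val df = sqlContext.read.parquet("asdf")

// Key each row by (a, b); the HashPartitioner routes every key to exactly
// one partition, so no two output files can share a key pair.
val keyed = df.rdd.keyBy(row => (row.getAs[Any]("a"), row.getAs[Any]("b")))

val flat = sqlContext.createDataFrame(
  keyed.partitionBy(new HashPartitioner(64)).values,
  df.schema)

flat.write.parquet("asdf_flat") // one flat directory, no partition subdirectories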
On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver <philip.wea...@gmail.com> wrote:

> Thanks, I also confirmed that the partition discovery is slow by writing a
> non-Spark application that uses the parquet library directly to load the
> partitions.
>
> It's so slow that my colleague's Python application can read the entire
> contents of all the parquet data files faster than my application can even
> discover the partitions!
>
> On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>> However, it's weird that the partition discovery job only spawns 2 tasks.
>> It should use the default parallelism, which is probably 8 according to
>> the logs of the next Parquet reading job. Partition discovery is already
>> done in a distributed manner via a Spark job, but the parallelism is
>> mysteriously low...
>>
>> Cheng
>>
>> On 8/7/15 3:32 PM, Cheng Lian wrote:
>>
>> Hi Philip,
>>
>> Thanks for providing the log file. It seems that most of the time is
>> spent on partition discovery. The code snippet you provided actually
>> issues two jobs. The first one lists the input directories to find all
>> leaf directories (and this actually requires listing all leaf files,
>> because we can only assert that a directory is a leaf one when it
>> contains no sub-directories). Then partition information is extracted
>> from the leaf directory paths. This process starts at:
>>
>> 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
>> directories in parallel under:
>> file:/home/pweaver/work/parquet/day=20150225, …
>>
>> and ends at:
>>
>> 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
>> tasks have all completed, from pool
>>
>> The actual task execution time is about 36s:
>>
>> 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0
>> (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
>> …
>> 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0
>> (TID 0) in 36107 ms on lindevspark5 (1/2)
>>
>> You mentioned that your dataset has 40,000+ partitions, so there are a
>> lot of leaf directories and files out there. My guess is that the local
>> file system spent lots of time listing the FileStatus-es of all these
>> files.
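>> For intuition, the discovery is essentially a full recursive listing,
>> one list call per directory. A rough sketch (not Spark's actual code;
>> the base path is taken from your log):
>>
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.{FileSystem, Path}
>>
>> def leafDirs(fs: FileSystem, dir: Path): Seq[Path] = {
>>   val subDirs = fs.listStatus(dir).filter(_.isDirectory).map(_.getPath)
>>   if (subDirs.isEmpty) Seq(dir) // no sub-directories => leaf
>>   else subDirs.flatMap(d => leafDirs(fs, d)).toSeq // one list call per dir
>> }
>>
>> val base = new Path("file:/home/pweaver/work/parquet")
>> val leaves = leafDirs(base.getFileSystem(new Configuration()), base)
>> println(s"${leaves.size} leaf directories")
>>
>> With 40,000+ leaf directories that is tens of thousands of list calls,
>> which would line up with the 36s of task time above.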
>> I also noticed that Mesos job scheduling takes more time than expected.
>> It is probably because this is the first Spark job executed in the
>> application, and the system is not warmed up yet. For example, there's a
>> 6s gap between these two adjacent lines:
>>
>> 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2
>> tasks
>> 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now
>> TASK_RUNNING
>>
>> The 2nd Spark job is the real Parquet reading job, and this one actually
>> finishes pretty quickly, in only 3s (note that the Mesos job scheduling
>> latency is also included):
>>
>> 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
>> App.scala:182) with 8 output partitions
>> …
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0
>> (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0
>> (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0
>> (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
>> …
>> 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0
>> (TID 8) in 1527 ms on lindevspark4 (6/8)
>> 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0
>> (TID 6) in 1533 ms on lindevspark4 (7/8)
>> 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0
>> (TID 9) in 2886 ms on lindevspark5 (8/8)
>>
>> That might be the reason why you observed that the C parquet library you
>> mentioned (is it parquet-cpp?) is an order of magnitude faster.
>>
>> Cheng
>>
>> On 8/7/15 2:02 AM, Philip Weaver wrote:
>>
>> With DEBUG, the log output was over 10MB, so I opted for just INFO
>> output. The (sanitized) log is attached.
>>
>> The driver is essentially this code:
>>
>> info("A")
>>
>> val t = System.currentTimeMillis
>> val df = sqlContext.read.parquet(dir).select(...).cache
>>
>> val elapsed = System.currentTimeMillis - t
>> info(s"Init time: ${elapsed} ms")
>>
>> We've also observed that it is very slow to read the contents of the
>> parquet files. My colleague wrote a PySpark application that gets the
>> list of files, parallelizes it, maps across it to read each file manually
>> using a C parquet library, and aggregates manually in the loop. Ignoring
>> the 1-2 minute initialization cost, his is an order of magnitude faster
>> than a Spark SQL or DataFrame query in Scala. Since he is parallelizing
>> the work through Spark, and that isn't causing any performance issues, it
>> seems to be a problem with the parquet reader. I may try to do what he
>> did to construct a DataFrame manually, and see if I can query it with
>> Spark SQL with reasonable performance.
>>
>> - Philip
>>
>> On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
>>
>>> Would you mind providing the driver log?
>>>
>>> On 8/6/15 3:58 PM, Philip Weaver wrote:
>>>
>>> I built Spark from the v1.5.0-snapshot-20150803 tag in the repo and
>>> tried again.
>>>
>>> The initialization time is about 1 minute now, which is still pretty
>>> terrible.
>>>
>>> On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver
>>> <philip.wea...@gmail.com> wrote:
>>>
>>>> Absolutely, thanks!
>>>>
>>>> On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian <lian.cs....@gmail.com>
>>>> wrote:
>>>>
>>>>> We've fixed this issue in 1.5:
>>>>> https://github.com/apache/spark/pull/7396
>>>>>
>>>>> Could you give it a shot to see whether it helps in your case? We've
>>>>> observed a ~50x performance boost with schema merging turned on.
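>>>>> As an aside, schema merging can also be toggled per read if you want
>>>>> to compare both paths; with this many part files, merging has to
>>>>> touch every footer (untested snippet, path as in your example below):
>>>>>
>>>>> val df = sqlContext.read
>>>>>   .option("mergeSchema", "false") // or "true" to force merging
>>>>>   .parquet("asdf")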
>>>>>
>>>>> Cheng
>>>>>
>>>>> On 8/6/15 8:26 AM, Philip Weaver wrote:
>>>>>
>>>>> I have a parquet directory that was produced by partitioning by two
>>>>> keys, e.g. like this:
>>>>>
>>>>> df.write.partitionBy("a", "b").parquet("asdf")
>>>>>
>>>>> There are 35 values of "a", and about 1100-1200 values of "b" for
>>>>> each value of "a", for a total of over 40,000 partitions.
>>>>>
>>>>> Before running any transformations or actions on the DataFrame, just
>>>>> initializing it like this takes *2 minutes*:
>>>>>
>>>>> val df = sqlContext.read.parquet("asdf")
>>>>>
>>>>> Is this normal? Is this because it is doing some bookkeeping to
>>>>> discover all the partitions? Is it perhaps having to merge the schema
>>>>> from each partition? Would you expect it to get better or worse if I
>>>>> subpartition by another key?
>>>>>
>>>>> - Philip
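>>>>> P.S. In case a reproduction helps, this untested sketch builds
>>>>> roughly the same layout (35 values of "a" times ~1150 values of "b",
>>>>> i.e. about 40,000 partition directories with one row each):
>>>>>
>>>>> import org.apache.spark.sql.functions._
>>>>>
>>>>> val df = sqlContext.range(0L, 35L * 1150L)
>>>>>   .withColumn("a", (col("id") % 35).cast("string"))
>>>>>   .withColumn("b", ((col("id") / 35).cast("long") % 1150).cast("string"))
>>>>> df.write.partitionBy("a", "b").parquet("asdf")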