Hi Philip,

What do you mean by "still partitioned the same way"? If you mean saving the partition columns (currently encoded in the partition directory names) directly into the Parquet files, and putting all Parquet part-files into a single directory without creating any intermediate sub-directories, then I'd expect it to be much faster. This is at least true for file systems like S3, though I haven't tested listing the contents of super-wide, flat directories (i.e. those containing no sub-directories but a lot of files).
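For example, something along these lines (just a sketch of what I mean, not tested; the output path is made up, and "a"/"b" are the partition keys from your original mail):

    // Read the existing partitioned data; partition discovery exposes "a"
    // and "b" as regular columns of the DataFrame.
    val df = sqlContext.read.parquet("asdf")

    // Write everything into one flat directory, keeping "a" and "b" as
    // ordinary data columns, so there is no directory tree to list on read.
    df.write.parquet("/path/to/flat_output")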

And, as Hao suggested, sorting by the columns you are interested in can give better performance on the read path because of Parquet-specific optimizations like filter push-down. However, I think that in your case the time spent reading the Parquet files is not the bottleneck, according to our previous discussion.
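On the read path that would look roughly like this (a sketch continuing the flat layout above; the filter values are made up, and the actual benefit depends on Parquet row group statistics and whether filter push-down is enabled):

    // A predicate on the sorted columns can be pushed down to Parquet, which
    // may skip whole row groups whose min/max statistics rule the values out.
    val flat = sqlContext.read.parquet("/path/to/flat_output")
    flat.filter(flat("a") === "a1" && flat("b") === "b42").count()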

Cheng

On 8/12/15 8:56 AM, Cheng, Hao wrote:

Definitely worth a try. You can also sort the records before writing them out; then you will get Parquet files without overlapping keys.
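Something like this, where df is the DataFrame you are writing (just a sketch, not tested; the output path is made up):

    // A global sort range-partitions the data before writing, so each output
    // part-file covers a disjoint range of ("a", "b") -- no overlapping keys.
    df.sort("a", "b").write.parquet("/path/to/sorted_output")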

Let us know if that helps.

Hao

*From:* Philip Weaver [mailto:philip.wea...@gmail.com]
*Sent:* Wednesday, August 12, 2015 4:05 AM
*To:* Cheng Lian
*Cc:* user
*Subject:* Re: Very high latency to initialize a DataFrame from partitioned parquet database.

Do you think it might be faster to put all the files in one directory but still partitioned the same way? I don't actually need to filter on the values of the partition keys, but I do need to rely on there being no overlap in key values between any two Parquet files.

On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver <philip.wea...@gmail.com <mailto:philip.wea...@gmail.com>> wrote:

    Thanks, I also confirmed that the partition discovery is slow by
    writing a non-Spark application that uses the parquet library
    directly to load the partitions.

    It's so slow that my colleague's Python application can read the
    entire contents of all the parquet data files faster than my
    application can even discover the partitions!

    On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian <lian.cs....@gmail.com
    <mailto:lian.cs....@gmail.com>> wrote:

        However, it's weird that the partition discovery job only
        spawns 2 tasks. It should use the default parallelism, which
        is probably 8 according to the logs of the next Parquet
        reading job. Partition discovery is already done in a
        distributed manner via a Spark job. But the parallelism is
        mysteriously low...

        Cheng

        On 8/7/15 3:32 PM, Cheng Lian wrote:

            Hi Philip,

            Thanks for providing the log file. It seems that most of
            the time is spent on partition discovery. The code
            snippet you provided actually issues two jobs. The first
            one is for listing the input directories to find out all
            leaf directories (and this actually requires listing all
            leaf files, because we can only assert that a directory is
            a leaf one when it contains no sub-directories). Then
            partition information is extracted from leaf directory
            paths. This process starts at:

                10:51:44 INFO sources.HadoopFsRelation: Listing leaf
                files and directories in parallel under:
                file:/home/pweaver/work/parquet/day=20150225, …

            and ends at:

                10:52:31 INFO scheduler.TaskSchedulerImpl: Removed
                TaskSet 0.0, whose tasks have all completed, from pool

            The actual task execution time is about 36s:

                10:51:54 INFO scheduler.TaskSetManager: Starting task
                0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL,
                3087 bytes)
                …
                10:52:30 INFO scheduler.TaskSetManager: Finished task
                0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

            You mentioned that your dataset has about 40,000+
            partitions, so there are a lot of leaf directories and
            files out there. My guess is that the local file system
            spent lots of time listing FileStatus-es of all these files.
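            To illustrate why this touches every directory, the listing
            step boils down to something like the following (a simplified
            sketch of the idea, not the actual HadoopFsRelation code):

                import org.apache.hadoop.fs.{FileSystem, Path}

                // A directory can only be classified as a leaf after listing
                // it and finding no sub-directories, so every directory (and
                // therefore every FileStatus) has to be listed at least once.
                def leafDirs(fs: FileSystem, dir: Path): Seq[Path] = {
                  val children = fs.listStatus(dir)
                  val subDirs = children.filter(_.isDirectory).map(_.getPath)
                  if (subDirs.isEmpty) Seq(dir)
                  else subDirs.flatMap(leafDirs(fs, _)).toSeq
                }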

            I also noticed that Mesos job scheduling takes more time
            than expected. It is probably because this is the first
            Spark job executed in the application, and the system is
            not warmed up yet. For example, there’s a 6s gap between
            these two adjacent lines:

                10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task
                set 0.0 with 2 tasks
                10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos
                task 0 is now TASK_RUNNING

            The 2nd Spark job is the real Parquet reading job, and
            this one actually finishes pretty quickly, only 3s (note
            that the Mesos job scheduling latency is also included):

                10:52:32 INFO scheduler.DAGScheduler: Got job 1
                (parquet at App.scala:182) with 8 output partitions
                …
                10:52:32 INFO scheduler.TaskSetManager: Starting task
                0.0 in stage 1.0 (TID 2, lindevspark4, PROCESS_LOCAL,
                2058 bytes)
                10:52:32 INFO scheduler.TaskSetManager: Starting task
                1.0 in stage 1.0 (TID 3, lindevspark5, PROCESS_LOCAL,
                2058 bytes)
                10:52:32 INFO scheduler.TaskSetManager: Starting task
                2.0 in stage 1.0 (TID 4, lindevspark4, PROCESS_LOCAL,
                2058 bytes)
                …
                10:52:34 INFO scheduler.TaskSetManager: Finished task
                6.0 in stage 1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
                10:52:34 INFO scheduler.TaskSetManager: Finished task
                4.0 in stage 1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
                10:52:35 INFO scheduler.TaskSetManager: Finished task
                7.0 in stage 1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)

            That might be the reason why you observed that the C
            Parquet library you mentioned (is it parquet-cpp?) is an
            order of magnitude faster.

            Cheng

            On 8/7/15 2:02 AM, Philip Weaver wrote:

                With DEBUG, the log output was over 10MB, so I opted
                for just INFO output. The (sanitized) log is attached.

                The driver is essentially this code:

                    info("A")

                    val t = System.currentTimeMillis

                    val df = sqlContext.read.parquet(dir).select(...).cache

                    val elapsed = System.currentTimeMillis - t

                    info(s"Init time: ${elapsed} ms")

                We've also observed that it is very slow to read the
                contents of the parquet files. My colleague wrote a
                PySpark application that gets the list of files,
                parallelizes it, maps across it and reads each file
                manually using a C parquet library, and aggregates
                manually in the loop. Ignoring the 1-2 minute
                initialization cost, his approach is an order of
                magnitude faster than a Spark SQL or DataFrame query in
                Scala. Since he is parallelizing the work through
                Spark, and that isn't causing any performance issues,
                it seems to be a problem with the parquet reader. I
                may try to do what he did to construct a DataFrame
                manually, and see if I can query it with Spark SQL
                with reasonable performance.

                - Philip

                On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian
                <lian.cs....@gmail.com <mailto:lian.cs....@gmail.com>>
                wrote:

                    Would you mind providing the driver log?

                    On 8/6/15 3:58 PM, Philip Weaver wrote:

                        I built spark from the
                        v1.5.0-snapshot-20150803 tag in the repo and
                        tried again.

                        The initialization time is about 1 minute now,
                        which is still pretty terrible.

                        On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver
                        <philip.wea...@gmail.com
                        <mailto:philip.wea...@gmail.com>> wrote:

                            Absolutely, thanks!

                            On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian
                            <lian.cs....@gmail.com
                            <mailto:lian.cs....@gmail.com>> wrote:

                                We've fixed this issue in 1.5
                                https://github.com/apache/spark/pull/7396

                                Could you give it a shot to see
                                whether it helps in your case? We've
                                observed ~50x performance boost with
                                schema merging turned on.
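                                To try it out, reading with schema
                                merging explicitly turned on would look
                                roughly like this (a sketch; the
                                mergeSchema data source option is the
                                one documented for 1.5):

                                    // Enable schema merging for this read via
                                    // the per-read data source option.
                                    val df = sqlContext.read.option("mergeSchema", "true").parquet("asdf")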

                                Cheng

                                On 8/6/15 8:26 AM, Philip Weaver wrote:

                                    I have a parquet directory that
                                    was produced by partitioning by
                                    two keys, e.g. like this:

                                        df.write.partitionBy("a", "b").parquet("asdf")

                                    There are 35 values of "a", and
                                    about 1100-1200 values of "b" for
                                    each value of "a", for a total of
                                    over 40,000 partitions.

                                    Before running any transformations
                                    or actions on the DataFrame, just
                                    initializing it like this takes *2
                                    minutes*:

                                        val df = sqlContext.read.parquet("asdf")

                                    Is this normal? Is this because it
                                    is doing some bookkeeping to
                                    discover all the partitions? Is it
                                    perhaps having to merge the schema
                                    from each partition? Would you
                                    expect it to get better or worse
                                    if I subpartition by another key?

                                    - Philip

