Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
I have a parquet directory that was produced by partitioning by two keys,
e.g. like this:

df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of "a", and about 1100-1200 values of "b" for each
value of "a", for a total of over 40,000 partitions.

Before running any transformations or actions on the DataFrame, just
initializing it like this takes *2 minutes*:

val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookkeeping to discover all
the partitions? Is it perhaps having to merge the schema from each
partition? Would you expect it to get better or worse if I subpartition by
another key?

- Philip


Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Cheng Lian

We've fixed this issue in 1.5: https://github.com/apache/spark/pull/7396

Could you give it a shot and see whether it helps in your case? We've
observed a ~50x performance boost with schema merging turned on.
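
In case it helps while experimenting, schema merging can also be toggled
explicitly on the read path. A minimal sketch, assuming the mergeSchema
option documented for the Parquet data source in 1.5 (whether it interacts
with this particular fix is not something verified here):

// Sketch only: if every partition shares the same schema, schema merging
// can be skipped entirely via the Parquet source's mergeSchema option.
val df = sqlContext.read
  .option("mergeSchema", "false")
  .parquet("asdf")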


Cheng

On 8/6/15 8:26 AM, Philip Weaver wrote:
I have a parquet directory that was produced by partitioning by two 
keys, e.g. like this:


df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of "a", and about 1100-1200 values of "b" for each 
value of "a", for a total of over 40,000 partitions.


Before running any transformations or actions on the DataFrame, just 
initializing it like this takes *2 minutes*:


val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookeeping to 
discover all the partitions? Is it perhaps having to merge the schema 
from each partition? Would you expect it to get better or worse if I 
subpartition by another key?


- Philip






Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian  wrote:

> We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396
>
> Could you give it a shot to see whether it helps in your case? We've
> observed ~50x performance boost with schema merging turned on.
>
> Cheng
>
>
> On 8/6/15 8:26 AM, Philip Weaver wrote:
>
> I have a parquet directory that was produced by partitioning by two keys,
> e.g. like this:
>
> df.write.partitionBy("a", "b").parquet("asdf")
>
>
> There are 35 values of "a", and about 1100-1200 values of "b" for each
> value of "a", for a total of over 40,000 partitions.
>
> Before running any transformations or actions on the DataFrame, just
> initializing it like this takes *2 minutes*:
>
> val df = sqlContext.read.parquet("asdf")
>
>
> Is this normal? Is this because it is doing some bookeeping to discover
> all the partitions? Is it perhaps having to merge the schema from each
> partition? Would you expect it to get better or worse if I subpartition by
> another key?
>
> - Philip
>
>
>
>


Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
I built Spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
again.

The initialization time is about 1 minute now, which is still pretty
terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver wrote:

> Absolutely, thanks!
>
> On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian  wrote:
>
>> We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396
>>
>> Could you give it a shot to see whether it helps in your case? We've
>> observed ~50x performance boost with schema merging turned on.
>>
>> Cheng
>>
>>
>> On 8/6/15 8:26 AM, Philip Weaver wrote:
>>
>> I have a parquet directory that was produced by partitioning by two keys,
>> e.g. like this:
>>
>> df.write.partitionBy("a", "b").parquet("asdf")
>>
>>
>> There are 35 values of "a", and about 1100-1200 values of "b" for each
>> value of "a", for a total of over 40,000 partitions.
>>
>> Before running any transformations or actions on the DataFrame, just
>> initializing it like this takes *2 minutes*:
>>
>> val df = sqlContext.read.parquet("asdf")
>>
>>
>> Is this normal? Is this because it is doing some bookeeping to discover
>> all the partitions? Is it perhaps having to merge the schema from each
>> partition? Would you expect it to get better or worse if I subpartition by
>> another key?
>>
>> - Philip
>>
>>
>>
>>
>


Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Cheng Lian

Would you mind providing the driver log?

On 8/6/15 3:58 PM, Philip Weaver wrote:
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and 
tried again.


The initialization time is about 1 minute now, which is still pretty 
terrible.


On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver <philip.wea...@gmail.com> wrote:


Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian <lian.cs@gmail.com> wrote:

We've fixed this issue in 1.5
https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your case?
We've observed ~50x performance boost with schema merging
turned on.

Cheng


On 8/6/15 8:26 AM, Philip Weaver wrote:

I have a parquet directory that was produced by partitioning
by two keys, e.g. like this:

df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of "a", and about 1100-1200 values of "b"
for each value of "a", for a total of over 40,000 partitions.

Before running any transformations or actions on the
DataFrame, just initializing it like this takes *2 minutes*:

val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookeeping
to discover all the partitions? Is it perhaps having to merge
the schema from each partition? Would you expect it to get
better or worse if I subpartition by another key?

- Philip










Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
With DEBUG, the log output was over 10MB, so I opted for just INFO output.
The (sanitized) log is attached.

The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the
parquet files. My colleague wrote a PySpark application that gets the list
of files, parallelizes it, maps across it, reads each file manually using
a C parquet library, and aggregates manually in the loop. Even ignoring
the 1-2 minute initialization cost, his application is an order of
magnitude faster than a Spark SQL or DataFrame query in Scala. Since he is
parallelizing the work through Spark and that isn't causing any
performance issues, it seems to be a problem with the parquet reader. I
may try to do what he did to construct a DataFrame manually, and see if I
can query it with Spark SQL with reasonable performance.
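
A rough sketch of what I have in mind, assuming the partition key values
can be recovered per directory (leafDirs below is a hypothetical,
pre-computed list of (a, b, path) triples; lit and unionAll are standard
DataFrame APIs):

import org.apache.spark.sql.functions.lit

// Read each leaf directory on its own, re-attach the partition key values
// as literal columns, and union the results, bypassing Spark's partition
// discovery. With tens of thousands of partitions the union plan itself
// may get large, so this is only a sketch.
val df = leafDirs.map { case (a, b, path) =>
  sqlContext.read.parquet(path)
    .withColumn("a", lit(a))
    .withColumn("b", lit(b))
}.reduce(_ unionAll _)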

- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian  wrote:

> Would you mind to provide the driver log?
>
>
> On 8/6/15 3:58 PM, Philip Weaver wrote:
>
> I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
> again.
>
> The initialization time is about 1 minute now, which is still pretty
> terrible.
>
> On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver 
> wrote:
>
>> Absolutely, thanks!
>>
>> On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian < 
>> lian.cs@gmail.com> wrote:
>>
>>> We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396
>>>
>>> Could you give it a shot to see whether it helps in your case? We've
>>> observed ~50x performance boost with schema merging turned on.
>>>
>>> Cheng
>>>
>>>
>>> On 8/6/15 8:26 AM, Philip Weaver wrote:
>>>
>>> I have a parquet directory that was produced by partitioning by two
>>> keys, e.g. like this:
>>>
>>> df.write.partitionBy("a", "b").parquet("asdf")
>>>
>>>
>>> There are 35 values of "a", and about 1100-1200 values of "b" for each
>>> value of "a", for a total of over 40,000 partitions.
>>>
>>> Before running any transformations or actions on the DataFrame, just
>>> initializing it like this takes *2 minutes*:
>>>
>>> val df = sqlContext.read.parquet("asdf")
>>>
>>>
>>> Is this normal? Is this because it is doing some bookeeping to discover
>>> all the partitions? Is it perhaps having to merge the schema from each
>>> partition? Would you expect it to get better or worse if I subpartition by
>>> another key?
>>>
>>> - Philip
>>>
>>>
>>>
>>>
>>
>
>
10:51:42  INFO spark.SparkContext: Running Spark version 1.5.0-SNAPSHOT
10:51:42  WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:51:42  INFO spark.SecurityManager: Changing view acls to: pweaver
10:51:42  INFO spark.SecurityManager: Changing modify acls to: pweaver
10:51:42  INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pweaver); users with modify permissions: Set(pweaver)
10:51:43  INFO slf4j.Slf4jLogger: Slf4jLogger started
10:51:43  INFO Remoting: Starting remoting
10:51:43  INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.26.21.70:51400]
10:51:43  INFO util.Utils: Successfully started service 'sparkDriver' on port 51400.
10:51:43  INFO spark.SparkEnv: Registering MapOutputTracker
10:51:43  INFO spark.SparkEnv: Registering BlockManagerMaster
10:51:43  INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-04438917-93ee-45f3-bc10-c5f5eb3d6a4a
10:51:43  INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
10:51:43  INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858/httpd-50939810-7da7-42d9-9342-48d9dc2705dc
10:51:43  INFO spark.HttpServer: Starting HTTP Server
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55227
10:51:43  INFO util.Utils: Successfully started service 'HTTP file server' on port 55227.
10:51:43  INFO spark.SparkEnv: Registering OutputCommitCoordinator
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
10:51:43  INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
10:51:43  INFO ui.SparkUI: Started SparkUI at http://172.26.21.70:4040
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark-assembly-1.0-deps.jar at http://172.26.21.70:55227/jars/linear_spark-assembly-1.0-deps.jar with timestamp 1438883503937
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark_2.11-1.0.jar at http://172.26.21.70:55227/jars/linear_spark_2.11-1.0.jar with timestamp 1438883503940
10:51:44  WARN metrics.MetricsSystem: Using default name DAGScheduler for source beca

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-07 Thread Cheng Lian

Hi Philip,

Thanks for providing the log file. It seems that most of the time is 
spent on partition discovery. The code snippet you provided actually 
issues two jobs. The first one is for listing the input directories to 
find out all leaf directories (and this actually requires listing all 
leaf files, because we can only assert that a directory is a leaf one 
when it contains no sub-directories). Then partition information is 
extracted from leaf directory paths. This process starts at:


   10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
   directories in parallel under:
   file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

   10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
   whose tasks have all completed, from pool

The actual task execution time is about 36s:

   10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
   0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
   …
   10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
   0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has more than 40,000 partitions, so there 
are a lot of leaf directories and files out there. My guess is that the 
local file system spent lots of time listing the FileStatus-es of all 
these files.


I also noticed that Mesos job scheduling takes more time than expected. 
It is probably because this is the first Spark job executed in the 
application, and the system is not warmed up yet. For example, there’s a 
6s gap between these two adjacent lines:


   10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
   2 tasks
   10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now
   TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and this one actually 
finishes pretty quickly, only 3s (note that the Mesos job scheduling 
latency is also included):


   10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
   App.scala:182) with 8 output partitions
   …
   10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
   1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
   10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
   1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
   10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
   1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
   …
   10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage
   1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
   10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage
   1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
   10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage
   1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)

That might be the reason why you observed that the C parquet library you 
mentioned (is it parquet-cpp?) is an order of magnitude faster?


Cheng

On 8/7/15 2:02 AM, Philip Weaver wrote:

With DEBUG, the log output was over 10MB, so I opted for just INFO 
output. The (sanitized) log is attached.


The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the 
parquet files. My colleague wrote a PySpark application that gets the 
list of files, parallelizes it, maps across it and reads each file 
manually using a C parquet library, and aggregates manually in the 
loop. Ignoring the 1-2 minute initialization cost, compared to a Spark 
SQL or DataFrame query in Scala, his is an order of magnitude faster. 
Since he is parallelizing the work through Spark, and that isn't 
causing any performance issues, it seems to be a problem with the 
parquet reader. I may try to do what he did to construct a DataFrame 
manually, and see if I can query it with Spark SQL with reasonable 
performance.


- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian <lian.cs@gmail.com> wrote:


Would you mind to provide the driver log?


On 8/6/15 3:58 PM, Philip Weaver wrote:

I built spark from the v1.5.0-snapshot-20150803 tag in the repo
and tried again.

The initialization time is about 1 minute now, which is still
pretty terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver
<philip.wea...@gmail.com> wrote:

Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian
<lian.cs@gmail.com> wrote:

We've fixed this issue in 1.5
https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your
case? We've observed ~50x performance boost with schema
merging turned on.

Cheng


On 8/6/15 8:26 AM, Philip Weaver wrote:

I have a parquet directory that was produced by
par

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-07 Thread Cheng Lian
However, it's weird that the partition discovery job only spawns 2 
tasks. It should use the default parallelism, which is probably 8 
according to the logs of the next Parquet reading job. Partition 
discovery is already done in a distributed manner via a Spark job. But 
the parallelism is mysteriously low...
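
For what it's worth, the value actually in use can be checked from the
driver with the standard SparkContext field (assuming sc is your
SparkContext):

// Spark falls back to this when a job doesn't specify its own partition count.
println(s"default parallelism: ${sc.defaultParallelism}")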


Cheng

On 8/7/15 3:32 PM, Cheng Lian wrote:


Hi Philip,

Thanks for providing the log file. It seems that most of the time are 
spent on partition discovery. The code snippet you provided actually 
issues two jobs. The first one is for listing the input directories to 
find out all leaf directories (and this actually requires listing all 
leaf files, because we can only assert that a directory is a leaf one 
when it contains no sub-directories). Then partition information is 
extracted from leaf directory paths. This process starts at:


10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
directories in parallel under:
file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
whose tasks have all completed, from pool

The actual tasks execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+ partitions, so there 
are a lot of leaf directories and files out there. My guess is that 
the local file system spent lots of time listing FileStatus-es of all 
these files.


I also noticed that Mesos job scheduling takes more time then 
expected. It is probably because this is the first Spark job executed 
in the application, and the system is not warmed up yet. For example, 
there’s a 6s gap between these two adjacent lines:


10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is
now TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and this one 
actually finishes pretty quickly, only 3s (note that the Mesos job 
scheduling latency is also included):


10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
App.scala:182) with 8 output partitions
…
10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
…
10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage
1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage
1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage
1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)

That might be the reason why you observed that the C parquet library 
you mentioned (is it parquet-cpp?) is an order of magnitude faster?


Cheng

On 8/7/15 2:02 AM, Philip Weaver wrote:

With DEBUG, the log output was over 10MB, so I opted for just INFO 
output. The (sanitized) log is attached.


The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the 
parquet files. My colleague wrote a PySpark application that gets the 
list of files, parallelizes it, maps across it and reads each file 
manually using a C parquet library, and aggregates manually in the 
loop. Ignoring the 1-2 minute initialization cost, compared to a 
Spark SQL or DataFrame query in Scala, his is an order of magnitude 
faster. Since he is parallelizing the work through Spark, and that 
isn't causing any performance issues, it seems to be a problem with 
the parquet reader. I may try to do what he did to construct a 
DataFrame manually, and see if I can query it with Spark SQL with 
reasonable performance.


- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian <lian.cs@gmail.com> wrote:


Would you mind to provide the driver log?


On 8/6/15 3:58 PM, Philip Weaver wrote:

I built spark from the v1.5.0-snapshot-20150803 tag in the repo
and tried again.

The initialization time is about 1 minute now, which is still
pretty terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver
 wrote:

Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian
 wrote:

We've fixed this issue in 1.5
https://github.com/apache/spark/p

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-07 Thread Philip Weaver
Thanks, I also confirmed that the partition discovery is slow by writing a
non-Spark application that uses the parquet library directly to load the
partitions.

It's so slow that my colleague's Python application can read the entire
contents of all the parquet data files faster than my application can even
discover the partitions!
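
For reference, even a bare recursive listing of the partition tree is a
simple way to isolate the listing cost. A sketch using only the Hadoop
FileSystem API (illustrative only, not the exact code of that application;
the path is the one from the log):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Count leaf directories by recursing through the partition tree and time
// how long the FileStatus listing alone takes.
def countLeaves(fs: FileSystem, path: Path): Int = {
  val dirs = fs.listStatus(path).filter(_.isDirectory)
  if (dirs.isEmpty) 1 else dirs.map(d => countLeaves(fs, d.getPath)).sum
}

val fs = FileSystem.get(new Configuration())
val start = System.currentTimeMillis
val leaves = countLeaves(fs, new Path("/home/pweaver/work/parquet"))
println(s"$leaves leaf directories listed in ${System.currentTimeMillis - start} ms")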

On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian  wrote:

> However, it's weird that the partition discovery job only spawns 2 tasks.
> It should use the default parallelism, which is probably 8 according to the
> logs of the next Parquet reading job. Partition discovery is already done
> in a distributed manner via a Spark job. But the parallelism is
> mysteriously low...
>
> Cheng
>
>
> On 8/7/15 3:32 PM, Cheng Lian wrote:
>
> Hi Philip,
>
> Thanks for providing the log file. It seems that most of the time are
> spent on partition discovery. The code snippet you provided actually issues
> two jobs. The first one is for listing the input directories to find out
> all leaf directories (and this actually requires listing all leaf files,
> because we can only assert that a directory is a leaf one when it contains
> no sub-directories). Then partition information is extracted from leaf
> directory paths. This process starts at:
>
> 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories
> in parallel under: file:/home/pweaver/work/parquet/day=20150225, …
>
> and ends at:
>
> 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
> tasks have all completed, from pool
>
> The actual tasks execution time is about 36s:
>
> 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0
> (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
> …
> 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0
> (TID 0) in 36107 ms on lindevspark5 (1/2)
>
> You mentioned that your dataset has about 40,000+ partitions, so there are
> a lot of leaf directories and files out there. My guess is that the local
> file system spent lots of time listing FileStatus-es of all these files.
>
> I also noticed that Mesos job scheduling takes more time then expected. It
> is probably because this is the first Spark job executed in the
> application, and the system is not warmed up yet. For example, there’s a 6s
> gap between these two adjacent lines:
>
> 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
> 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now
> TASK_RUNNING
>
> The 2nd Spark job is the real Parquet reading job, and this one actually
> finishes pretty quickly, only 3s (note that the Mesos job scheduling
> latency is also included):
>
> 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182)
> with 8 output partitions
> …
> 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0
> (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
> 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0
> (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
> 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0
> (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
> …
> 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0
> (TID 8) in 1527 ms on lindevspark4 (6/8)
> 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0
> (TID 6) in 1533 ms on lindevspark4 (7/8)
> 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0
> (TID 9) in 2886 ms on lindevspark5 (8/8)
>
> That might be the reason why you observed that the C parquet library you
> mentioned (is it parquet-cpp?) is an order of magnitude faster?
>
> Cheng
>
> On 8/7/15 2:02 AM, Philip Weaver wrote:
>
> With DEBUG, the log output was over 10MB, so I opted for just INFO output.
> The (sanitized) log is attached.
>
> The driver is essentially this code:
>
> info("A")
>
> val t = System.currentTimeMillis
> val df = sqlContext.read.parquet(dir).select(...).cache
>
> val elapsed = System.currentTimeMillis - t
> info(s"Init time: ${elapsed} ms")
>
> We've also observed that it is very slow to read the contents of the
> parquet files. My colleague wrote a PySpark application that gets the list
> of files, parallelizes it, maps across it and reads each file manually
> using a C parquet library, and aggregates manually in the loop. Ignoring
> the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame
> query in Scala, his is an order of magnitude faster. Since he is
> parallelizing the work through Spark, and that isn't causing any
> performance issues, it seems to be a problem with the parquet reader. I may
> try to do what he did to construct a DataFrame manually, and see if I can
> query it with Spark SQL with reasonable performance.
>
> - Philip
>
>
> On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian < 
> lian.cs@gmail.com> wrote:
>
>> Would you mind to provide the driver log?
>>
>>
>> On 8/6/15 3:58 PM, Philip Weaver wrote:
>>
>> I built 

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-11 Thread Philip Weaver
Do you think it might be faster to put all the files in one directory but
still partitioned the same way? I don't actually need to filter on the
values of the partition keys, but I need to rely on there being no overlap
in the key values between any two parquet files.

On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver wrote:

> Thanks, I also confirmed that the partition discovery is slow by writing a
> non-Spark application that uses the parquet library directly to load that
> partitions.
>
> It's so slow that my colleague's Python application can read the entire
> contents of all the parquet data files faster than my application can even
> discover the partitions!
>
> On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian  wrote:
>
>> However, it's weird that the partition discovery job only spawns 2 tasks.
>> It should use the default parallelism, which is probably 8 according to the
>> logs of the next Parquet reading job. Partition discovery is already done
>> in a distributed manner via a Spark job. But the parallelism is
>> mysteriously low...
>>
>> Cheng
>>
>>
>> On 8/7/15 3:32 PM, Cheng Lian wrote:
>>
>> Hi Philip,
>>
>> Thanks for providing the log file. It seems that most of the time are
>> spent on partition discovery. The code snippet you provided actually issues
>> two jobs. The first one is for listing the input directories to find out
>> all leaf directories (and this actually requires listing all leaf files,
>> because we can only assert that a directory is a leaf one when it contains
>> no sub-directories). Then partition information is extracted from leaf
>> directory paths. This process starts at:
>>
>> 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
>> directories in parallel under:
>> file:/home/pweaver/work/parquet/day=20150225, …
>>
>> and ends at:
>>
>> 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
>> tasks have all completed, from pool
>>
>> The actual tasks execution time is about 36s:
>>
>> 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0
>> (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
>> …
>> 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0
>> (TID 0) in 36107 ms on lindevspark5 (1/2)
>>
>> You mentioned that your dataset has about 40,000+ partitions, so there
>> are a lot of leaf directories and files out there. My guess is that the
>> local file system spent lots of time listing FileStatus-es of all these
>> files.
>>
>> I also noticed that Mesos job scheduling takes more time then expected.
>> It is probably because this is the first Spark job executed in the
>> application, and the system is not warmed up yet. For example, there’s a 6s
>> gap between these two adjacent lines:
>>
>> 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2
>> tasks
>> 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now
>> TASK_RUNNING
>>
>> The 2nd Spark job is the real Parquet reading job, and this one actually
>> finishes pretty quickly, only 3s (note that the Mesos job scheduling
>> latency is also included):
>>
>> 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
>> App.scala:182) with 8 output partitions
>> …
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0
>> (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0
>> (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
>> 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0
>> (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
>> …
>> 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0
>> (TID 8) in 1527 ms on lindevspark4 (6/8)
>> 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0
>> (TID 6) in 1533 ms on lindevspark4 (7/8)
>> 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0
>> (TID 9) in 2886 ms on lindevspark5 (8/8)
>>
>> That might be the reason why you observed that the C parquet library you
>> mentioned (is it parquet-cpp?) is an order of magnitude faster?
>>
>> Cheng
>>
>> On 8/7/15 2:02 AM, Philip Weaver wrote:
>>
>> With DEBUG, the log output was over 10MB, so I opted for just INFO
>> output. The (sanitized) log is attached.
>>
>> The driver is essentially this code:
>>
>> info("A")
>>
>> val t = System.currentTimeMillis
>> val df = sqlContext.read.parquet(dir).select(...).cache
>>
>> val elapsed = System.currentTimeMillis - t
>> info(s"Init time: ${elapsed} ms")
>>
>> We've also observed that it is very slow to read the contents of the
>> parquet files. My colleague wrote a PySpark application that gets the list
>> of files, parallelizes it, maps across it and reads each file manually
>> using a C parquet library, and aggregates manually in the loop. Ignoring
>> the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame
>> query in Scala, his is an order of magnitude faster. Since he is
>> parallelizing the work throu

RE: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-11 Thread Cheng, Hao
Definitely worth a try. You can also sort the records before writing them 
out, and then you will get parquet files without overlapping keys.
Let us know if that helps.
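
A minimal sketch of the idea, with a hypothetical output path (sort is the
standard DataFrame API; the range shuffle it introduces is what leaves each
part-file with a disjoint key range):

// Globally sort by the keys, then write without partitionBy so all
// part-files land in one flat directory with non-overlapping (a, b) ranges.
df.sort("a", "b")
  .write
  .parquet("/path/to/flat_output")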

Hao

From: Philip Weaver [mailto:philip.wea...@gmail.com]
Sent: Wednesday, August 12, 2015 4:05 AM
To: Cheng Lian
Cc: user
Subject: Re: Very high latency to initialize a DataFrame from partitioned 
parquet database.

Do you think it might be faster to put all the files in one directory but still 
partitioned the same way? I don't actually need to filter on the values of the 
partition keys, but I need to rely on there be no overlap in the value of the 
keys between any two parquet files.

On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver <philip.wea...@gmail.com> wrote:
Thanks, I also confirmed that the partition discovery is slow by writing a 
non-Spark application that uses the parquet library directly to load that 
partitions.

It's so slow that my colleague's Python application can read the entire 
contents of all the parquet data files faster than my application can even 
discover the partitions!

On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian <lian.cs@gmail.com> wrote:
However, it's weird that the partition discovery job only spawns 2 tasks. It 
should use the default parallelism, which is probably 8 according to the logs 
of the next Parquet reading job. Partition discovery is already done in a 
distributed manner via a Spark job. But the parallelism is mysteriously low...

Cheng

On 8/7/15 3:32 PM, Cheng Lian wrote:

Hi Philip,

Thanks for providing the log file. It seems that most of the time are spent on 
partition discovery. The code snippet you provided actually issues two jobs. 
The first one is for listing the input directories to find out all leaf 
directories (and this actually requires listing all leaf files, because we can 
only assert that a directory is a leaf one when it contains no 
sub-directories). Then partition information is extracted from leaf directory 
paths. This process starts at:

10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories in 
parallel under: file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
have all completed, from pool

The actual tasks execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
lindevspark5, PROCESS_LOCAL, 3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+ partitions, so there are a 
lot of leaf directories and files out there. My guess is that the local file 
system spent lots of time listing FileStatus-es of all these files.

I also noticed that Mesos job scheduling takes more time then expected. It is 
probably because this is the first Spark job executed in the application, and 
the system is not warmed up yet. For example, there’s a 6s gap between these 
two adjacent lines:

10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now 
TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and this one actually 
finishes pretty quickly, only 3s (note that the Mesos job scheduling latency is 
also included):

10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182) with 
8 output partitions
…
10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 
lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 
lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, 
lindevspark4, PROCESS_LOCAL, 2058 bytes)
…
10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0 (TID 8) 
in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 6) 
in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0 (TID 9) 
in 2886 ms on lindevspark5 (8/8)

That might be the reason why you observed that the C parquet library you 
mentioned (is it parquet-cpp?) is an order of magnitude faster?

Cheng

On 8/7/15 2:02 AM, Philip Weaver wrote:
With DEBUG, the log output was over 10MB, so I opted for just INFO output. The 
(sanitized) log is attached.

The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the parquet 
files. My colleague wrote a PySpark application that gets the list of files, 
parallelizes it, maps across it and reads each

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-12 Thread Cheng Lian

Hi Philip,

What do you mean by "still partitioned the same way"? If you are trying 
to save the partition columns that are currently encoded in partition 
directories directly into the Parquet files, and put all Parquet 
part-files into a single directory without creating any intermediate 
sub-directories, then I'd expect it to be much faster. This is at least 
true for file systems like S3; I haven't tested listing the contents of 
super wide, flat directories (i.e. those containing no sub-directories 
but a lot of files).
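
A minimal sketch of that flat layout, with hypothetical paths:

// Keep "a" and "b" as ordinary columns and write everything into a single
// directory with no partitionBy, so reading it back involves no partition
// discovery over ~40,000 sub-directories.
df.write.parquet("/data/flat")
val flat = sqlContext.read.parquet("/data/flat")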


And, as Hao suggested, sorting by the columns you are interested in can 
give better performance on the read path because of Parquet-specific 
optimizations like filter push-down. However, I think in your case the 
time spent reading the Parquet files is not the bottleneck, according to 
our previous discussion.


Cheng

On 8/12/15 8:56 AM, Cheng, Hao wrote:


Definitely worth to try. And you can sort the record before writing 
out, and then you will get the parquet files without overlapping keys.


Let us know if that helps.

Hao

*From:* Philip Weaver [mailto:philip.wea...@gmail.com]
*Sent:* Wednesday, August 12, 2015 4:05 AM
*To:* Cheng Lian
*Cc:* user
*Subject:* Re: Very high latency to initialize a DataFrame from 
partitioned parquet database.


Do you think it might be faster to put all the files in one directory 
but still partitioned the same way? I don't actually need to filter on 
the values of the partition keys, but I need to rely on there be no 
overlap in the value of the keys between any two parquet files.


On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver <philip.wea...@gmail.com> wrote:


Thanks, I also confirmed that the partition discovery is slow by
writing a non-Spark application that uses the parquet library
directly to load that partitions.

It's so slow that my colleague's Python application can read the
entire contents of all the parquet data files faster than my
application can even discover the partitions!

On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian <lian.cs@gmail.com> wrote:

However, it's weird that the partition discovery job only
spawns 2 tasks. It should use the default parallelism, which
is probably 8 according to the logs of the next Parquet
reading job. Partition discovery is already done in a
distributed manner via a Spark job. But the parallelism is
mysteriously low...

Cheng

On 8/7/15 3:32 PM, Cheng Lian wrote:

Hi Philip,

Thanks for providing the log file. It seems that most of
the time are spent on partition discovery. The code
snippet you provided actually issues two jobs. The first
one is for listing the input directories to find out all
leaf directories (and this actually requires listing all
leaf files, because we can only assert that a directory is
a leaf one when it contains no sub-directories). Then
partition information is extracted from leaf directory
paths. This process starts at:

10:51:44 INFO sources.HadoopFsRelation: Listing leaf
files and directories in parallel under:
file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed
TaskSet 0.0, whose tasks have all completed, from pool

The actual tasks execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task
0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL,
3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task
0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+
partitions, so there are a lot of leaf directories and
files out there. My guess is that the local file system
spent lots of time listing FileStatus-es of all these files.

I also noticed that Mesos job scheduling takes more time
then expected. It is probably because this is the first
Spark job executed in the application, and the system is
not warmed up yet. For example, there’s a 6s gap between
these two adjacent lines:

10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task
set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos
task 0 is now TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and
this one actually finishes pretty quickly, only 3s (note
that the Mesos job scheduling latency is also included):

10:52:32 INFO scheduler.DAGScheduler: Got job 1