Another note: for best performance you will want your Parquet files to be
fairly large (hundreds of MB each).  You could coalesce them and write them
back out for more efficient repeat querying.
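
A minimal sketch of that consolidation step (the paths and the target
partition count below are illustrative, and assume the small files all live
under a single directory):

// Read the small files in one pass, reduce the partition count, and write
// the result back out as a smaller number of large Parquet files.
val smallFiles = sqlContext.read.parquet("/data/small_parquet_dir")
val consolidated = smallFiles.coalesce(64)  // tune so each output file is a few hundred MB
consolidated.write.parquet("/data/consolidated_parquet")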

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> sqlContext.read.parquet
> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
> takes lists of files.
>
> val fileList = sc.textFile("file_list.txt").collect() // works, but using Spark here is possibly overkill
> val dataFrame = sqlContext.read.parquet(fileList: _*)
>
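> If reading the file list with Spark feels like overkill, a plain-JVM sketch
> (assuming file_list.txt is a local file on the driver with one path per
> line) would be:
>
> import scala.io.Source
>
> // Build the list of paths on the driver without an RDD, then hand it to
> // the same varargs parquet() call.
> val fileList = Source.fromFile("file_list.txt").getLines().toSeq
> val dataFrame = sqlContext.read.parquet(fileList: _*)
>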
> On Mon, Sep 28, 2015 at 1:35 PM, jwthomas <jordan.tho...@accenture.com>
> wrote:
>
>> We are working with use cases where we need to do batch processing on a
>> large number (hundreds of thousands) of Parquet files.  The processing is
>> quite similar per file.  There are many aggregates that are very
>> SQL-friendly (computing averages, maxima, and minima, and aggregations on
>> single columns with some selection criteria).  There is also some more
>> advanced time-series processing (continuous wavelet transforms and the
>> like).  This all seems like a good use case for Spark.
>>
>> But I'm having performance problems.  Let's take a look at something very
>> simple, which just checks whether the Parquet files are readable.
>>
>> Code that seems natural but doesn't work:
>>
>> import scala.util.{Try, Success, Failure}
>>
>> val parquetFiles = sc.textFile("file_list.txt")
>> val successes = parquetFiles
>>   .map(x => (x, Try(sqlContext.read.parquet(x))))
>>   .filter(_._2.isSuccess)
>>   .map(x => x._1)
>>
>> My understanding is that this doesn't work because sqlContext can't be
>> used inside a transformation like "map" (or inside an action); it only
>> makes sense on the driver.  Thus it becomes a null reference in the above
>> code, and all of the reads fail.
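>>
>> One sketch of a check that could run inside a transformation would use the
>> executor-safe Hadoop FileSystem API instead of sqlContext; note that this
>> only verifies that each path exists and is non-empty, not that the file is
>> valid Parquet:
>>
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.{FileSystem, Path}
>>
>> val readable = sc.textFile("file_list.txt").mapPartitions { paths =>
>>   // FileSystem and Path can be used on executors, unlike sqlContext.
>>   val fs = FileSystem.get(new Configuration())
>>   paths.filter { p =>
>>     val path = new Path(p)
>>     fs.exists(path) && fs.getFileStatus(path).getLen > 0
>>   }
>> }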
>>
>> Code that works:
>>
>> import scala.util.{Try, Success, Failure}
>>
>> val parquetFiles = sc.textFile("file_list.txt")
>> val successes = parquetFiles.collect()
>>   .map(x => (x, Try(sqlContext.read.parquet(x))))
>>   .filter(_._2.isSuccess)
>>   .map(x => x._1)
>>
>>
>> This works because the collect() means that everything happens back on the
>> driver.  So the sqlContext object makes sense and everything works fine.
>>
>> But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
>> executors, 40 GB of RAM on the driver, and 19 GB on each executor.  It
>> takes about 1 minute to execute for 100 Parquet files, which is too long;
>> recall that we need to do this across hundreds of thousands of files.
>>
>> I realize it is possible to parallelize after the read:
>>
>> import scala.util.{Try, Success, Failure}
>>
>> val parquetFiles = sc.textFile("file_list.txt")
>> val intermediate_successes = parquetFiles.collect()
>>   .map(x => (x, Try(sqlContext.read.parquet(x))))
>> val dist_successes = sc.parallelize(intermediate_successes)
>> val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
>>
>>
>> But this does not improve performance substantially.  It seems the
>> bottleneck is that the reads are happening sequentially.
>>
>> Is there a better way to do this?
>>
>> Thanks,
>> Jordan
>>
>>
>>
>>
>>
>>
>
