Hi Pedro

These are much more accurate performance numbers

In total I have 5,671,287 rows. Each row is stored as JSON. The JSON is very
complicated and can be up to 4K per row.

I randomly picked 30 partitions.

My "big" files are at most 64 MB.

execution time     | src  | coalesce(num) | file size | num files
34min 54s          | s3   | False         | small     | 5,970
32min 24s (errors) | s3   | 30            | small     | 5,970
13min 13s          | HDFS | False         | small     | 5,970
09min 42s          | HDFS | 30            | small     | 5,970
04min 29s          | HDFS | False         | big       | 1,046
04min 44s          | HDFS | 30            | big       | 1,046
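
For reference, here is a minimal sketch of the kind of read-and-count job these
timings came from. My batch code is actually Python; this Java sketch (Spark
1.6-era DataFrame API) just shows the equivalent calls, and the path and app
name are placeholders, not the real ones.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CountBenchmark {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("countBenchmark"));
        SQLContext sqlContext = new SQLContext(sc);

        // placeholder path; the real data is thousands of small JSON files
        DataFrame df = sqlContext.read().json("hdfs:///placeholder/path/*.json");

        // the coalesce(num) column in the table above; the "False" rows skip this call
        long rows = df.coalesce(30).count();
        System.out.println("row count: " + rows);

        sc.stop();
    }
}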

From:  Andrew Davidson <a...@santacruzintegration.com>
Date:  Friday, July 29, 2016 at 1:04 PM
To:  Andrew Davidson <a...@santacruzintegration.com>, Pedro Rodriguez
<ski.rodrig...@gmail.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  use big files and read from HDFS was: performance problem when
reading lots of small files created by spark streaming.

> Hi Pedro
> 
> I did some experiments using one of our relatively small data sets. The data
> set is loaded into 3 or 4 data frames; I then call count().
> 
> 
> Looks like using bigger files and reading from HDFS is a good solution for
> reading data. I guess I'll need to do something similar to this to deal with
> S3 write performance.
> 
> I think this could probably be tuned up a bit. I randomly chose a max of 30
> partitions for each data frame. When I combined files I checked that the
> combined file size was < 64 MB (64,000,000 bytes); however, in practice they
> are a bit bigger.
> Andy
> 
> 
> execution time | src  | coalesce(num) | file size | num files
> 39min 44s      | s3   | False         | small     | 270,518
> 32min 24s      | s3   | 30            | small     | 270,518
> 3min 09s       | HDFS | 30            | small     | 270,518
> 4min 24s       | HDFS | False         | small     | 270,518
> 2min 19s       | HDFS | False         | big       | 1,046
> 2min 06s       | HDFS | 30            | big       | 1,046
> 
> 
> From:  Andrew Davidson <a...@santacruzintegration.com>
> Date:  Thursday, July 28, 2016 at 8:58 AM
> To:  Pedro Rodriguez <ski.rodrig...@gmail.com>
> Cc:  "user @spark" <user@spark.apache.org>
> Subject:  Re: performance problem when reading lots of small files created by
> spark streaming.
> 
>> Hi Pedro
>> 
>> Thanks for the explanation. I started watching your repo. In the short term I
>> think I am going to try concatenating my small files into 64 MB files and
>> using HDFS. My Spark streaming app is implemented in Java and uses data
>> frames; it writes to S3. My batch processing is written in Python and reads
>> the data into data frames.
>> 
>> It's probably a lot of work to make your solution work in these other
>> contexts.
>> 
>> Here is another use case you might be interested in:
>> Writing multiple files to S3 is really slow. It causes a lot of problems for
>> my streaming app. Bad things happen if your processing time exceeds your
>> window length. Our streaming app must save all the input. For each mini batch
>> we split the input into as many as 30 different data sets. Each one needs to
>> be written to S3.
>> 
>> As a temporary workaround I use an executor service to try to get more
>> concurrent writes. Ideally the Spark framework would provide support for
>> async IO, and hopefully the S3 performance issue would be improved. Here is
>> my code if you are interested.
>> 
>> public class StreamingKafkaGnipCollector {
>> 
>>     static final int POOL_SIZE = 30;
>>     static ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
>> 
>>     // …
>> 
>>     private static void saveRawInput(SQLContext sqlContext,
>>             JavaPairInputDStream<String, String> messages, String outputURIBase) {
>> 
>>         // extract the message value from each Kafka (key, value) tuple
>>         JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>>             private static final long serialVersionUID = 1L;
>> 
>>             @Override
>>             public String call(Tuple2<String, String> tuple2) {
>>                 //logger.warn("TODO _2:{}", tuple2._2);
>>                 return tuple2._2();
>>             }
>>         });
>> 
>>         lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>>             @Override
>>             public void call(JavaRDD<String> jsonRDD, Time time) throws Exception {
>>                 // …
>>                 // df.write().json("s3://"); is very slow
>> 
>>                 // run saves concurrently
>>                 List<SaveData> saveData = new ArrayList<SaveData>(100);
>>                 for (String tag : tags) {
>>                     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>>                     String dirPath = createPath(outputURIBase, date, tag, milliSeconds);
>>                     saveData.add(new SaveData(saveDF, dirPath));
>>                 }
>> 
>>                 saveImpl(saveData, executor); // concurrent writes to S3
>>             }
>> 
>>             private void saveImpl(List<SaveData> saveData, ExecutorService executor) {
>>                 List<Future<?>> runningThreads = new ArrayList<Future<?>>(POOL_SIZE);
>>                 for (SaveData data : saveData) {
>>                     SaveWorker worker = new SaveWorker(data);
>>                     Future<?> f = executor.submit(worker);
>>                     runningThreads.add(f);
>>                 }
>> 
>>                 // wait for all the workers to complete
>>                 for (Future<?> worker : runningThreads) {
>>                     try {
>>                         worker.get();
>>                         logger.debug("worker completed");
>>                     } catch (InterruptedException e) {
>>                         logger.error("", e);
>>                     } catch (ExecutionException e) {
>>                         logger.error("", e);
>>                     }
>>                 }
>>             }
>>         });
>>     }
>> 
>>     static class SaveData {
>>         private DataFrame df;
>>         private String path;
>> 
>>         SaveData(DataFrame df, String path) {
>>             this.df = df;
>>             this.path = path;
>>         }
>>     }
>> 
>>     static class SaveWorker implements Runnable {
>>         SaveData data;
>> 
>>         public SaveWorker(SaveData data) {
>>             this.data = data;
>>         }
>> 
>>         @Override
>>         public void run() {
>>             // only write non-empty data frames
>>             if (data.df.count() >= 1) {
>>                 data.df.write().json(data.path);
>>             }
>>         }
>>     }
>> }
>> 
>> 
>> 
>> From:  Pedro Rodriguez <ski.rodrig...@gmail.com>
>> Date:  Wednesday, July 27, 2016 at 8:40 PM
>> To:  Andrew Davidson <a...@santacruzintegration.com>
>> Cc:  "user @spark" <user@spark.apache.org>
>> Subject:  Re: performance problem when reading lots of small files created by
>> spark streaming.
>> 
>>> There are a few blog posts that detail one possible/likely issue for
>>> example: 
>>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>>> 
>>> TLDR: The Hadoop libraries Spark uses assume that their input comes from a
>>> file system (which works with HDFS); however, S3 is a key-value store, not a
>>> file system. Somewhere along the line, this makes things very slow. Below I
>>> describe their approach and a library I am working on to solve this problem.
>>> 
>>> (Much) Longer Version (with a shiny new library in development):
>>> So far in my reading of the source code, Hadoop attempts to actually read
>>> from S3, which can be expensive, particularly since it does so from a single
>>> driver core (this is about actually reading the files, as opposed to just
>>> listing them; I can find the source code and link it later if you would
>>> like). The approach explained above is instead to use the AWS SDK to list
>>> the files, distribute the file names as a collection with sc.parallelize,
>>> and then read them in parallel. I found this worked, but it was lacking in a
>>> few ways, so I started this project:
>>> https://github.com/EntilZha/spark-s3
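>>> 
>>> For what it's worth, here is a rough Java sketch of that list-then-parallelize
>>> approach from the blog post (not the spark-s3 library itself). The bucket and
>>> prefix are placeholders, pagination of the S3 listing is omitted, and the
>>> FlatMapFunction signature is the Spark 1.x one.
>>> 
>>> import com.amazonaws.services.s3.AmazonS3;
>>> import com.amazonaws.services.s3.AmazonS3Client;
>>> import com.amazonaws.services.s3.model.S3ObjectSummary;
>>> import org.apache.spark.api.java.JavaRDD;
>>> import org.apache.spark.api.java.JavaSparkContext;
>>> import org.apache.spark.api.java.function.FlatMapFunction;
>>> 
>>> import java.io.BufferedReader;
>>> import java.io.InputStreamReader;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> 
>>> public class S3ParallelRead {
>>>     public static JavaRDD<String> readLines(JavaSparkContext sc, final String bucket, String prefix) {
>>>         // listing happens once, on the driver; it is cheap compared to reading
>>>         AmazonS3 s3 = new AmazonS3Client();
>>>         List<String> keys = new ArrayList<String>();
>>>         for (S3ObjectSummary summary : s3.listObjects(bucket, prefix).getObjectSummaries()) {
>>>             keys.add(summary.getKey()); // only the first page of keys; pagination omitted
>>>         }
>>> 
>>>         // distribute the key names, then read each object on the executors
>>>         return sc.parallelize(keys, 30).flatMap(new FlatMapFunction<String, String>() {
>>>             @Override
>>>             public Iterable<String> call(String key) throws Exception {
>>>                 // the client is created per task because it is not serializable
>>>                 AmazonS3 client = new AmazonS3Client();
>>>                 BufferedReader reader = new BufferedReader(
>>>                         new InputStreamReader(client.getObject(bucket, key).getObjectContent()));
>>>                 List<String> lines = new ArrayList<String>();
>>>                 String line;
>>>                 while ((line = reader.readLine()) != null) {
>>>                     lines.add(line);
>>>                 }
>>>                 reader.close();
>>>                 return lines;
>>>             }
>>>         });
>>>     }
>>> }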
>>> 
>>> This takes that idea further by:
>>> 1. Rather than sc.parallelize, implement the RDD interface where each
>>> partition is defined by the files it needs to read (haven't gotten to
>>> DataFrames yet)
>>> 2. At the driver node, use the AWS SDK to list all the files with their
>>> sizes (listing is fast), then run the Longest Processing Time (LPT) algorithm
>>> to sift the files into roughly balanced partitions by size (a rough sketch of
>>> LPT follows this list)
>>> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
>>> "folder2").regularRDDOperationsHere or import implicits and do
>>> sc.s3.textFileByPrefix
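>>> 
>>> And a rough illustration of the LPT binning mentioned in point 2, purely for
>>> intuition (this is my own sketch, not code from spark-s3): sort the files by
>>> size, largest first, and always drop the next file into the currently
>>> lightest partition.
>>> 
>>> import java.util.ArrayList;
>>> import java.util.Collections;
>>> import java.util.Comparator;
>>> import java.util.List;
>>> import java.util.Map;
>>> 
>>> public class LptPartitioner {
>>>     // fileSizes maps each file name to its size in bytes (from the S3 listing)
>>>     public static List<List<String>> partition(Map<String, Long> fileSizes, int numPartitions) {
>>>         List<Map.Entry<String, Long>> files =
>>>                 new ArrayList<Map.Entry<String, Long>>(fileSizes.entrySet());
>>>         // largest files first
>>>         Collections.sort(files, new Comparator<Map.Entry<String, Long>>() {
>>>             @Override
>>>             public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
>>>                 return Long.compare(b.getValue(), a.getValue());
>>>             }
>>>         });
>>> 
>>>         List<List<String>> partitions = new ArrayList<List<String>>();
>>>         long[] load = new long[numPartitions];
>>>         for (int i = 0; i < numPartitions; i++) {
>>>             partitions.add(new ArrayList<String>());
>>>         }
>>> 
>>>         // assign each file to the partition with the smallest total size so far
>>>         for (Map.Entry<String, Long> file : files) {
>>>             int lightest = 0;
>>>             for (int i = 1; i < numPartitions; i++) {
>>>                 if (load[i] < load[lightest]) lightest = i;
>>>             }
>>>             partitions.get(lightest).add(file.getKey());
>>>             load[lightest] += file.getValue();
>>>         }
>>>         return partitions;
>>>     }
>>> }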
>>> 
>>> At present, I am battle testing and benchmarking it at my current job, and
>>> the results are promising, with significant improvements for jobs dealing
>>> with many files (especially many small files) and for jobs whose input is
>>> unbalanced to start with. Jobs perform better because: 1) there isn't a long
>>> stall at the driver while Hadoop decides how to split the S3 files, and 2)
>>> the partitions end up nearly perfectly balanced because of the LPT algorithm.
>>> 
>>> Since I hadn't intended to advertise this quite yet, the documentation is
>>> not super polished, but it exists here:
>>> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
>>> 
>>> I am completing the sonatype process for publishing artifacts on maven
>>> central (this should be done by tomorrow so referencing
>>> "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
>>> hear if this library solution works, otherwise I hope the blog post above is
>>> illuminating.
>>> 
>>> Pedro
>>> 
>>> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson
>>> <a...@santacruzintegration.com> wrote:
>>>> I have a relatively small data set; however, it is split into many small
>>>> JSON files. Each file is between maybe 4K and 400K.
>>>> This is probably a very common issue for anyone using Spark streaming. My
>>>> streaming app works fine; however, my batch application takes several hours
>>>> to run.
>>>> 
>>>> All I am doing is calling count(). Currently I am trying to read the files
>>>> from S3. When I look at the app UI it looks like Spark is blocked, probably
>>>> on IO? Adding additional workers and memory does not improve performance.
>>>> 
>>>> I am able to copy the files from s3 to a worker relatively quickly. So I do
>>>> not think s3 read time is the problem.
>>>> 
>>>> In the past, when I had similar data sets stored on HDFS, I was able to use
>>>> coalesce() to reduce the number of partitions from 200K to 30. This made a
>>>> big improvement in processing time. However, when I read from S3, coalesce()
>>>> does not improve performance.
>>>> 
>>>> I tried copying the files to a normal file system and then using 'hadoop fs
>>>> put' to copy the files to HDFS; however, this takes several hours and is
>>>> nowhere near completion. It appears HDFS does not deal with small files well.
>>>> 
>>>> I am considering copying the files from S3 to a normal file system on one
>>>> of my workers, concatenating the files into a few much larger files, and
>>>> then using 'hadoop fs put' to move them to HDFS. Do you think this would
>>>> improve the Spark count() performance issue?
>>>> 
>>>> Does anyone know of heuristics for determining the number or size of the
>>>> concatenated files?
>>>> 
>>>> Thanks in advance
>>>> 
>>>> Andy
>>> 
>>> 
>>> 
>>> -- 
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>> 
>>> ski.rodrig...@gmail.com | pedrorodriguez.io <http://pedrorodriguez.io>  |
>>> 909-353-4423
>>> Github: github.com/EntilZha <http://github.com/EntilZha>  | LinkedIn:
>>> https://www.linkedin.com/in/pedrorodriguezscience
>>> 
> 
