Hi Alexander,

You may want to try the wholeTextFiles() method of SparkContext. Using that
you could just do something like this:

sc.wholeTextFiles("hdfs://input_dir")
  .saveAsSequenceFile("hdfs://output_dir")


wholeTextFiles() returns an RDD of (filename, content) pairs:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

You will not have to worry about managing memory as much with this approach.
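
In case it helps, here is a slightly fuller version of that snippet. It is
only a sketch: the input/output paths are placeholders, and it assumes an
existing SparkContext named sc (e.g. in spark-shell).

    // Each small file becomes one (path, content) record; the files are read
    // on the executors, not collected on the driver.
    val files = sc.wholeTextFiles("hdfs://input_dir")

    // Persist all of the pairs as a single SequenceFile directory.
    files.saveAsSequenceFile("hdfs://output_dir")

    // If you need the contents again later, read them back with e.g.:
    // val restored = sc.sequenceFile[String, String]("hdfs://output_dir")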

-sujit


On Wed, Nov 4, 2015 at 2:12 AM, Alexander Lenz <a...@lenz.tk> wrote:

> Hi colleagues,
>
> In Hadoop I have a lot of folders containing small files. I therefore read
> the content of all folders, union the small files, and write the unioned
> data into a single folder containing one file. Afterwards I delete the
> small files and the corresponding folders.
>
> I see two potential problems on which I would like to get your opinion:
>
> 1. When reading all the files inside the folders into the master program,
> there might be so many files that the master program runs out of memory.
> To prevent this I thought about checking the size of the folders and only
> reading folders in as long as there is enough memory to handle them.
> Do you think this is a workable solution, or is there a better way to
> handle this problem?
>
> 2. The other problem: I am doing a unionAll to merge the content of all
> the files. In my opinion this causes the data to be brought to a single
> master, where it is then unioned.
> So the same problem might occur: the application runs out of memory.
> My proposed solution would again be to union only if the size does not
> exceed the available memory. Any better solution?
>
> For a better understanding, you can have a look at my code at the bottom
> of this mail.
> I would be glad to hear about your experience, as I assume this is a
> fairly common problem.
>
> Thanks & Best, Alex
>
>
>
>
> import java.net.URI
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.sql.{SQLContext, SaveMode}
>
> val sqlContext = new SQLContext(sc)
>
>     //get filesystem
>     val conf = new Configuration()
>     val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf)
>
>     //get relevant folders
>     val directoryStatus = fs.listStatus(
>       new Path("hdfs://sandbox.hortonworks.com/demo/parquet/staging/"))
>     val latestFolder = directoryStatus.maxBy(x => x.getModificationTime)
>
>     val toWorkFolders = directoryStatus.filter(x => x.getModificationTime
> < latestFolder.getModificationTime)
>
>     //aggregate folder content
>     val parquetFiles = toWorkFolders.map(folder => {
>       sqlContext.read.parquet(folder.getPath.toString)
>     })
>
>     val mergedParquet = parquetFiles.reduce((x, y) => x.unionAll(y))
>
>     mergedParquet.coalesce(1) // Assemble part-files into one partition
>       .write.mode(SaveMode.Append)
>       .partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*)
>       .parquet("hdfs://sandbox.hortonworks.com/demo/parquet/consolidated/")
>
>
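
Regarding the size check you propose in point 1: if you do stay with the
driver-side approach, something along these lines might work. It is only a
rough sketch: foldersWithinBudget and maxBytes are made-up names, and it
reuses the FileSystem handle and FileStatus values from your code.

    import org.apache.hadoop.fs.{FileStatus, FileSystem}

    // Rough sketch: keep taking folders only while their combined size stays
    // under a byte budget (maxBytes is a threshold you would have to choose
    // based on the memory you can spare).
    def foldersWithinBudget(fs: FileSystem, folders: Seq[FileStatus],
                            maxBytes: Long): Seq[FileStatus] = {
      var used = 0L
      folders.takeWhile { folder =>
        val size = fs.getContentSummary(folder.getPath).getLength // bytes under the folder
        val fits = used + size <= maxBytes
        if (fits) used += size
        fits
      }
    }

    // For example:
    // val toWorkFolders = foldersWithinBudget(fs, directoryStatus.toSeq, 1L << 30)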
