Hi colleagues, 
In Hadoop I have a lot of folders containing small files. Therefore I read the 
content of all folders, union the small files, and write the unioned data into 
a single folder containing one file. Afterwards I delete the small files and 
the corresponding folders.

I see two potential problems on which I would like to get your opinion:

1.       When reading all the files inside the folders into the master program, 
there could be so many files that the master program runs out of memory.
To prevent this I thought about checking the total size of each folder and only 
reading folders in as long as there is enough memory to handle them (see the 
first sketch below).
Do you think this is a viable solution, or is there a better way to handle this 
problem?

2.       The other problem: I am doing a unionAll to merge the content of all 
the files. In my opinion this causes the data to be brought to a single master, 
where it is then unioned.
So the same problem might occur, namely that the application runs out of memory.
My proposed solution would again be to union only as long as the size does not 
exceed the available memory (see the second sketch below). Any better solution?
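
To make the check from question 1 more concrete, here is a minimal sketch of what 
I have in mind, assuming a hypothetical per-pass budget maxBatchBytes; the folder 
sizes are taken from FileSystem.getContentSummary:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), new Configuration())
val staging = new Path("hdfs://sandbox.hortonworks.com/demo/parquet/staging/")

//hypothetical budget: how many bytes to consolidate in one pass
val maxBatchBytes = 2L * 1024 * 1024 * 1024

//total size in bytes of every folder under staging
val folderSizes = fs.listStatus(staging).map(status =>
  (status.getPath, fs.getContentSummary(status.getPath).getLength))

//greedily pick folders until the budget is used up; the rest wait for the next run
val (selectedFolders, _) = folderSizes.foldLeft((Vector.empty[Path], 0L)) {
  case ((picked, used), (path, size)) =>
    if (used + size <= maxBatchBytes) (picked :+ path, used + size)
    else (picked, used)
}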
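
And for question 2, a minimal sketch of the batched union I have in mind, assuming 
the size-bounded groups from the sketch above are already available as a 
hypothetical folderBatches; each batch is unioned and appended on its own, so no 
single unionAll has to span every folder at once:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

//folderBatches: size-bounded groups of folders, built as in the sketch above
def consolidate(sqlContext: SQLContext, folderBatches: Seq[Seq[Path]]): Unit = {
  folderBatches.foreach { batch =>
    //read every folder of this batch and union the resulting DataFrames
    val merged: DataFrame = batch
      .map(p => sqlContext.read.parquet(p.toString))
      .reduce((a, b) => a.unionAll(b))

    //append this batch to the consolidated folder before starting the next one
    merged.coalesce(1)
      .write.mode(SaveMode.Append)
      .parquet("hdfs://sandbox.hortonworks.com/demo/parquet/consolidated/")
  }
}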

For a better understanding, you can have a look at my code at the bottom of the 
mail.
I would be glad to hear about your experience, as I assume this is a fairly 
common problem.

Thanks & Best, Alex




import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SQLContext, SaveMode}

val sqlContext = new SQLContext(sc)

//get filesystem
val conf = new Configuration()
val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf)

//get relevant folders: all except the most recently modified one
val directoryStatus = fs.listStatus(new Path("hdfs://sandbox.hortonworks.com/demo/parquet/staging/"))
val latestFolder = directoryStatus.maxBy(x => x.getModificationTime)

val toWorkFolders = directoryStatus.filter(x => x.getModificationTime < latestFolder.getModificationTime)

//read each folder into its own DataFrame
val parquetFiles = toWorkFolders.map(folder => {
  sqlContext.read.parquet(folder.getPath.toString)
})

//merge all DataFrames into one
val mergedParquet = parquetFiles.reduce((x, y) => x.unionAll(y))

//assemble part-files into one partition and append to the consolidated folder
//(PARQUET_PARTITIONBY_COLUMNS holds the partition columns and is defined elsewhere)
mergedParquet.coalesce(1)
  .write.mode(SaveMode.Append)
  .partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*)
  .parquet("hdfs://sandbox.hortonworks.com/demo/parquet/consolidated/")
