Hi colleagues,

In Hadoop I have a lot of folders containing small files. I am therefore reading the content of all the folders, unioning the small files, and writing the unioned data into a single folder containing one file. Afterwards I delete the small files and their corresponding folders.
I see two possible emerging problems on which I would like to get your opinion:

1. When reading all the files inside the folders into the master program, there might be so many files that the master program runs out of memory. To prevent this, I thought about checking the total file size of the folders and only reading folders in as long as there is enough memory left to handle them. Do you think this is a viable solution, or is there a better way to handle this problem?

2. The other problem: I am doing a unionAll to merge the content of all the files. In my opinion this causes the data to be brought to a single master, where it is then unioned, so the application might run out of memory there as well. My proposed solution would again be to union only as long as the size does not exceed the available memory. Is there a better solution?

For a better understanding you can have a look at my code at the bottom of the mail. I would be glad to hear from your experience, as I assume this problem is a fairly common one.

Thanks & Best,
Alex

    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.{SQLContext, SaveMode}

    val sqlContext = new SQLContext(sc)

    // Get the filesystem
    val conf = new Configuration()
    val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf)

    // Get the relevant folders (everything except the most recently modified one)
    val directoryStatus = fs.listStatus(new Path("hdfs://sandbox.hortonworks.com/demo/parquet/staging/"))
    val latestFolder = directoryStatus.maxBy(x => x.getModificationTime)
    val toWorkFolders = directoryStatus.filter(x => x.getModificationTime < latestFolder.getModificationTime)

    // Aggregate the folder contents
    val parquetFiles = toWorkFolders.map(folder => sqlContext.read.parquet(folder.getPath.toString))
    val mergedParquet = parquetFiles.reduce((x, y) => x.unionAll(y))

    // Assemble the part-files into one partition and write them out
    mergedParquet.coalesce(1)
      .write.mode(SaveMode.Append)
      .partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*)
      .parquet("hdfs://sandbox.hortonworks.com/demo/parquet/consolidated/")
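To make question 1 concrete, here is a minimal sketch of the size-budget idea: only accept folders while their cumulative on-disk size stays under a byte budget. MAX_BYTES is a hypothetical threshold (not from my code above) that would have to be derived from the memory actually available; fs.getContentSummary returns the total size under a path.

```scala
// Sketch only: select folders until their cumulative size exceeds a budget.
// MAX_BYTES is a hypothetical, hand-chosen threshold for illustration.
val MAX_BYTES: Long = 2L * 1024 * 1024 * 1024 // e.g. 2 GB

var acc = 0L // running total of bytes accepted so far
val withinBudget = toWorkFolders.filter { folder =>
  // total size in bytes of everything under this folder
  val size = fs.getContentSummary(folder.getPath).getLength
  if (acc + size <= MAX_BYTES) { acc += size; true } else false
}
// withinBudget would then replace toWorkFolders in the union step;
// the folders left over would be consolidated in a later run.
```

Is this roughly what you would do, or is there an established pattern for this?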