Re: Spark Streaming with compressed xml files
textFileStream and the default fileStream recognize compressed XML (.xml.gz) files. Each line of an XML file becomes one element of the RDD[String]; the whole RDD is then converted into a proper XML document and stored in a *Scala variable* (a sketch of this approach is below).

- I believe storing that much data in a single *Scala variable* is inefficient. Is there an alternative way to process the XML files?
- How can a Spark SQL table be created from the above XML data?

Regards
Vijay Innamuri
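For concreteness, a minimal sketch of the collect-and-reassemble step described above. It assumes windowDStream is a DStream[String] built from textFileStream as in the earlier snippet; the parsing at the end is illustrative, not part of the actual job:

    windowDStream.foreachRDD { incomingFiles =>
      // Pull every line of the (already decompressed) XML back to the
      // driver and stitch the lines into a single document string. This
      // is the step that holds the whole file in one Scala variable.
      val xmlDocument: String = incomingFiles.collect().mkString("\n")

      // Parse with the standard scala.xml library (illustrative only).
      val root = scala.xml.XML.loadString(xmlDocument)
      println(root.label)
    }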
Re: Spark Streaming with compressed xml files
One approach: if you are using fileStream, you can access the individual filenames from the partitions, and with each filename you can apply your decompression/parsing logic and get it done. Like:

    UnionPartition upp = (UnionPartition) ds.values().getPartitions()[i];
    NewHadoopPartition npp = (NewHadoopPartition) upp.split();
    String fPath = npp.serializableHadoopSplit().value().toString();

Another approach would be to create a custom RecordReader and InputFormat, then pass them along with your fileStream, and do the decompression/parsing inside the reader. You can also look into Mahout's XmlInputFormat:
https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java

Thanks
Best Regards
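For the second approach, a minimal sketch of wiring Mahout's XmlInputFormat into fileStream. The <record> tag, directory path, and variable names are assumptions for illustration; also verify that this reader handles the gzip codec for .xml.gz input, since it may expect uncompressed files:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.mahout.classifier.bayes.XmlInputFormat

    // Tell the reader which tags delimit one record; these configuration
    // keys are defined by Mahout's XmlInputFormat.
    val hadoopConf = ssc.sparkContext.hadoopConfiguration
    hadoopConf.set("xmlinput.start", "<record>")
    hadoopConf.set("xmlinput.end", "</record>")

    // Each value is one complete <record>...</record> element as text,
    // so nothing has to be reassembled on the driver.
    val xmlRecords = ssc
      .fileStream[LongWritable, Text, XmlInputFormat]("hdfs:///input/xml")
      .map { case (_, text) => scala.xml.XML.loadString(text.toString) }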
Spark Streaming with compressed xml files
Hi All,

Processing streaming JSON files with Spark features (Spark Streaming and Spark SQL) is very efficient and works like a charm. Below is the code snippet to process JSON files:

    windowDStream.foreachRDD { incomingFiles =>
      val incomingFilesTable = sqlContext.jsonRDD(incomingFiles)
      incomingFilesTable.registerAsTable("IncomingFilesTable")
      val result = sqlContext.sql("SELECT text FROM IncomingFilesTable").collect()
      sc.parallelize(result).saveAsTextFile(filepath)
    }

But I find it difficult to use these Spark features as efficiently with streaming XML files (each compressed file would be around 4 MB). What is the best approach for processing compressed XML files?

Regards
Vijay
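For reference, windowDStream is not shown being constructed above; a minimal sketch of one way to build it (the directory path, batch interval, and window/slide durations are illustrative assumptions, not from the original post):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(sc, Seconds(30))

    // Watch a directory for newly arriving files; every line of each new
    // file becomes one element of the stream.
    val lines = ssc.textFileStream("hdfs:///input/json")

    // Operate on the last 5 minutes of lines, sliding every 30 seconds.
    val windowDStream = lines.window(Seconds(300), Seconds(30))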