Hi Eugene, in my case the list of values that I want to sort and write to a separate file is fairly small, so I solved it as follows:
// imports needed for the HDFS write:
import java.io.{BufferedOutputStream, PrintWriter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

.groupByKey().foreach(e => {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val newPath = rootPath + "/" + e._1
  val dstream = hdfs.create(new Path(newPath))
  val bstream = new BufferedOutputStream(dstream, 100 * 1024)
  val writer = new PrintWriter(bstream)
  e._2.toList.sortBy(_._1).foreach(sub => {
    // Utils.getDateStr is our own helper for formatting the timestamp
    writer.println(Utils.getDateStr(sub._1) + "," + sub._2 + "," + sub._3)
  })
  writer.flush()
  writer.close()
})

Not sure what I changed in the way I write to HDFS, but this approach worked.

Thanks a lot!

On 13 August 2015 at 01:06, Eugene Morozov <fathers...@list.ru> wrote:

> Yiannis,
>
> Sorry for the late response.
> It is indeed not possible to create a new RDD inside foreachPartition,
> so you have to write the data manually. I haven't tried that and haven't hit
> such an exception, but I'd assume you might try to write locally and then
> upload it into HDFS. FileSystem has a specific method for that:
> "copyFromLocalFile".
>
> Another approach would be to try to split the RDD into multiple RDDs by key.
> You can get the distinct keys, collect them on the driver, and loop over the
> keys, filtering a new RDD out of the original one for each key:
>
> for (key <- keys) {
>   rdd.filter( /* by key */ ).saveAsTextFile( /* per-key path */ )
> }
>
> It might help to cache the original RDD.
>
> On 16 Jul 2015, at 12:21, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>
> Hi Eugene,
>
> thanks for your response!
> Your recommendation makes sense; that's more or less what I tried.
> The problem I am facing is that inside foreachPartition() I cannot
> create a new RDD and use saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
>
> Failed on local exception: java.io.InterruptedIOException: Interruped
> while waiting for IO on channel java.nio.channels.SocketChannel
>
> Probably it's hitting a race condition.
>
> Has anyone else faced this situation? Any suggestions?
>
> Thanks a lot!
>
> On 15 July 2015 at 14:04, Eugene Morozov <fathers...@list.ru> wrote:
>
>> Yiannis,
>>
>> It looks like you might explore another approach:
>>
>> sc.textFile("input/path")
>>   .map() // your own implementation
>>   .partitionBy(new HashPartitioner(num))
>>   .groupBy() // your own implementation; the result is a PairRDD of key vs Iterable of values
>>   .foreachPartition()
>>
>> On the last step you could sort all the values for a key and store them
>> in a separate file, even in the same directory as the files for all the
>> other keys.
>> HashPartitioner must guarantee that all values for a specific key
>> reside in just one partition, but it might happen that one partition
>> contains more than one key (with values). Of this I'm not sure, but it
>> shouldn't be a big deal, as you would iterate over tuple<key, Iterable<value>>
>> and store one key to a specific file.
>>
>> On 15 Jul 2015, at 03:23, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>
>> Hi there,
>>
>> I have been using the approach described here:
>>
>> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>>
>> In addition to that, I was wondering if there is a way to customize the
>> order of the values contained in each file.
>>
>> Thanks a lot!
>>
>> Eugene Morozov
>> fathers...@list.ru
>
> Eugene Morozov
> fathers...@list.ru
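
[Editor's note] For completeness, here is a minimal, self-contained Scala sketch of Eugene's filter-by-key suggestion from the thread. It assumes a pair RDD of (String, String) parsed from comma-separated lines; the names SplitByKey, pairs, and the input/output paths are illustrative, not from the thread, and the parsing step stands in for your own implementation.

import org.apache.spark.{SparkConf, SparkContext}

object SplitByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("split-by-key"))

    // Hypothetical input: comma-separated "key,value" lines; replace with your own parsing.
    val pairs = sc.textFile("input/path").map { line =>
      val Array(k, v) = line.split(",", 2)
      (k, v)
    }
    pairs.cache() // the RDD is scanned once per key, so caching pays off

    // Collect the distinct keys on the driver, then run one small job per key.
    val keys = pairs.keys.distinct().collect()
    for (key <- keys) {
      pairs.filter { case (k, _) => k == key }
        .values
        .sortBy(identity)  // the thread also asked for sorted values within each file
        .coalesce(1)       // a single part file per key; fine when per-key data is small
        .saveAsTextFile("output/root/" + key)
    }

    sc.stop()
  }
}

Two caveats: saveAsTextFile still produces a directory per key (containing a part-00000 file) rather than a bare file, and the loop launches one Spark job per distinct key, so this only scales to a modest number of keys. For many keys, the MultipleTextOutputFormat approach from the Stack Overflow link above should scale better.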