Hi Eugene,

In my case the list of values that I want to sort and write to a separate
file is fairly small, so I solved it the following way:

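// Needs: org.apache.hadoop.conf.Configuration,
//        org.apache.hadoop.fs.{FileSystem, Path},
//        java.io.{BufferedOutputStream, PrintWriter}
.groupByKey().foreach(e => {
  // Runs on the executors: one output file per key, named after the key.
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val newPath = rootPath + "/" + e._1
  val dstream = hdfs.create(new Path(newPath))
  val bstream = new BufferedOutputStream(dstream, 100 * 1024)
  val writer = new PrintWriter(bstream)
  try {
    // The values for a single key are small enough to sort in memory.
    e._2.toList.sortBy(_._1).foreach(sub => {
      writer.println(Utils.getDateStr(sub._1) + "," + sub._2 + "," + sub._3)
    })
  } finally {
    writer.close() // close() also flushes the buffered output
  }
})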


I'm not sure what I changed in the way I write to HDFS, but this approach worked.
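
For reference, here is a minimal sketch of the same sort-and-write step, pulled out into a plain function so it can be tried without a cluster. It takes any java.io.OutputStream, so the stream returned by hdfs.create(...) should fit as well. The names SortedWriter, Record and formatTs are made up for illustration, the (Long, String, String) record shape is an assumption (the real types are not shown above), and formatTs only stands in for Utils.getDateStr.

```scala
import java.io.{BufferedOutputStream, OutputStream, PrintWriter}

object SortedWriter {
  // Assumed record shape: (timestamp, field, field). Hypothetical, since the
  // actual value types are not stated in the thread.
  type Record = (Long, String, String)

  /** Sorts one key's records by timestamp and writes them as CSV lines.
    * `formatTs` is a stand-in for the Utils.getDateStr helper.
    * The stream is closed even if sorting or formatting throws.
    */
  def writeSorted(
      out: OutputStream,
      records: Iterable[Record],
      formatTs: Long => String = (ts: Long) => ts.toString
  ): Unit = {
    val writer = new PrintWriter(new BufferedOutputStream(out, 100 * 1024))
    try {
      // Sorting in memory is only reasonable while each key's list stays small.
      records.toList.sortBy(_._1).foreach { case (ts, a, b) =>
        writer.print(s"${formatTs(ts)},$a,$b\n")
      }
    } finally {
      writer.close() // close() flushes the buffer before closing
    }
  }
}
```

Inside the foreach it would be called roughly as SortedWriter.writeSorted(hdfs.create(new Path(newPath)), e._2), assuming the values match the record shape above.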


Thanks a lot!


On 13 August 2015 at 01:06, Eugene Morozov <fathers...@list.ru> wrote:

> Yiannis,
>
> sorry for the late response.
> It is indeed not possible to create a new RDD inside foreachPartition,
> so you have to write the data manually. I haven’t tried that and haven’t got
> such an exception, but I’d assume you might try to write locally and then
> upload it into HDFS. FileSystem has a specific method for that,
> “copyFromLocalFile”.
>
> Another approach would be to try to split the RDD into multiple RDDs by key.
> You can get the distinct keys, collect them on the driver, loop over the
> keys, and filter a new RDD out of the original one for each key.
>
> // rdd is the original pair RDD; outputDir is a placeholder base path
> for (key <- keys) {
>     rdd.filter(_._1 == key).saveAsTextFile(outputDir + "/" + key)
> }
>
> It might help to cache the original RDD.
>
> On 16 Jul 2015, at 12:21, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>
> Hi Eugene,
>
> thanks for your response!
> Your recommendation makes sense, that's what I more or less tried.
> The problem that I am facing is that inside foreachPartition() I cannot
> create a new rdd and use saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
>
> Failed on local exception: java.io.InterruptedIOException: Interruped
> while waiting for IO on channel java.nio.channels.SocketChannel
>
> Probably it's hitting a race condition.
>
> Has anyone else faced this situation? Any suggestions?
>
> Thanks a lot!
>
> On 15 July 2015 at 14:04, Eugene Morozov <fathers...@list.ru> wrote:
>
>> Yiannis,
>>
>> It looks like you might explore another approach.
>>
>> sc.textFile("input/path")
>> .map() // your own implementation
>> .partitionBy(new HashPartitioner(num))
>> .groupBy() // your own implementation; the result is a PairRDD of key vs
>>            // Iterable of values
>> .foreachPartition()
>>
>> On the last step you could sort all values for the key and store them
>> in a separate file, even in the same directory as the files for the
>> other keys.
>> HashPartitioner must guarantee that all values for a specific key will
>> reside in just one partition, but it might happen that one partition
>> contains more than one key (with values). This I’m not sure about, but it
>> shouldn’t be a big deal as you would iterate over tuple<key,
>> Iterable<value>> and store each key to its own file.
>>
>> On 15 Jul 2015, at 03:23, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>>
>> Hi there,
>>
>> I have been using the approach described here:
>>
>>
>> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>>
>> In addition to that, I was wondering if there is a way to customize
>> the order of the values contained in each file.
>>
>> Thanks a lot!
>>
>>
>> Eugene Morozov
>> fathers...@list.ru
>>
>>
>>
>>
>>
>
> Eugene Morozov
> fathers...@list.ru
>
>
>
>
>
