hi,

i have an idea to solve my problem, i want write one file for each spark
partion,
but i not know to get the actuel partion suffix/ID in my call function?

points.foreachPartition(
                new VoidFunction<Iterator<Tuple2<Integer,
GeoTimeDataTupel>>>() {

                    private static final long serialVersionUID =
-7210897529331503565L;

                    public void call(Iterator<Tuple2<Integer,
GeoTimeDataTupel>> entry)throws Exception {
                        while(entry.hasNext()) {
                            Tuple2<Integer, GeoTimeDataTupel> temp =
entry.next();

                            try {
                                FileSystem fs = FileSystem.get(new
URI(pro.getProperty("hdfs.namenode")),new Configuration());
                                Path pt=new
Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results");
                            }
                            catch(Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
        );

2015-06-09 15:34 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:

> hi community,
>
> i want append results to one file. if i work local my function build all
> right,
> if i run this on a yarn cluster, i lost same rows.
>
> here my function to write:
>
> points.foreach(
>             new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {
>
>                 private static final long serialVersionUID =
> 2459995649387229261L;
>
>                 public void call(Tuple2<Integer, GeoTimeDataTupel>
> entry)throws Exception {
>                     try {
>                         FileSystem fs = FileSystem.get(new
> URI(pro.getProperty("hdfs.namenode")),new Configuration());
>                         Path pt=new
> Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results");
>
>                         if(fs.exists(pt)) {
>                             FSDataInputStream in = fs.open(pt);
>                             Path pt_temp = new
> Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results_temp");
>                             backup(fs.getConf(), fs, in, pt_temp);
>                             in.close();
>
>                             FSDataOutputStream out = fs.create((pt), true);
>                             FSDataInputStream backup = fs.open(pt_temp);
>
>                             int offset = 0;
>                             int bufferSize = 4096;
>
>                             int result = 0;
>
>                             byte[] buffer = new byte[bufferSize];
>                             // pre read a part of content from input stream
>                             result = backup.read(offset, buffer, 0,
> bufferSize);
>                             // loop read input stream until it does not
> fill whole size of buffer
>                             while (result == bufferSize) {
>                                 out.write(buffer);
>                                 // read next segment from input stream by
> moving the offset pointer
>                                 offset += bufferSize;
>                                 result = backup.read(offset, buffer, 0,
> bufferSize);
>                             }
>
>                             if (result > 0 && result < bufferSize) {
>                                 for (int i = 0; i < result; i++) {
>                                     out.write(buffer[i]);
>                                 }
>                             }
>                             out.writeBytes("Cluster: "+entry._1+", Point:
> "+entry._2.toString()+"\n");
>                             out.close();
>                         }
>                         else {
>                             BufferedWriter bw =new BufferedWriter(new
> OutputStreamWriter(fs.create(pt)));
>                             bw.write("Cluster: "+entry._1+", Point:
> "+entry._2.toString()+"\n");
>                             bw.close();
>                         }
>                     } catch (Exception e) {
>                         e.printStackTrace();
>                     }
>                 }
>
>                 public void backup(Configuration conf, FileSystem
> fs,FSDataInputStream sourceContent, Path pt_temp) throws Exception {
>
>                     FSDataOutputStream out = fs.create(pt_temp, true);
>                     IOUtils.copyBytes(sourceContent, out, 4096, false);
>                     out.close();
>                 }
>
> where is my fault?? or give it a function to write(append) to the hadoop
> hdfs?
>
> best regards,
> paul
>

Reply via email to