Hi, I have an idea to solve my problem: I want to write one file per Spark partition, but I don't know how to get the actual partition suffix/ID inside my call function.
points.foreachPartition(
    new VoidFunction<Iterator<Tuple2<Integer, GeoTimeDataTupel>>>() {

        private static final long serialVersionUID = -7210897529331503565L;

        public void call(Iterator<Tuple2<Integer, GeoTimeDataTupel>> entry) throws Exception {
            while (entry.hasNext()) {
                Tuple2<Integer, GeoTimeDataTupel> temp = entry.next();
                try {
                    FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
                    Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
);

2015-06-09 15:34 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
> Hi community,
>
> I want to append results to one file. When I run locally, my function works
> fine, but when I run it on a YARN cluster, I lose some rows.
>
> Here is my function to write:
>
> points.foreach(
>     new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {
>
>         private static final long serialVersionUID = 2459995649387229261L;
>
>         public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception {
>             try {
>                 FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
>                 Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
>
>                 if (fs.exists(pt)) {
>                     FSDataInputStream in = fs.open(pt);
>                     Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp");
>                     backup(fs.getConf(), fs, in, pt_temp);
>                     in.close();
>
>                     FSDataOutputStream out = fs.create(pt, true);
>                     FSDataInputStream backup = fs.open(pt_temp);
>
>                     int offset = 0;
>                     int bufferSize = 4096;
>                     int result = 0;
>                     byte[] buffer = new byte[bufferSize];
>
>                     // pre-read one chunk from the backup stream
>                     result = backup.read(offset, buffer, 0, bufferSize);
>                     // copy until a read no longer fills the whole buffer
>                     while (result == bufferSize) {
>                         out.write(buffer);
>                         // advance the offset pointer to the next segment
>                         offset += bufferSize;
>                         result = backup.read(offset, buffer, 0, bufferSize);
>                     }
>
>                     // flush the final, partially filled buffer
>                     if (result > 0 && result < bufferSize) {
>                         for (int i = 0; i < result; i++) {
>                             out.write(buffer[i]);
>                         }
>                     }
>                     out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
>                     out.close();
>                 } else {
>                     BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt)));
>                     bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
>                     bw.close();
>                 }
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>
>         public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception {
>             FSDataOutputStream out = fs.create(pt_temp, true);
>             IOUtils.copyBytes(sourceContent, out, 4096, false);
>             out.close();
>         }
>     }
> );
>
> Where is my fault? Or is there a function to write (append) to the Hadoop
> HDFS?
>
> Best regards,
> Paul
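A minimal sketch of one way to get the partition index asked about above: JavaRDDLike.mapPartitionsWithIndex passes the partition number into the function, so it can serve as the file suffix. This assumes the same `points` and `pro` as in the snippets above; the "/results-%05d" naming and the trailing count() to force evaluation are illustrative choices, not taken from the original mails.

// needs e.g. org.apache.spark.api.java.function.Function2, scala.Tuple2,
// java.util.Collections, plus the Hadoop FileSystem/Path imports used above
points.mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Tuple2<Integer, GeoTimeDataTupel>>, Iterator<Integer>>() {

        public Iterator<Integer> call(Integer partitionId, Iterator<Tuple2<Integer, GeoTimeDataTupel>> entries) throws Exception {
            FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
            // one file per partition, e.g. .../results-00000, .../results-00001
            Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output")
                    + String.format("/results-%05d", partitionId));
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt, true)));
            while (entries.hasNext()) {
                Tuple2<Integer, GeoTimeDataTupel> entry = entries.next();
                bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
            }
            bw.close();
            return Collections.singletonList(partitionId).iterator();
        }
    }, true
).count(); // mapPartitionsWithIndex is lazy; count() forces the writes

Alternatively, inside foreachPartition, TaskContext.get().partitionId() (org.apache.spark.TaskContext, available since Spark 1.2) should return the same index without restructuring the job.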
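On the append question in the quoted mail: HDFS does offer FileSystem.append(Path), but it has to be enabled on the cluster (dfs.support.append) and supports only a single writer per file, so parallel tasks appending to one shared file can still drop rows; writing one file per partition as above avoids that contention. A sketch of the call, assuming the same fs, pt, and entry as in the quoted function:

// single-writer only; requires dfs.support.append=true on the cluster
FSDataOutputStream out = fs.exists(pt) ? fs.append(pt) : fs.create(pt);
out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
out.close();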