Hi Subroto,

It's very kind of you to help me. I really appreciate it. I have attached the source code of my program. It finds neighbors in a graph. For example, in the graph "A---B-----C", we call node B the first neighbor of node A and node C the second neighbor. The program's job is to find the first and second neighbors of A. That's the scenario.

Node A is in the key file, and the graph(s) are in the database file. The key file is given by users; each line is a node name, like:
A
B
C
The database file is on the HDFS servers (on hard disk); each line is an edge with two nodes, like:
A	B	0.5
B	C	0.7
Here 0.5 and 0.7 are the weights of the edges.

With MapReduce I can get B according to the key A, and then get C according to B, but I want to get both B and C according to the key A. The attached program does finish the job, but I don't think it is good enough: it costs too much time, and it is not the program I wanted. Maybe you can help me.

Thanks very much!

--
Regards!
Jun Tan
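(For concreteness, a tiny sketch, assuming the tab-separated format the attached code uses, of how one database line relates a key node to its first neighbor; the A/B/C values are just the example above.)

public class NeighborExample {
    public static void main(String[] args) {
        String edgeLine = "A\tB\t0.5";           // one line of the database file
        String[] fields = edgeLine.split("\t");  // fields[0], fields[1] = nodes, fields[2] = weight
        String keyNode = "A";                    // one line of the key file
        String firstNeighbor = null;
        if (fields[0].equalsIgnoreCase(keyNode)) {
            firstNeighbor = fields[1];           // B is A's first neighbor
        } else if (fields[1].equalsIgnoreCase(keyNode)) {
            firstNeighbor = fields[0];
        }
        // Any other edge touching firstNeighbor (e.g. "B\tC\t0.7") then gives
        // a second neighbor of the key node (here C).
        System.out.println(keyNode + " -> first neighbor: " + firstNeighbor);
    }
}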
At 2011-08-09 19:16:17, "Subroto Sanyal" <subrotosan...@huawei.com> wrote:

Hi Jun,

> I mean that if I get some strings in mapper and I want to use them in reducer. But they are neither keys nor values.

As per my understanding, there is no way to pass an arbitrary reference from Mapper to Reducer. Only the information written to the Mapper's output is available to the Reducer. Furthermore, I don't feel it would be a good idea to keep such a dependency. Please let me know more about your scenario... maybe we/the community can suggest some solution.

> By the way, can the reducer get side files in cache?

Please let me know more about these "side files"...

Regards,
Subroto Sanyal

From: 谭军 [mailto:tanjun_2...@163.com]
Sent: Tuesday, August 09, 2011 12:25 PM
To: mapreduce-user@hadoop.apache.org; subroto.san...@huawei.com
Subject: Re: RE: Can reducer get parameters from mapper besides key and value?

Hi Subroto,

I mean that if I get some strings in the mapper, I want to use them in the reducer. But they are neither keys nor values.
By the way, can the reducer get side files in cache?

--
Regards!
Jun Tan

At 2011-08-09 14:42:10, "Subroto Sanyal" <subrotosan...@huawei.com> wrote:

Hi Jun,

What is the file, list, or string[] in this context? I mean to say: which file, list, or string[]? The new MR (org.apache.hadoop.mapreduce.*) APIs have a "context" parameter. I'd request you to browse through the APIs of Context (inherited from JobContext -> TaskAttemptContext). The context parameter may provide the reference you need.

Regards,
Subroto Sanyal

From: 谭军 [mailto:tanjun_2...@163.com]
Sent: Tuesday, August 09, 2011 11:58 AM
To: mapreduce
Subject: Can reducer get parameters from mapper besides key and value?

Hi,

Can the reducer get parameters from the mapper besides the key and value? Such as files, lists, string[], etc.
Thanks

--
Regards!
Jun Tan
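For what it's worth, a minimal sketch (using the same deprecated mapred API as the attached code) of the two channels that do exist for handing extra data to map and reduce tasks: small strings set on the JobConf by the driver, and side files shipped through the DistributedCache, both readable from configure() in the mapper or the reducer. The property name "retrieval.key.node" and the cache path below are made-up placeholders.

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;

@SuppressWarnings("deprecation")
public class SideDataExample extends MapReduceBase {

    // Driver side, before the job is submitted.
    public static void setUp(JobConf conf) {
        conf.set("retrieval.key.node", "A");                                   // small job-level string
        DistributedCache.addCacheFile(URI.create("/user/jun/keyfile"), conf);  // hypothetical side file
    }

    // Task side: configure() runs once per map or reduce task.
    private String keyNode;
    private Path[] sideFiles;

    @Override
    public void configure(JobConf job) {
        keyNode = job.get("retrieval.key.node");                   // read the string back
        try {
            sideFiles = DistributedCache.getLocalCacheFiles(job);  // local paths of the cached side files
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Anything the reducer needs beyond that has to travel through the key/value pairs the mapper emits; as Subroto says, there is no way to hand the reducer an arbitrary in-memory object from the mapper.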
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

@SuppressWarnings("deprecation")
public class Retrieval {

    public static void main(String[] args) throws IOException, URISyntaxException {
        if (args.length != 3) {
            System.err.println("Usage: Retrieval <protein set path> <database path> <output path>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(new Configuration(), Retrieval.class);
        conf.setJobName("Retrieval");

        // Ship both the key file and the database file to every node via the DistributedCache.
        DistributedCache.addCacheFile(new URI(args[0]), conf);
        DistributedCache.addCacheFile(new URI(args[1]), conf);

        // The database file is also the normal job input.
        FileInputFormat.addInputPath(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        conf.setMapperClass(RetrievalMapper.class);
        conf.setReducerClass(RetrievalReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        JobClient.runJob(conf);
    }
}
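For reference, the driver is launched along these lines; the jar name and HDFS paths are hypothetical placeholders, matching the usage message above:

hadoop jar retrieval.jar Retrieval /user/jun/proteins /user/jun/database /user/jun/output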
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("deprecation")
public class RetrievalMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    // Local copies of the cached files: [0] = key file, [1] = database file.
    private Path[] localFiles;

    public void configure(JobConf conf) {
        try {
            this.localFiles = DistributedCache.getLocalCacheFiles(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String line = value.toString();
        // Stores the first neighbors found on this edge line.
        LinkedList<String> list = new LinkedList<String>();
        BufferedReader proReader =
                new BufferedReader(new FileReader(this.localFiles[0].toString()));
        String proID;
        String[] proteinIDs = line.split("\t");
        String tmpString = proteinIDs[0] + "\t" + proteinIDs[1];

        // Find destination proteins and get their first neighbors.
        // The edge ("node1\tnode2") is the key and its weight is the value sent to the reducer.
        while ((proID = proReader.readLine()) != null) { // for each protein ID in the key file
            if (proID.equalsIgnoreCase(proteinIDs[0])) { // hit: proteinIDs[1] is its first neighbor
                output.collect(new Text(tmpString), new Text(proteinIDs[2]));
                list.add(proteinIDs[1]); // remember the first neighbor
            }
            if (proID.equalsIgnoreCase(proteinIDs[1])) { // hit: proteinIDs[0] is its first neighbor
                output.collect(new Text(tmpString), new Text(proteinIDs[2]));
                list.add(proteinIDs[0]); // remember the first neighbor
            }
        }
        proReader.close();

        // Find second neighbors: re-scan the cached database file for every first neighbor found.
        for (String firstNeighbor : list) {
            String tmp;
            BufferedReader dbReader =
                    new BufferedReader(new FileReader(this.localFiles[1].toString()));
            while ((tmp = dbReader.readLine()) != null) {
                String[] edge = tmp.split("\t");
                String ids = edge[0] + "\t" + edge[1];
                if (firstNeighbor.equalsIgnoreCase(edge[0])) {
                    output.collect(new Text(ids), new Text(edge[2]));
                }
                if (firstNeighbor.equalsIgnoreCase(edge[1])) {
                    output.collect(new Text(ids), new Text(edge[2]));
                }
            }
            dbReader.close();
        }
    }
}
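One thing that might cut the time cost (a sketch of an idea, not a drop-in replacement for the class above): the key file is re-opened and re-read for every single input record inside map(). Since configure() runs only once per task, the key nodes could be loaded into a set there and looked up in O(1) inside map(). The class and field names below are made up for illustration.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class KeyNodeCache {
    private final Set<String> keyNodes = new HashSet<String>();

    // Call this from Mapper.configure(JobConf): it runs once per task, not once per record.
    public void load(JobConf conf) throws IOException {
        Path[] cached = DistributedCache.getLocalCacheFiles(conf);
        BufferedReader reader = new BufferedReader(new FileReader(cached[0].toString()));
        String node;
        while ((node = reader.readLine()) != null) {
            keyNodes.add(node.trim().toLowerCase(Locale.ROOT)); // case-insensitive match
        }
        reader.close();
    }

    // O(1) lookup inside map() instead of re-reading the key file.
    public boolean contains(String node) {
        return keyNodes.contains(node.trim().toLowerCase(Locale.ROOT));
    }
}

The repeated scan of the whole database file for second neighbors is probably the bigger cost, but avoiding it would mean restructuring the job (for example into two chained MapReduce passes, one per hop), so it is not covered by this sketch.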
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("deprecation")
public class RetrievalReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter report) throws IOException {
        // The mapper may emit the same edge key several times (as a first- and as a
        // second-neighbor hit), so drain the iterator and output a single weight per edge.
        Text last = null;
        while (values.hasNext()) {
            last = values.next();
        }
        output.collect(key, last);
    }
}
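Since every value the mapper emits for a given edge key is that edge's weight, the reducer above effectively just deduplicates. A sketch of an equivalent variant, assuming each edge appears in the database with a single weight, that emits the first value instead of iterating to the last one:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("deprecation")
public class FirstValueReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter report) throws IOException {
        // All values for this edge key carry the same weight, so one is enough.
        if (values.hasNext()) {
            output.collect(key, values.next());
        }
    }
}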