Hi Subroto,
It's very kind of you to help me.
I really appreciate it.
I've attached the source code of my program. It finds neighbors in a graph.
For example, in the graph " A---B-----C ", we define node B as the first neighbor of node A,
while C is the second.
The program finds the first and second neighbors of A.
That's the scenario.
Node A is in the key file, and the graph(s) are in the database file.
The key file is given by the user. Each line of the key file is a node name, like
A
B
C
The database file is stored on HDFS (on hard disk). Each line is an edge between two
nodes, like
A  B  0.5
B  C  0.7
0.5 and 0.7 are the weights of the edges.
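For clarity, here is a tiny sketch of my own showing how one such edge line parses (it assumes the fields are tab-separated, as my attached code does):

public class EdgeLineDemo {
    public static void main(String[] args) {
        // One database line: "<node1>\t<node2>\t<weight>".
        String line = "A\tB\t0.5";
        String[] fields = line.split("\t");
        String node1 = fields[0];                       // "A"
        String node2 = fields[1];                       // "B"
        double weight = Double.parseDouble(fields[2]);  // 0.5
        System.out.println(node1 + " -- " + node2 + " : " + weight);
    }
}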
Using MapReduce, I can get B from the key A, and then get C from B.
But I want to get both B and C directly from the key A.
The attached program can finish the job, but I don't think it's good enough:
it's too time-consuming. It's not the program I wanted.
 
Maybe you can help me.
Thanks very much!

--


Regards!

Jun Tan


At 2011-08-09 19:16:17, "Subroto Sanyal" <subrotosan...@huawei.com> wrote:


Hi Jun

 

I mean: I get some strings in the mapper, and I want to use them in the reducer.

But they are neither keys nor values.

As per my understanding, there is no way to pass an arbitrary reference from the
Mapper to the Reducer.

The information the Mapper writes to its output is available to the Reducer.
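For example, a common workaround is to fold the extra string into the emitted value itself. A rough sketch with the old mapred API (the class names and the '|' delimiter are mine):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// The Mapper prepends its extra string to the value; the Reducer splits it off.
public class ExtraInfoMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String extraInfo = "computed-in-map"; // neither a key nor a value by itself
        output.collect(new Text("someKey"),
                new Text(extraInfo + "|" + value.toString()));
    }
}

class ExtraInfoReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            String[] parts = values.next().toString().split("\\|", 2);
            String extraInfo = parts[0]; // the Mapper's extra string, usable here
            output.collect(key, new Text(parts[1]));
        }
    }
}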

Furthermore, I don't feel it would be a good idea to keep such a dependency.

Please let me know more about your scenario… maybe we/the community can suggest a solution…

 

By the way, can a reducer get side files from the cache?

Please let me know more about "Side Files"…

 

Regards,
Subroto Sanyal

From: 谭军 [mailto:tanjun_2...@163.com]
Sent: Tuesday, August 09, 2011 12:25 PM
To: mapreduce-user@hadoop.apache.org; subroto.san...@huawei.com
Subject: Re: RE: Can reducer get parameters from mapper besides key and value?

 

Hi Subroto,

I mean: I get some strings in the mapper, and I want to use them in the reducer.

But they are neither keys nor values.

 

By the way, can a reducer get side files from the cache?
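(By side files I mean the files shipped with DistributedCache.addCacheFile in my driver. A minimal sketch of what I have in mind, mirroring the configure() of my attached mapper:)

import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;

// A reducer base that fetches the local copies of the cached files in
// configure(), the same way my mapper does. (Illustrative only.)
public abstract class CacheReadingReducerBase extends MapReduceBase {
    protected Path[] localFiles; // [0] = key file, [1] = database file

    public void configure(JobConf conf) {
        try {
            this.localFiles = DistributedCache.getLocalCacheFiles(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}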

 

--

Regards!

Jun Tan


At 2011-08-09 14:42:10, "Subroto Sanyal" <subrotosan...@huawei.com> wrote:



 

Hi Jun,

 

What are the file, list, and string[] in this context?

I mean to say: which file, list, or string[]?

 

The new MR (org.apache.hadoop.mapreduce.*) APIs have a parameter "context". I request
you to browse through the APIs of Context (inherited from
JobContext -> TaskAttemptContext).

The context parameter may provide the reference you need.
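For instance, a small sketch with the new API (the property name "my.extra.info" is my own; note this only carries values known at job-submission time, set by the driver, not values computed inside a running Mapper):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// The driver calls job.getConfiguration().set("my.extra.info", "...");
// every task can then read the value back through its context.
public class ContextAwareReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        String extraInfo = context.getConfiguration().get("my.extra.info");
        System.out.println("extra info = " + extraInfo); // use it as needed
    }
}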

 

Regards,
Subroto Sanyal

From: 谭军 [mailto:tanjun_2...@163.com]
Sent: Tuesday, August 09, 2011 11:58 AM
To: mapreduce
Subject: Can reducer get parameters from mapper besides key and value?

 

Hi,

Can a reducer get parameters from a mapper besides the key and value?

Such as files, lists, string[], etc.?

Thanks




--

Regards!

Jun Tan

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

@SuppressWarnings("deprecation")
public class Retrieval {

    public static void main(String[] args) throws IOException, URISyntaxException {
        if (args.length != 3) {
            System.err.println(
                    "Usage: Retrieval <protein set path> <database path> <output path>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(new Configuration(), Retrieval.class);
        conf.setJobName("Retrieval");

        // Ship the key file (protein set) and the database file to every
        // task via the DistributedCache.
        DistributedCache.addCacheFile(new URI(args[0]), conf);
        DistributedCache.addCacheFile(new URI(args[1]), conf);

        // The database file is also the job input.
        FileInputFormat.addInputPath(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        conf.setMapperClass(RetrievalMapper.class);
        conf.setReducerClass(RetrievalReducer.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        JobClient.runJob(conf);
    }
}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.LinkedList;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("deprecation")
public class RetrievalMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    // Local copies of the cached files: [0] = key file, [1] = database file.
    private Path[] localFiles;

    public void configure(JobConf conf) {
        try {
            this.localFiles = DistributedCache.getLocalCacheFiles(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        String line = value.toString();

        // First neighbors found for this edge.
        LinkedList<String> list = new LinkedList<String>();

        BufferedReader proReader = new BufferedReader(
                new FileReader(this.localFiles[0].toString()));

        String proID;
        String[] proteinIDs = line.split("\t");
        String tmpString = proteinIDs[0] + "\t" + proteinIDs[1];

        // Find destination proteins and collect their first neighbors;
        // the edge endpoints form the key, the weight the value.
        while ((proID = proReader.readLine()) != null) { // one protein ID per line
            if (proID.equalsIgnoreCase(proteinIDs[0])) {
                // Hit: proteinIDs[1] is its first neighbor.
                output.collect(new Text(tmpString), new Text(proteinIDs[2]));
                list.add(proteinIDs[1]);
            }
            if (proID.equalsIgnoreCase(proteinIDs[1])) {
                // Hit: proteinIDs[0] is its first neighbor.
                output.collect(new Text(tmpString), new Text(proteinIDs[2]));
                list.add(proteinIDs[0]);
            }
        }
        proReader.close();

        // Find second neighbors: rescan the whole database file once for
        // every first neighbor collected above.
        for (String neighbor : list) {
            BufferedReader dbReader = new BufferedReader(
                    new FileReader(this.localFiles[1].toString()));
            String tmp;
            while ((tmp = dbReader.readLine()) != null) {
                proteinIDs = tmp.split("\t");
                String ids = proteinIDs[0] + "\t" + proteinIDs[1];
                if (neighbor.equalsIgnoreCase(proteinIDs[0])) {
                    output.collect(new Text(ids), new Text(proteinIDs[2]));
                }
                if (neighbor.equalsIgnoreCase(proteinIDs[1])) {
                    output.collect(new Text(ids), new Text(proteinIDs[2]));
                }
            }
            dbReader.close();
        }
    }
}
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("deprecation")
public class RetrievalReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        // The mapper may emit the same edge more than once; drain the
        // iterator and keep only the last value (the edge weight), so
        // each edge appears once in the output.
        Text last = null;
        while (values.hasNext()) {
            last = values.next();
        }
        output.collect(key, last);
    }
}
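
(For reference: the driver takes three arguments, <protein set path> <database path> <output path>; assuming the classes are packed into a jar, say retrieval.jar, the job would be launched as "hadoop jar retrieval.jar Retrieval <protein set path> <database path> <output path>". Note that map() re-reads the whole key file for every database line, and then re-scans the whole database file once per first neighbor it finds, which is presumably where most of the running time goes.)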
