Hello,

For those interested, here is an example of filtering and searching Lucene
documents inside the reduce phase: the mapper batches Lucene Documents into
serialized Maps, and the reducer rebuilds each batch as a temporary
in-memory index and queries it.

Code:
import java.io.*;
import java.util.*;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Hits;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
public class ql
{
/**************************************
 * Query Lucene using keys.
 *
 * Input (topic^body records; eof^eof
 * marks the end of input):
 *   java^this page is about java
 *   ruby^site only mentions rails
 *   php^another resource about php
 *   java^ejb3 discussed and spring
 *   eof^eof
 *
 * Flow: make Lucene docs in the map,
 * then search them in the reduce.
 *
 * Output:
 *   php^topic^another resource about php
 *   java^topic^this page is about java
 ***************************************/
public static class M extends MapReduceBase implements Mapper
{
  Map group_m=new HashMap(); // map() runs single-threaded per task
  String ITEM_KEY;
  String BATCH_KEY="BATCH_"+key_maker("0"); // key for the first batch
  int batch=0;
  public void map(WritableComparable wc,Writable w,
      OutputCollector out,Reporter rep)throws IOException
  {
    String ln=((Text)w).toString();
    String[] parse_a=ln.split("\\^");
    if(batch>=100) // flush a full batch, start a new lucene document group
    {
      out.collect(new Text(BATCH_KEY),new BytesWritable(ob(group_m)));
      BATCH_KEY="BATCH_"+key_maker(String.valueOf(batch));
      batch=0;group_m.clear();
    }
    if(parse_a[0].equals("eof")) // sentinel record: flush the final batch
    {
      out.collect(new Text(BATCH_KEY),new BytesWritable(ob(group_m)));
      return; // do not index the sentinel itself
    }
    ITEM_KEY="ITEM_"+key_maker(parse_a[0]);
    Document single_d=make_lucene_doc(parse_a[0],parse_a[1],ITEM_KEY);
    group_m.put(ITEM_KEY,single_d);
    batch++;
  }
}
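// Note: each map output value is a Java-serialized Map of up to 100
// Lucene Documents keyed by "ITEM_..." strings; the reducer rebuilds
// an in-memory index from each such batch. (Lucene 2.x Documents are
// Serializable, which is what makes the ob()/bo() helpers below work.)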
public static class R extends MapReduceBase implements Reducer
{
  public void reduce(WritableComparable wc,Iterator it,
      OutputCollector out,Reporter rep)throws IOException
  {
    while(it.hasNext())
    {
      try
      {
        // deserialize one batch of lucene documents
        Map m=(Map)bo(((BytesWritable)it.next()).get());
        // build temp index in memory
        Directory rd=new RAMDirectory();
        Analyzer sa=new StandardAnalyzer();
        IndexWriter iw=new IndexWriter(rd,sa,true);
        List keys=new ArrayList(m.keySet());
        Iterator itr_u=keys.iterator();
        while(itr_u.hasNext())
          iw.addDocument((Document)m.get(itr_u.next()));
        iw.optimize();iw.close();
        Searcher is=new IndexSearcher(rd);
        // simple doc filter: require both the exact item key and the
        // topic word (the part of the key after the last '_') in the body
        Iterator itr_s=keys.iterator();
        while(itr_s.hasNext())
        {
          String item_key=itr_s.next().toString().trim();
          TermQuery tq_i=new TermQuery(new Term("item",item_key));
          String tmp_topic=item_key.substring(item_key.lastIndexOf("_")+1);
          TermQuery tq_b=new TermQuery(new Term("body",tmp_topic));
          BooleanQuery bq=new BooleanQuery();
          bq.add(tq_i,BooleanClause.Occur.MUST);
          bq.add(tq_b,BooleanClause.Occur.MUST);
          Hits h=is.search(bq);
          for(int j=0;j<h.length();j++)
          {
            Document doc=h.doc(j);
            out.collect(wc,new Text(doc.get("topic")+"^topic^"+doc.get("body")));
          }
        }
        keys.clear();is.close();
      }
      catch(Exception e){e.printStackTrace();}
    }
  }
}
public static void main(String[] args)throws Exception
{
  String IN_DIR="/opt/hadoop-0.15.0/indir";
  String OUT_DIR="/opt/hadoop-0.15.0/outdir";
  try
  {
    JobConf jc=new JobConf(ql.class);
    jc.setJobName("ql");
    jc.setMapperClass(M.class);
    jc.setReducerClass(R.class);
    jc.setOutputKeyClass(Text.class);
    jc.setOutputValueClass(BytesWritable.class);
    jc.setInputPath(new Path(IN_DIR));
    jc.setOutputPath(new Path(OUT_DIR));
    JobClient.runJob(jc);
  }
  catch(Exception e){System.out.println(e);}
}
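// To run (hypothetical jar name; the paths above assume a local
// Hadoop 0.15.0 install, adjust to taste):
//   bin/hadoop jar ql.jar ql
// with the input file copied into IN_DIR first.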
private static Document make_lucene_doc(String in_tpc,String in_bdy,
    String in_itm)
{
  Document d=new Document();
  // topic and body are tokenized for search; item is an exact-match key
  d.add(new Field("topic",in_tpc,Field.Store.YES,Field.Index.TOKENIZED));
  d.add(new Field("item",in_itm,Field.Store.NO,Field.Index.UN_TOKENIZED));
  d.add(new Field("body",in_bdy,Field.Store.YES,Field.Index.TOKENIZED));
  return d;
}
private static final Random r=new Random(); // one shared instance
private static String key_maker(String t)
{
  // unique-ish key: random number + timestamp, filtered down to word
  // characters, with the original topic kept after the last '_'
  Date d=new Date(System.currentTimeMillis());
  return key_filter(Double.toString(r.nextDouble())+d.toString()+"_"+t);
}
private static String key_filter(String s)
{ // adapted: keep only underscore and alphanumerics
  StringBuffer sb=new StringBuffer();
  String ok="_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  for(int i=0;i<s.length();i++)
    if(ok.indexOf(s.charAt(i))> -1)sb.append(s.charAt(i));
  return sb.toString();
}
private static Object bo(byte[] b)throws Exception
{ // adapted: bytes -> Object
  ByteArrayInputStream bis=new ByteArrayInputStream(b);
  ObjectInputStream ois=new ObjectInputStream(bis);
  Object o=ois.readObject();ois.close();return o;
}
private static byte[] ob(Object o)
{ // adapted: Object -> bytes
  byte[] b=new byte[0];
  ByteArrayOutputStream bos=new ByteArrayOutputStream();
  try
  {
    ObjectOutputStream os=new ObjectOutputStream(bos);
    os.writeObject(o);
    os.close(); // flush the object stream BEFORE grabbing the bytes
    b=bos.toByteArray();
  }
  catch(IOException e){e.printStackTrace();}
  return b;
}
}
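If you want to try the reduce-side filtering on its own, here is a minimal
standalone sketch of the same idea, using the same Lucene 2.x-era API as
above (the class name and literal values are mine, for illustration):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.store.RAMDirectory;

public class QlSketch
{
  public static void main(String[] args)throws Exception
  {
    // in-memory index, as in the reducer above
    RAMDirectory rd=new RAMDirectory();
    IndexWriter iw=new IndexWriter(rd,new StandardAnalyzer(),true);
    Document d=new Document();
    d.add(new Field("topic","java",Field.Store.YES,Field.Index.TOKENIZED));
    d.add(new Field("item","ITEM_x_java",Field.Store.NO,Field.Index.UN_TOKENIZED));
    d.add(new Field("body","this page is about java",Field.Store.YES,Field.Index.TOKENIZED));
    iw.addDocument(d);
    iw.optimize();iw.close();
    // both clauses MUST match: the exact item key and the topic word in the body
    Searcher is=new IndexSearcher(rd);
    BooleanQuery bq=new BooleanQuery();
    bq.add(new TermQuery(new Term("item","ITEM_x_java")),BooleanClause.Occur.MUST);
    bq.add(new TermQuery(new Term("body","java")),BooleanClause.Occur.MUST);
    Hits h=is.search(bq);
    for(int j=0;j<h.length();j++)
      System.out.println(h.doc(j).get("topic")+"^topic^"+h.doc(j).get("body"));
    is.close();
  }
}

Since both clauses are MUST, a document only comes back when its body
actually mentions its own topic word, which is exactly the filter the
reducer applies to each batch.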
Regards,
Peter W.