Hello,

For those interested: you can filter and search Lucene
documents in the reduce phase of a Hadoop MapReduce job.

code:

import java.io.*;
import java.util.*;

import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Hits;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;

public class ql
   {
   /**************************************
   * Query Lucene using keys.
   *
   * input:
   * java^this page is about java
   * ruby^site only mentions rails
   * php^another resource about php
   * java^ejb3 discussed and spring
   * eof^eof
   *
   * make docs,search,mapreduce
   *
   * output:
   * php^topic^another resource about php
   * java^topic^this page is about java
   ***************************************/

   /**
    * Mapper: turns every "topic^body" input line into a Lucene Document and
    * ships the documents to the reducers in serialized batches (a Map of
    * item key to Document, wrapped in a BytesWritable).
    *
    * A batch is emitted when it already holds BATCH_SIZE documents, when the
    * "eof" sentinel line is read, and when the mapper is closed, so a split
    * that never sees the sentinel does not lose its last partial batch.
    * Lines without a '^' separator are skipped.
    *
    * The first batch of a mapper is emitted under the empty key "" (the
    * initial value of BATCH_KEY); later batches get a generated "BATCH_..."
    * key. This is kept as-is because the key is part of the job output.
    */
   public static class M extends MapReduceBase implements Mapper
      {
      /** Number of documents collected before a batch is shipped. */
      private static final int BATCH_SIZE=100;

      HashMap hm=new HashMap();
      Map group_m=Collections.synchronizedMap(hm);
      String ITEM_KEY,BATCH_KEY="";int batch=0;

      // collector last handed to map(); kept so close() can flush the tail
      private OutputCollector last_out;

      /**
       * Consumes one input line.
       *
       * @param wc  input key (unused)
       * @param w   input value, a Text holding "topic^body" or the "eof" sentinel
       * @param out collector receiving (Text batch key, BytesWritable batch)
       * @param rep reporter (unused)
       * @throws IOException if the collector fails
       */
      public void map(WritableComparable wc,Writable w,
         OutputCollector out,Reporter rep)throws IOException
         {
         last_out=out;
         String ln=((Text)w).toString();
         String[] parse_a=ln.split("\\^");

         // the sentinel only triggers a flush; it is not a document itself
         if(parse_a[0].equals("eof"))
            {
            flush(out);
            return;
            }

         // no '^' (or empty body): nothing to index, skip instead of crashing
         if(parse_a.length<2)return;

         if(batch>=BATCH_SIZE) // new lucene document group
            {
            flush(out);
            BATCH_KEY="BATCH_"+key_maker(String.valueOf(BATCH_SIZE));
            }

         ITEM_KEY="ITEM_"+key_maker(parse_a[0]);
         Document single_d=make_lucene_doc(parse_a[0],parse_a[1],ITEM_KEY);
         group_m.put(ITEM_KEY,single_d);
         batch++;
         }

      /**
       * Flushes the documents still pending when the task ends.
       *
       * @throws IOException if the collector fails
       */
      public void close()throws IOException
         {
         if(last_out!=null)flush(last_out);
         super.close();
         }

      // Emits the pending batch under the current BATCH_KEY and resets the state.
      private void flush(OutputCollector out)throws IOException
         {
         if(group_m.isEmpty())return;
         out.collect(new Text(BATCH_KEY),new BytesWritable(ob(group_m)));
         group_m.clear();
         batch=0;
         }
      }

   /**
    * Reducer: rebuilds each serialized batch of Lucene documents as a
    * temporary in-memory index and keeps only the documents whose body
    * mentions their own topic. Every kept document is written as
    * "topic^topic^body" under the incoming batch key.
    *
    * Processing is best effort: a batch that cannot be deserialized,
    * indexed or searched is reported on stderr and skipped.
    */
   public static class R extends MapReduceBase implements Reducer
      {
      /**
       * Filters all batches received for one key.
       *
       * @param wc  batch key, reused as output key
       * @param it  iterator over BytesWritable values, each a serialized Map
       *            of item key to Document
       * @param out collector receiving (wc, Text "topic^topic^body")
       * @param rep reporter (unused)
       * @throws IOException declared by the Reducer interface
       */
      public void reduce(WritableComparable wc,Iterator it,
         OutputCollector out,Reporter rep)throws IOException
         {
         while(it.hasNext())
            {
            try
               {
               Object batch=bo(((BytesWritable)it.next()).get());
               if(batch instanceof Map)filter_batch((Map)batch,wc,out);
               }
            catch(Exception e){e.printStackTrace();}
            }
         }

      // Indexes one batch in RAM, then emits every document matching its own topic.
      private void filter_batch(Map m,WritableComparable wc,OutputCollector out)
         throws IOException
         {
         List keys=new ArrayList(m.keySet());

         // build temp index; the writer is closed even if indexing fails
         Directory rd=new RAMDirectory();
         Analyzer sa=new StandardAnalyzer();
         IndexWriter iw=new IndexWriter(rd,sa,true);
         try
            {
            for(Iterator itr_u=keys.iterator();itr_u.hasNext();)
               iw.addDocument((Document)m.get(itr_u.next()));
            iw.optimize();
            }
         finally{iw.close();}

         Searcher is=new IndexSearcher(rd);
         try
            {
            for(Iterator itr_s=keys.iterator();itr_s.hasNext();)
               {
               String item_key=itr_s.next().toString();

               // "item" is indexed UN_TOKENIZED, so the exact key selects one doc
               TermQuery tq_i=new TermQuery(new Term("item",item_key.trim()));

               // the topic is whatever follows the last '_' of the item key;
               // "body" is analyzed by StandardAnalyzer, which lower-cases its
               // tokens, so the raw term must be lower-cased to be able to match
               String topic=item_key.substring(item_key.lastIndexOf("_")+1);
               TermQuery tq_b=new TermQuery(new Term("body",topic.toLowerCase()));

               // search topic with inventory key
               BooleanQuery bq=new BooleanQuery();
               bq.add(tq_i,BooleanClause.Occur.MUST);
               bq.add(tq_b,BooleanClause.Occur.MUST);

               Hits h=is.search(bq);
               for(int j=0;j<h.length();j++)
                  {
                  Document doc=h.doc(j);
                  out.collect(wc,new Text(doc.get("topic")+"^topic^"+doc.get("body")));
                  }
               }
            }
         finally{is.close();}
         }
      }

   /**
    * Configures and runs the "ql" job.
    *
    * @param args optional: args[0] is the input directory and args[1] the
    *             output directory; when absent the original hard-coded
    *             /opt/hadoop-0.15.0 paths are used
    * @throws Exception if the job cannot be submitted or does not succeed;
    *                   it is propagated so the JVM exits with a failure status
    */
   public static void main(String[] args)throws Exception
      {
      String IN_DIR="/opt/hadoop-0.15.0/indir";
      String OUT_DIR="/opt/hadoop-0.15.0/outdir";
      if(args!=null&&args.length>0)IN_DIR=args[0];
      if(args!=null&&args.length>1)OUT_DIR=args[1];

      JobConf jc=new JobConf(ql.class);
      jc.setJobName("ql");
      jc.setMapperClass(M.class);
      jc.setReducerClass(R.class);

      // the mapper emits (Text, BytesWritable) batches ...
      jc.setMapOutputKeyClass(Text.class);
      jc.setMapOutputValueClass(BytesWritable.class);
      // ... while the reducer emits (Text, Text) result lines
      jc.setOutputKeyClass(Text.class);
      jc.setOutputValueClass(Text.class);

      jc.setInputPath(new Path(IN_DIR));
      jc.setOutputPath(new Path(OUT_DIR));
      JobClient.runJob(jc);
      }

/**
 * Builds the Lucene document for one input line.
 *
 * @param in_tpc topic text; stored and tokenized in field "topic"
 * @param in_bdy body text; stored and tokenized in field "body"
 * @param in_itm item key; indexed as a single un-tokenized term in field
 *               "item" and not stored
 * @return a document holding the three fields in the order topic, item, body
 */
private static Document make_lucene_doc(String in_tpc,String in_bdy,String in_itm)
      {
      Document doc=new Document();

      Field topic=new Field("topic",in_tpc,Field.Store.YES,Field.Index.TOKENIZED);
      doc.add(topic);

      Field item=new Field("item",in_itm,Field.Store.NO,Field.Index.UN_TOKENIZED);
      doc.add(item);

      Field body=new Field("body",in_bdy,Field.Store.YES,Field.Index.TOKENIZED);
      doc.add(body);

      return doc;
      }

   /**
    * Generates a practically unique key ending in "_" followed by the
    * filtered form of {@code t}.
    *
    * The key is a random double, the current date string, "_" and {@code t},
    * concatenated and passed through key_filter.
    *
    * @param t suffix to embed after the last underscore
    * @return the filtered key
    */
   private static String key_maker(String t)
      {
      double salt=new Random().nextDouble();
      String stamp=new Date().toString();

      StringBuffer raw=new StringBuffer();
      raw.append(Double.toString(salt));
      raw.append(stamp);
      raw.append("_");
      raw.append(t);
      return key_filter(raw.toString());
      }

   /**
    * Strips every character that is not an ASCII letter, an ASCII digit or
    * an underscore.
    *
    * @param s text to filter
    * @return {@code s} with all other characters removed, order preserved
    */
   private static String key_filter(String s)
      {
      StringBuilder kept=new StringBuilder(s.length());
      char[] chars=s.toCharArray();
      for(int i=0;i<chars.length;i++)
         {
         char c=chars[i];
         boolean lower=c>='a'&&c<='z';
         boolean upper=c>='A'&&c<='Z';
         boolean digit=c>='0'&&c<='9';
         if(c=='_'||lower||upper||digit)kept.append(c);
         }
      return kept.toString();
      }

   /**
    * Deserializes one object from a byte array using Java serialization.
    *
    * NOTE: Java deserialization must only be applied to trusted bytes; here
    * the input is expected to come from ob().
    *
    * @param b bytes holding one serialized object
    * @return the deserialized object
    * @throws Exception if the bytes are not a valid serialization stream or
    *                   a class of the object graph cannot be loaded
    */
   private static Object bo(byte[] b)throws Exception
      { // adapted
      ObjectInputStream ois=new ObjectInputStream(new ByteArrayInputStream(b));
      // close the stream on the failure path too
      try{return ois.readObject();}
      finally{ois.close();}
      }

   /**
    * Serializes an object to a byte array using Java serialization.
    *
    * @param o object to serialize; its whole graph must be Serializable
    * @return the serialized bytes, or an empty array when serialization
    *         fails (the failure is reported on stderr)
    */
   private static byte[] ob(Object o)
      { // adapted
      ByteArrayOutputStream bos=new ByteArrayOutputStream();
      try
         {
         ObjectOutputStream os=new ObjectOutputStream(bos);
         try{os.writeObject(o);}
         finally{os.close();}
         // read the buffer only after close() has flushed the object stream
         return bos.toByteArray();
         }
      catch(IOException e)
         {
         // keep the empty-array fallback, but leave a trace of the failure
         e.printStackTrace();
         return new byte[0];
         }
      }
   }

Regards,

Peter W.

Reply via email to