Hi,

I've spent some time fixing and tuning the FastSegmentMergeTool, and even though the speed improvement is not as dramatic as I originally thought, it is still considerable. It also uses much less disk space for temporary files, and doesn't require indexing segments prior to running it.

The old version still has some advantages - because under the hood it uses the *.main(String[]) methods of all the other tools, it probably offers some degree of resilience to changes in the underlying data formats. The new version, on the other hand, works directly with the segment data, so any changes to that format will require changes in this tool as well... but I'd say that's a small cost for the speed advantage.

Some inconclusive benchmarks that I've run on my machine, using a collection of 10 segments containing 1.2 million pages in total:

                        pages/s         temp disk space overhead %
SegmentMergeTool        140             ~50%
FastSegmentMergeTool    470             ~20%

In short, I propose to retire the current implementation of SegmentMergeTool in favor of this one (attached is the latest version - it depends on the net.nutch.segment tools I sent to the list earlier, but can be made independent).

The question is what to do with the old version - should I just replace it with this one?

If there are no objections, I will commit it within the next 2 days, with or without the segment tools.

--
Best regards,
Andrzej Bialecki

-------------------------------------------------
Software Architect, System Integration Specialist
CEN/ISSS EC Workshop, ECIMF project chair
EU FP6 E-Commerce Expert/Evaluator
-------------------------------------------------
FreeBSD developer (http://www.freebsd.org)


/* Copyright (c) 2004 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.tools;

import java.io.File;
import java.io.FileFilter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.logging.Logger;

import net.nutch.fetcher.FetcherOutput;
import net.nutch.indexer.IndexSegment;
import net.nutch.io.MD5Hash;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.protocol.Content;
import net.nutch.segment.SegmentReader;
import net.nutch.segment.SegmentWriter;
import net.nutch.util.FileUtil;
import net.nutch.util.LogFormatter;

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.DateField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermDocs;
import org.apache.lucene.index.TermEnum;

/**
 * This class cleans up accumulated segments data, and merges them into a single
 * segment, with no duplicates in it.
 * 
 * <p>
 * Contrary to the SegmentMergeTool there are no prerequisites for its correct
 * operation except for a set of already fetched segments. This tool does not
 * use DeleteDuplicates, but creates its own "master" index of all pages in all
 * segments. Then it walks sequentially through this index and picks up only
 * single pages for every unique value of url or hash.
 * </p>
 * <p>
 * The newly created segment is then optionally indexed, so that it can be
 * either merged with more new segments, or used for searching as it is.
 * </p>
 * <p>
 * Old segments may be optionally removed, because all needed data has already
 * been copied to the new merged segment.
 * </p>
 * <p>
 * You may directly run FastSegmentMergeTool, with all options turned on, i.e.
 * to merge segments into the output segment, index it, and then delete the
 * original segments data.
 * </p>
 * 
 * @author Andrzej Bialecki <[EMAIL PROTECTED]>
 */
public class FastSegmentMergeTool {

  public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.FastSegmentMergeTool");

  /** Progress is logged every LOG_STEP processed entries. */
  public static int LOG_STEP = 20000;
  /** Maximum number of documents per master sub-index before a new one is started. */
  public static int INDEX_SIZE = 250000;
  /** Lucene mergeFactor applied to each master-index writer. */
  public static int INDEX_MERGE_FACTOR = 30;
  /** Lucene minMergeDocs applied to each master-index writer. */
  public static int INDEX_MIN_MERGE_DOCS = 100;

  // Path to the directory containing all input segments.
  private String segments = null;

  // Path to the directory that will receive the merged output segment.
  private String output = null;

  // Input segment dirs that opened successfully (subset of allsegdirs).
  private List segdirs = null;

  // All candidate segment dirs found under the input directory.
  private List allsegdirs = null;

  // If true, index the merged output segment when done.
  private boolean runIndexer = false;

  // If true, delete ALL original segment dirs (even corrupt ones) when done.
  private boolean delSegs = false;

  // Open SegmentReaders, keyed by segment dir name.
  private HashMap readers = new HashMap();

  /**
   * Create the tool and collect the list of candidate segment directories.
   *
   * @param segments path to the directory containing all input segments
   * @param output path to the directory which will contain the single merged
   *        output segment (a new subdirectory is created inside it)
   * @param runIndexer if true, index the output segment when merging is done
   * @param delSegs if true, delete all original segment dirs when done
   * @throws Exception if <code>segments</code> is not an existing directory,
   *         or if it cannot be listed
   */
  public FastSegmentMergeTool(String segments, String output, boolean runIndexer, boolean delSegs) throws Exception {
    this.segments = segments;
    this.runIndexer = runIndexer;
    this.delSegs = delSegs;
    File segs = new File(segments);
    if (!segs.exists() || !segs.isDirectory()) throw new Exception("Not a segments dir: " + segs);
    File[] dirs = segs.listFiles(new FileFilter() {
      public boolean accept(File file) {
        return file.isDirectory();
      }
    });
    // File.listFiles() returns null (not an empty array) on I/O error -
    // fail explicitly instead of throwing NPE in Arrays.asList below.
    if (dirs == null) throw new Exception("Could not list segments dir: " + segs);
    allsegdirs = Arrays.asList(dirs);
    this.output = output;
  }

  /** Create a new segment name from the current timestamp (Nutch convention). */
  private String getSegmentName() {
    return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
  }

  /**
   * Run the full merge: open all readable input segments, build a temporary
   * "master" Lucene index of (segment, docid, urlHash, contentHash, fetchTime)
   * tuples, delete all but the latest version of each duplicate (by url hash
   * or content hash), then copy the surviving entries into a single new output
   * segment. Optionally indexes the output segment and deletes the inputs.
   * All errors are logged; this method does not throw.
   */
  public void run() {
    long start = System.currentTimeMillis();
    try {
      segdirs = new ArrayList();
      // Phase 1: open all segments, skipping corrupt ones.
      long total = 0L;
      for (int i = 0; i < allsegdirs.size(); i++) {
        File dir = (File) allsegdirs.get(i);
        SegmentReader sr = null;
        try {
          sr = new SegmentReader(dir);
        } catch (Exception e) {
          // this segment is hosed, don't use it
          LOG.warning(" - segment " + dir + " is corrupt, skipping all entries.");
          continue;
        }
        segdirs.add(dir);
        total += sr.size;
        LOG.info("Segment " + dir.getName() + ": " + sr.size + " entries.");
        readers.put(dir.getName(), sr);
      }
      LOG.info("TOTAL " + total + " input entries in " + segdirs.size() + " segments.");
      // Phase 2: build the master index, split into sub-indexes of at most
      // INDEX_SIZE docs to bound memory/temp-space, merged at the end.
      LOG.info("Creating master index...");
      Vector masters = new Vector();
      File fsmtIndexDir = new File(new File(segments), ".fastmerge_index");
      File masterDir = new File(fsmtIndexDir, "0");
      if (!masterDir.mkdirs()) {
        LOG.severe("Could not create a master index dir: " + masterDir);
        return;
      }
      masters.add(masterDir);
      IndexWriter iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
      iw.setUseCompoundFile(false);
      iw.mergeFactor = INDEX_MERGE_FACTOR;
      iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
      long s1 = System.currentTimeMillis();
      Iterator it = readers.values().iterator();
      long cnt = 0L;
      while (it.hasNext()) {
        SegmentReader sr = (SegmentReader) it.next();
        String name = sr.segmentDir.getName();
        FetcherOutput fo = new FetcherOutput();
        for (long i = 0; i < sr.size; i++) {
          try {
            if (!sr.get(i, fo, null, null, null)) break;

            Document doc = new Document();
            // "sd" locates the entry: "<segmentName>|<entryIndex>".
            doc.add(new Field("sd", name + "|" + i, true, false, false));
            // "uh"/"ch" (url hash / content hash) are the dedup keys - indexed
            // so that TermEnum groups identical hashes together.
            doc.add(new Field("uh", MD5Hash.digest(fo.getUrl().toString()).toString(), true, true, false));
            doc.add(new Field("ch", fo.getMD5Hash().toString(), true, true, false));
            // DateField strings compare lexicographically in time order.
            doc.add(new Field("time", DateField.timeToString(fo.getFetchDate()), true, false, false));
            iw.addDocument(doc);
            cnt++;
            if (cnt > 0 && (cnt % LOG_STEP == 0)) LOG.info("Processed " + cnt + " entries.");
            if (cnt > 0 && (cnt % INDEX_SIZE == 0)) {
              iw.optimize();
              iw.close();
              LOG.info(" - creating next subindex...");
              masterDir = new File(fsmtIndexDir, "" + masters.size());
              if (!masterDir.mkdirs()) {
                LOG.severe("Could not create a master index dir: " + masterDir);
                return;
              }
              masters.add(masterDir);
              iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
              iw.setUseCompoundFile(false);
              iw.mergeFactor = INDEX_MERGE_FACTOR;
              iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
            }
          } catch (Throwable t) {
            // we can assume the data is invalid from now on - break here.
            // Entry i itself failed, so exactly i entries were kept
            // (the original code incorrectly reported i + 1).
            LOG.info(" - segment " + name + " truncated to " + i + " entries (" +
                    t.getMessage() + ")");
            break;
          }
        }
      }
      iw.optimize();
      LOG.info("* Creating index took " + (System.currentTimeMillis() - s1) + " ms");
      s1 = System.currentTimeMillis();
      // merge all other sub-indexes using the latest IndexWriter (still open):
      if (masters.size() > 1) {
        IndexReader[] ireaders = new IndexReader[masters.size() - 1];
        for (int i = 0; i < masters.size() - 1; i++) ireaders[i] = IndexReader.open((File)masters.get(i));
        iw.addIndexes(ireaders);
        for (int i = 0; i < masters.size() - 1; i++) {
          ireaders[i].close();
          FileUtil.fullyDelete((File)masters.get(i));
        }
      }
      iw.close();
      LOG.info("* Optimizing index took " + (System.currentTimeMillis() - s1) + " ms");
      // Phase 3: walk the master index term-by-term and delete all but the
      // newest document for each url hash / content hash value.
      LOG.info("Removing duplicate entries...");
      IndexReader ir = IndexReader.open(masterDir);
      int outputCnt = 0, i = 0;
      cnt = 0L;
      s1 = System.currentTimeMillis();
      TermEnum te = ir.terms();
      while(te.next()) {
        Term t = te.term();
        if (t == null) continue;
        if (!(t.field().equals("ch") || t.field().equals("uh"))) continue;
        cnt++;
        // each entry contributes one "uh" and one "ch" term, hence cnt/2.
        if (cnt > 0 && (cnt % (LOG_STEP * 2) == 0)) LOG.info("Processed " + cnt/2 + " entries.");
        // Enumerate all docs with the same URL hash or content hash
        TermDocs td = ir.termDocs(t);
        if (td == null) continue;
        int id = -1;
        String time = null;
        Document doc = null;
        // Keep only the latest version of the document with
        // the same hash (url or content). Note: even if the content
        // hash is identical, other metadata may be different, so even
        // in this case it makes sense to keep the latest version.
        while (td.next()) {
          int docid = td.doc();
          if (!ir.isDeleted(docid)) {
            doc = ir.document(docid);
            if (time == null) {
              time = doc.get("time");
              id = docid;
              continue;
            }
            String dtime = doc.get("time");
            // "time" is a DateField, and can be compared lexicographically
            if (dtime.compareTo(time) > 0) {
              if (id != -1) {
                ir.delete(id);
              }
              time = dtime;
              id = docid;
            } else {
              ir.delete(docid);
            }
          }
        }
        // release the enumerator for this term
        td.close();
      }
      te.close();
      //
      // keep the IndexReader open - phase 4 below reads the surviving docs
      //

      LOG.info("* Deduplicating took " + (System.currentTimeMillis() - s1) + " ms");
      // Phase 4: copy every non-deleted entry into a fresh output segment.
      File directory = new File(output);
      if (directory.exists() && !directory.isDirectory())
              throw new Exception("Output dir is not a directory: " + directory);

      if (!directory.exists()) directory.mkdirs();
      directory = new File(directory, getSegmentName());
      LOG.info("Merging all segments into " + directory.getName());
      s1 = System.currentTimeMillis();
      directory.mkdirs();
      SegmentWriter sw = new SegmentWriter(directory, true);
      FetcherOutput fo = new FetcherOutput();
      Content co = new Content();
      ParseText pt = new ParseText();
      ParseData pd = new ParseData();

      for (int n = 0; n < ir.maxDoc(); n++) {
        if (ir.isDeleted(n)) continue;  // removed as a duplicate above
        Document doc = ir.document(n);
        // decode the "<segmentName>|<entryIndex>" locator written in phase 2
        String segDoc = doc.get("sd");
        int idx = segDoc.indexOf('|');
        String segName = segDoc.substring(0, idx);
        String docName = segDoc.substring(idx + 1);
        SegmentReader sr = (SegmentReader) readers.get(segName);
        long docid;
        try {
          docid = Long.parseLong(docName);
        } catch (Exception e) {
          continue;
        }
        i++;
        if (i > 0 && (i % LOG_STEP == 0)) LOG.info("Merged " + i + " entries.");
        try {
          // get data from the reader
          sr.get(docid, fo, co, pt, pd);
        } catch (Throwable thr) {
          // don't break the loop, because only one of the segments
          // may be corrupted...
          LOG.fine(" - corrupt entry no. " + docid + " in segment " + sr.segmentDir.getName() + " - skipping.");
          continue;
        }
        sw.append(fo, co, pt, pd);
        outputCnt++;
      }
      LOG.info("* merging took " + (System.currentTimeMillis() - s1) + " ms");
      ir.close();
      sw.close();
      FileUtil.fullyDelete(fsmtIndexDir);
      for (Iterator iter = readers.keySet().iterator(); iter.hasNext();) {
        SegmentReader sr = (SegmentReader) readers.get(iter.next());
        sr.close();
      }
      if (runIndexer) {
        LOG.info("Creating new segment index...");
        IndexSegment.main(new String[] { directory.toString() });
      }
      if (delSegs) {
        // This deletes also all corrupt segments, which are
        // unusable anyway
        LOG.info("Deleting old segments...");
        // BUGFIX: the loop condition must test k - the original tested the
        // merge counter i, which either skipped deletion entirely or ran
        // past the end of allsegdirs.
        for (int k = 0; k < allsegdirs.size(); k++) {
          FileUtil.fullyDelete((File) allsegdirs.get(k));
        }
      }
      long delta = System.currentTimeMillis() - start;
      // BUGFIX: compute the rate in float to avoid integer truncation of
      // (delta / 1000) to 0 (which produced Infinity for sub-second runs);
      // Math.max guards against delta == 0.
      float eps = (float) total * 1000f / (float) Math.max(delta, 1L);
      LOG.info("DONE segment merging, INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
              + ((float) delta / 1000f) + " s (" + eps + " entries/sec).");
    } catch (Exception e) {
      e.printStackTrace();
      // toString() includes the exception class; getMessage() may be null.
      LOG.severe(e.toString());
    }
  }

  /**
   * Command-line entry point.
   * Usage: FastSegmentMergeTool &lt;input_segments_dir&gt; [-o &lt;output_segment_dir&gt;] [-i] [-ds]
   * If -o is absent, the output segment is created inside the input dir.
   *
   * @param args command-line arguments, see {@link #usage()}
   * @throws Exception if the tool cannot be constructed
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Too few arguments.\n");
      usage();
      System.exit(-1);
    }
    boolean runIndexer = false;
    boolean delSegs = false;
    String output = null;
    // args[0] is the input dir; options start at index 1.
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-o")) {
        if (args.length > i + 1) {
          output = args[++i];
          continue;
        } else {
          System.err.println("Required value of '-o' argument missing.\n");
          usage();
          System.exit(-1);
        }
      } else if (args[i].equals("-i"))
        runIndexer = true;
      else if (args[i].equals("-ds")) delSegs = true;
    }
    if (output == null) output = args[0];
    FastSegmentMergeTool st = new FastSegmentMergeTool(args[0], output, runIndexer, delSegs);
    st.run();
  }

  /** Print command-line usage to stderr. */
  private static void usage() {
    System.err.println("FastSegmentMergeTool <input_segments_dir> [-o <output_segment_dir>] [-i] [-ds]");
    System.err.println("\t<input_segments_dir>\tpath to directory containing\n\t\t\t\tall input segments");
    System.err
            .println("\t-o <output_segment_dir>\t(optional) path to directory which will\n\t\t\t\tcontain a single output segment.\n\t\t\tNOTE: If not present, the original segments path will be used.");
    System.err.println("\t-i\t\t(optional) index the output segment.");
    System.err.println("\t-ds\t\t(optional) delete the original segments when finished.");
    System.err.println();
  }
}

Reply via email to