Hello,

Thank you for your efforts. We've also been trying some things, and we have now implemented a solution that we are satisfied with. It allows us to start merging the output of any indexing job as soon as the first tasks complete.

It's a single class that wraps an indexing Job and submits it to the JobTracker asynchronously. It periodically retrieves TaskCompletionEvents and merges the outputs of completed tasks, which effectively means that the job-submitting client is also the index merger. It supports jobs both with and without reduces. It simply copies the job output to the local disk in order to merge it. When the job has finished, it optimizes the index and copies the merged result back to the configured filesystem. For more implementation details, see the class listing below. (Compiles against CDH2 hadoop-0.20.1+169.89.)

Regards,
Ferdy.



package mypackage;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class AsyncLuceneMergerJobWrapper {

  private static final Log LOG = LogFactory.getLog(new Object() {
  }.getClass().getEnclosingClass());

  private final Job job;
  private final FileSystem fs;
  private final LocalFileSystem localFs;
  private Path localTemp;
  private Path localParts;
  private Path localMerged;
  private final IndexWriter writer;

  public AsyncLuceneMergerJobWrapper(Job job, Analyzer analyzer) throws IOException {
    this.job = job;

    Configuration conf = job.getConfiguration();

    fs = FileSystem.get(conf);
    localFs = FileSystem.getLocal(conf);

    localTemp = new Path(conf.get("hadoop.tmp.dir"), "async-merge-" + System.currentTimeMillis());
    if (localFs.exists(localTemp)) {
      throw new IllegalStateException("Temporary directory already exists " + localTemp);
    }
    localParts = new Path(localTemp, "parts");
    localFs.mkdirs(localParts);
    localMerged = new Path(localTemp, "merged");

    this.writer = new IndexWriter(FSDirectory.open(new File(localMerged.toString())),
        analyzer, true, MaxFieldLength.LIMITED);
  }

  /**
   * @return The index writer that is used for merging. Use this method to set
   *         merging options.
   */
  public IndexWriter getWriter() {
    return writer;
  }

  /**
   * Starts the job and merges the job outputs (Lucene indices) into a single
   * optimized index. Merging begins from the moment the first task outputs
   * become available.
   *
   * The job output itself is not deleted. The merged output is put into
   * {job_output}/merged.
   *
   * @return true if the job completed successfully
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  public boolean waitForCompletion() throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("Starting");
    Path jobOutput = FileOutputFormat.getOutputPath(job);
    LOG.info("jobOutput=" + jobOutput);

    try {
      job.submit();

      int receivedEvents = 0;
      boolean allPartsMerged = false;
      while (!allPartsMerged) {
        //merge completed job parts
        List<FSDirectory> dirs = new ArrayList<FSDirectory>();
        TaskCompletionEvent[] taskCompletionEvents = job.getTaskCompletionEvents(receivedEvents);
        receivedEvents += taskCompletionEvents.length;
        for (TaskCompletionEvent event : taskCompletionEvents) {
          LOG.info("New event " + event + "; isMapTask=" + event.isMapTask()
              + "; idWithinJob=" + event.idWithinJob());
          if (TaskCompletionEvent.Status.SUCCEEDED.equals(event.getTaskStatus())) {
            //new part!
            //we only want tasks that produce output; that is the mappers when
            //there are no reducers, or otherwise just the reducers.
            if (job.getNumReduceTasks() == 0 || !event.isMapTask()) {
              String partName = getPartname(event);
              Path partOutput = new Path(jobOutput, partName);
              if (!fs.exists(partOutput)) {
                LOG.info("Task had no output, continuing. Event=" + event);
                continue;
              }
              Path localCopy = new Path(localParts, partName);
              LOG.info("Copying new part " + partOutput);
              fs.copyToLocalFile(partOutput, localCopy);
              LOG.info("Copy done.");
              FSDirectory dir = FSDirectory.open(new File(localCopy.toString()));
              dirs.add(dir);
            }
          }
        }
        if (dirs.size() > 0) {
          writer.addIndexesNoOptimize(dirs.toArray(new FSDirectory[0]));
          LOG.info("AddIndexesNoOptimize(dirs) done.");
          for (Directory dir : dirs) {
            dir.close();
          }
        }
        //we are done when there are no more events and the job is complete
        allPartsMerged = taskCompletionEvents.length == 0 && job.isComplete();
        Thread.sleep(10000);
      }
      LOG.info("Completed. Calling optimize()");
      writer.optimize();
      LOG.info("Optimized.");
      writer.close();
      LOG.info("Closed. Completing local output.");
      fs.completeLocalOutput(new Path(jobOutput, "merged"), localMerged);
      LOG.info("Done.");
    } finally {
      if (localFs.exists(localTemp)) {
        localFs.delete(localTemp, true);
      }
    }

    return job.isSuccessful();
  }

  @SuppressWarnings("deprecation")
  private String getPartname(TaskCompletionEvent event) {
    //getTaskID() is deprecated because TaskCompletionEvent returns the old TaskAttemptId
    return event.getTaskAttemptId().getTaskID().toString();
  }
}
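
For illustration, a minimal driver sketch showing how the wrapper might be used. The driver class, input/output paths, analyzer choice and merge-factor tuning are assumptions and not part of the class above; the mapper/reducer setup is omitted because it depends on the actual indexing job (assumes Lucene 2.9/3.0 and the new-API Hadoop Job).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.util.Version;

public class AsyncMergeDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "indexing-job");
    // Mapper, reducer and output format are omitted here; they depend on the
    // actual indexing job being wrapped.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    AsyncLuceneMergerJobWrapper wrapper =
        new AsyncLuceneMergerJobWrapper(job, new StandardAnalyzer(Version.LUCENE_30));
    wrapper.getWriter().setMergeFactor(10); // example of tuning the merge writer
    boolean success = wrapper.waitForCompletion(); // merged index ends up in {job_output}/merged
    System.exit(success ? 0 : 1);
  }
}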




Shai Erera wrote:
Hi

I've done some work and thought I'd report back the results (that are
not too encouraging).

Approach 1:
* Mappers output a side effect Lucene Directory (written on-disk) and
a <Long, Text> pair where the value is the location of the index on
disk and the key is unimportant for now.
* Reducer merges the on-disk indexes into a larger on-disk index and
optimize() it. Its output is the same as the Mapper.

Pros:
1) Low memory footprint on both Mapper and Reducer side.
2) Hadoop related output is very small - just the location of the
index that needs to be copied around and sort/merged.
3) Utilizes Lucene powerful indexing capabilities by assigning each
Mapper a large portion of text to index.

Cons:
1) no pipelining can be done, because the output of all Mappers is
very small (Hadoop-wise) and thus fits in memory and is never flushed.
I've even tried reducing the buffer on the reducer side (played w/ all
sorts of parameters), but this never triggered the Combiner I've set.
2) if the cluster has complex structure, where not all nodes see the
same HD (not in my case), then this approach may fail, because Hadoop
is not aware of the side-effect files and thus cannot pass them to the
Reducer(s).
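
For concreteness, a hedged sketch of the kind of mapper Approach 1 describes: it indexes its split into a side-effect on-disk directory and emits only that directory's location as a <Long, Text> pair. This is not Shai's actual code; the class name, field layout and local path are assumptions (Lucene 2.9/3.0, Hadoop new API).

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class SideEffectIndexMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

  private IndexWriter writer;
  private String indexPath;

  @Override
  protected void setup(Context context) throws IOException {
    // One side-effect index per map task, written to local disk outside of Hadoop's control.
    indexPath = "/tmp/side-index-" + context.getTaskAttemptID();
    writer = new IndexWriter(FSDirectory.open(new File(indexPath)),
        new StandardAnalyzer(Version.LUCENE_30), true, IndexWriter.MaxFieldLength.LIMITED);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException {
    // Index a large portion of text per mapper, using Lucene directly.
    Document doc = new Document();
    doc.add(new Field("body", value.toString(), Field.Store.NO, Field.Index.ANALYZED));
    writer.addDocument(doc);
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    writer.optimize();
    writer.close();
    // The Hadoop output is tiny: just the location of the on-disk index.
    context.write(new LongWritable(1L), new Text(indexPath));
  }
}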

Approach 2:
* Mappers output a DirectoryWritable, which is a Writable that
(de)serializes a Lucene Directory.
* Reducer merges the incoming DWs into a larger index on-disk (output
is same as above)
* Combiner merges the incoming DWs into a larger DW (not on-disk,
though I've tried both)

Pros:
1) Working w/ a DW is better Hadoop-wise -- the Directories can be
passed around between machines and overcome the 2nd "Con" in Approach
#1.
2) Since the output of the Mappers is large (40 MB in my case), the
Reducer's buffer gets flushed more often, and the Combiner actually
gets to do some work.

Cons:
Because all is done in-memory, I've constantly hit OOM errors on the
Combiner side. I've tried limiting the size of its created directory
(by letting it merge as much as N MB before it outputs anything) down
to 128MB, but still hit those exceptions. I've even set the child's JVM
options to -Xmx2048m and 4096m, to no avail. The machine has 64GB RAM
(16 cores), so 4GB of RAM should be OK, but I still hit the OOM in the
reduce/shuffle/sort phases. Only when I set it to 64MB did it work, but
that's useless because it means each DW is written as is (as they
already come in at 40MB from the Mapper), and not only do I not gain
anything, I actually lose some (because of all the (de)serialization
done by the Writable).
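
For reference, a minimal sketch of what such a DirectoryWritable might look like: it copies the files of an in-memory RAMDirectory into the Writable stream and back. This is an assumption about the implementation, not Shai's actual class (Lucene 2.9/3.0 Directory API, Hadoop Writable).

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;

public class DirectoryWritable implements Writable {

  private RAMDirectory dir = new RAMDirectory();

  public Directory getDirectory() {
    return dir;
  }

  public void write(DataOutput out) throws IOException {
    // Serialize each file of the directory as (name, length, bytes).
    String[] files = dir.listAll();
    out.writeInt(files.length);
    for (String name : files) {
      IndexInput in = dir.openInput(name);
      long length = in.length();
      out.writeUTF(name);
      out.writeLong(length);
      byte[] buffer = new byte[4096];
      long remaining = length;
      while (remaining > 0) {
        int chunk = (int) Math.min(buffer.length, remaining);
        in.readBytes(buffer, 0, chunk);
        out.write(buffer, 0, chunk);
        remaining -= chunk;
      }
      in.close();
    }
  }

  public void readFields(DataInput in) throws IOException {
    // Rebuild the directory from the serialized (name, length, bytes) records.
    dir = new RAMDirectory();
    int numFiles = in.readInt();
    for (int i = 0; i < numFiles; i++) {
      String name = in.readUTF();
      long length = in.readLong();
      IndexOutput out = dir.createOutput(name);
      byte[] buffer = new byte[4096];
      long remaining = length;
      while (remaining > 0) {
        int chunk = (int) Math.min(buffer.length, remaining);
        in.readFully(buffer, 0, chunk);
        out.writeBytes(buffer, 0, chunk);
        remaining -= chunk;
      }
      out.close();
    }
  }
}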

It would really be great if I can have the Combiner invoked in
Approach #1, but I wasn't able to set its buffer low enough. I don't
mind doing all the merges on-disk (that's how it's done in the final
phase anyway), but I cannot seem to get the Combiner working.

So unless there's an option to tell the Combiner "kick in after N
Mappers finished" or "kick in after VERY SMALL #bytes arrived from
Mappers (in the order of 10/100s bytes)", I'm not sure indexing w/
Lucene and using a Combiner is doable.
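
As a possible starting point, a hedged sketch of 0.20-era configuration knobs that control when map output is spilled (and hence when the Combiner runs). The property names are from stock Hadoop 0.20; whether they actually get the Combiner to kick in for tiny map outputs is exactly the open question above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class EarlyCombineSettings {

  // Apply spill/combine knobs to a job before submission.
  public static void apply(Job job) {
    Configuration conf = job.getConfiguration();
    conf.setInt("io.sort.mb", 1);                  // shrink the map-side sort buffer (in MB)
    conf.setFloat("io.sort.spill.percent", 0.01f); // start spilling (and combining) very early
    conf.setInt("min.num.spills.for.combine", 1);  // run the combiner even for a single spill
  }
}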

Hope you'll find that interesting and that someone knows if/how I can
achieve what I want :-).

Thanks for your help and comments thus far,
Shai

On Thursday, July 29, 2010, Ferdy Galema <ferdy.gal...@kalooga.com> wrote:




Very well. Could you keep us informed on how your instant merging plans
work out? We're actually running a similar indexing process.

It's very interesting to be able to start merging Lucene indexes once
the first mappers have finished, instead of waiting until ALL mappers
have finished.

Shai Erera wrote:

  Well, the current scale does not warrant sharding up
the index. 13GB of data, ~8-10GB index is something a single machine
(even not a strong one) can handle pretty well. And as the data will
grow, so will the number of Mappers, and the number of sub-indexes. So
at some point I will need to merge indexes. Sharding is one of my
strategies, but for the TB scale, not what I'm training on now :).

Thanks,
Shai

  On Thu, Jul 29, 2010 at 1:23 PM, Ferdy
Galema <ferdy.gal...@kalooga.com>
wrote:

    You're right about the fact that
merging cannot be done by simply
appending. Have you thought about the possibility of actually taking
advantage of the fact that your final index will be split into several
segments? Especially if you plan to increase the scale of the input
data, you may eventually want some sort of scaling on the search side
as well. You could shard your index across several machines (there are
frameworks that will help you do this; e.g. Katta - distributed lucene).
You can even do this on a single machine with a ParallelMultiSearcher,
preferably if the machine has several disks to spread I/O.

Of course in this case you still have to make sure that the number of
segments won't be too high.



Shai Erera wrote:

      Specifically at the moment, a Mapper output is an
index (Lucene), and the Reducer's job is to take all indexes and merge
them down to 1. Searching on an index w/ hundreds of segments is
inefficient. Basically, if you think of the Reducer as holding the
'final' index (to be passed on in the larger pipeline I have), then as
soon as a Mapper finishes, I'd like to merge its index w/ the
Reducer's. Not all merges will actually be expensive - only once every
several such indexes will they be truly merged together.

I cannot use simple merge techniques - must use Lucene's API to merge
the indexes. And the merge is not a simple 'append to current index'
either. So if I could get a hold of some object (code) which gets
executed when Mappers output is ready for the Reducer, then I can have
my merge code executed there. From what I understand so far, it can be
the Combiner, which is what I'm investigating now.

On the multi-level reduce -- if your cluster is big, and Mappers work
on their own hardware (even if some share the same HW), then
multi-level reduce can help. True - you might eventually process more
bytes than you would if you had one Reducer, but (1) that's not always
true, and (2) wall-clock time may be shortened in such a case.

Shai

      On Thu, Jul 29, 2010 at 12:31 PM, Ferdy
Galema <ferdy.gal...@kalooga.com>
wrote:

        Could you elaborate on
how
you merge your data? If you have independent
map tasks with single key-value outputs that can be written as soon as
possible, I'm still curious why you need to reduce at all. Surely there
must be a way to merge your data 'on read'.

About multi-level reducing; if you merge your data in multiple steps,
there would only be a gain if the reducers somehow reduce (no pun
intended) the amount of data in the pipeline. For example running 400
maps and 10 reduces followed by another job with a single reducer will
not benefit if the single reducer has to process the same amount of
data that the previous reducers have been outputting. Therefore it
completely depends on what your reducer actually does.

Ferdy.



Shai Erera wrote:

          Yes, I'm aware of the fact that I can run w/ 0
reducers. However, I do need my output to be merged, and unfortunately
the merge is not really that simple that I can use 'getmerge'. I think
I'll give the Combiner a chance now - hopefully its reduce() method
will get called as Mappers finish their work.

BTW, if I'd like to have multi-level Reducers, such that each level
works on less data, do I need to do that using multiple Jobs? I haven't
seen any API to do so, unless I chain mappers/reducers and include
#levels in the chain. To clarify, if I need to take 400 outputs (or
much more) and merge them down to 1, currently I need to do that
seque

