Hi Anna,
 
When I wrote that mod it was specifically so that rowid would output multiple 
files (instead of one matrix file).  This was done so the LDA clustering I was 
using would run multiple mappers.  The rowid code (including the mod) still 
just runs locally.  I have not done much with it lately, but I had modified 
the code below to support another parameter (-m) that lets you split the data 
up even further by a maximum number of vectors per output file, so that many 
mappers can run for a program like LDA.  Setting -m to -1 makes rowid work the 
original way (one output matrix file).
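
Roughly, the -m support amounts to something like this (a sketch from memory, 
not the exact patch; the option name and the rowsInFile counter are illustrative):

  // In run(), alongside addInputOption()/addOutputOption():
  addOption("maxVectors", "m",
      "max vectors per output matrix file; -1 writes a single matrix file", "-1");

  int maxVectors = Integer.parseInt(parsedArgs.get("--maxVectors"));

  // rowsInFile is an int declared alongside matrixCount and incremented after
  // each matrixWriter.append(...).  When the cap is hit, close the current
  // output file and start matrix-(N+1):
  if (maxVectors > 0 && rowsInFile >= maxVectors) {
    Closeables.closeQuietly(matrixWriter);
    matrixCount++;
    matrixWriter = SequenceFile.createWriter(fs, conf,
        new Path(outputPath, "matrix-" + matrixCount),
        IntWritable.class, VectorWritable.class);
    rowsInFile = 0;
  }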
 
Dan
 

________________________________
 From: Anna Lahoud <annalah...@gmail.com>
To: user@mahout.apache.org 
Sent: Friday, August 17, 2012 1:09 PM
Subject: Re: More mappers in RowId
  
I have spent some time trying out the RowID mod that Dan posted, but I am
not seeing any mappers run to produce the output. It does produce multiple
outputs; the work, however, appears to be done locally.

Is there another piece to the puzzle that I need in order to distribute this
job to the Hadoop nodes as a map task?


On Tue, Jun 5, 2012 at 11:13 AM, Pat Ferrel <p...@occamsmachete.com> wrote:

> RowID+RowSimilarity are by far the slowest of the steps in my particular
> pipeline, which includes crawl, seq2sparse, and cluster.
>
> What is the status of further parallelizing these? Dan offers a mod above.
> Has it been tested with rowsimilarity?
>
> I'd be happy to give it a try but am having trouble with clustering in the
> trunk right now.
>
> On 6/4/12 4:20 AM, DAN HELM wrote:
>
>> I also developed a quick and dirty rowid mod so that it generates one
>> matrix output file for each "part" input file from the seq2sparse command --
>> so multiple mappers are used to process the output from rowid (such as when
>> running CVB clustering on rowid output).  For that I just made a derivative
>> rowid command called rowiddist.  The final solution should be this logic
>> rolled into a single rowid command, where the run type (multiple-file
>> output vs. a single matrix file) is driven by parameters.  The code below
>> was just a quick "solution".
>> **** code follows ****
>>
>> package home.mahout;
>>
>> import com.google.common.io.Closeables;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FileStatus;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.IntWritable;
>> import org.apache.hadoop.io.SequenceFile;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.util.ToolRunner;
>> import org.apache.mahout.common.AbstractJob;
>> import org.apache.mahout.common.Pair;
>> import org.apache.mahout.common.iterator.sequencefile.PathFilters;
>> import org.apache.mahout.common.iterator.sequencefile.PathType;
>> import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
>> import org.apache.mahout.math.VectorWritable;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.util.Map;
>>
>> public class RowIdJobDistributed extends AbstractJob {
>>
>>   private static final Logger log = LoggerFactory.getLogger(RowIdJobDistributed.class);
>>
>>   @Override
>>   public int run(String[] args) throws Exception {
>>     addInputOption();
>>     addOutputOption();
>>     Map<String,String> parsedArgs = parseArguments(args);
>>     if (parsedArgs == null) {
>>       return -1;
>>     }
>>
>>     Configuration conf = getConf();
>>     FileSystem fs = FileSystem.get(conf);
>>
>>     Path outputPath = getOutputPath();
>>     Path indexPath = new Path(outputPath, "docIndex");
>>     // Single docIndex file mapping the assigned int row ids back to the original keys.
>>     SequenceFile.Writer indexWriter = SequenceFile.createWriter(fs, conf, indexPath,
>>         IntWritable.class, Text.class);
>>
>>     FileStatus[] fstatus = fs.listStatus(getInputPath());
>>
>>     try {
>>       IntWritable docId = new IntWritable();
>>       int i = 0;
>>       int numCols = 0;
>>       int matrixCount = 0;
>>
>>       for (FileStatus f : fstatus) {
>>         if (f.isDir()) {
>>           continue;
>>         }
>>
>>         // One matrix-N output file per "part" input file, so a downstream job
>>         // (e.g. CVB) gets one mapper per file instead of one for the whole matrix.
>>         Path inPath = new Path(f.getPath().toUri().toString());
>>         Path matrixPath = new Path(outputPath, "matrix-" + matrixCount);
>>         SequenceFile.Writer matrixWriter = SequenceFile.createWriter(fs, conf, matrixPath,
>>             IntWritable.class, VectorWritable.class);
>>
>>         for (Pair<Text,VectorWritable> record :
>>             new SequenceFileDirIterable<Text,VectorWritable>(
>>                 inPath, PathType.LIST, PathFilters.logsCRCFilter(), null, true, conf)) {
>>           VectorWritable value = record.getSecond();
>>           docId.set(i);
>>           indexWriter.append(docId, record.getFirst());
>>           matrixWriter.append(docId, value);
>>           i++;
>>           numCols = value.get().size();
>>         }
>>
>>         Closeables.closeQuietly(matrixWriter);
>>         matrixCount++;
>>       }
>>
>>       log.info("Wrote out matrix with {} rows and {} columns to {}",
>>                new Object[] { i, numCols, outputPath });
>>       return 0;
>>     } finally {
>>       Closeables.closeQuietly(indexWriter);
>>     }
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>>     ToolRunner.run(new RowIdJobDistributed(), args);
>>   }
>> }
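>>
>> A sketch of how you'd run it (the jar name and paths are illustrative;
>> -i/-o come from AbstractJob's standard input/output options):
>>
>>   hadoop jar my-rowid-mod.jar home.mahout.RowIdJobDistributed \
>>     -i seq2sparse-out/tfidf-vectors \
>>     -o rowid-out
>>
>> Each part-* file under the input directory then becomes its own matrix-N
>> file under rowid-out, alongside the single docIndex file.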
>>
>>
>>
>> ________________________________
>> From: Suneel Marthi <suneel_mar...@yahoo.com>
>> To: "user@mahout.apache.org" <user@mahout.apache.org>
>> Sent: Monday, June 4, 2012 1:39 AM
>> Subject: Questions on RowId job
>>
>>
>> 3. An issue was brought up on these forums sometime last week to which
>> Jake's response was that a patch enabling the RowId job to be configured
>> to run in distributed mode with many mappers would be welcome. I agree
>> that this is a useful change. It also means that the RowSimilarity job,
>> which takes as inputs the matrix and the number of columns generated by
>> the RowId job, needs to be tested to ensure that nothing's broken.
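>>
>> For reference, a sketch of how rowsimilarity consumes those outputs (flags
>> as in the stock Mahout driver; the paths and column count are illustrative):
>>
>>   mahout rowsimilarity \
>>     -i rowid-out/matrix \
>>     -o similarity-out \
>>     --numberOfColumns 12345 \
>>     --similarityClassname SIMILARITY_COSINE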
>>
>> Should I go ahead and open a Jira for the above issues?
>>
>>
>> Regards,
>> Suneel
>>
>
