Hi Anna,

When I wrote that mod, it was specifically so that rowid would output multiple files (instead of one matrix file). This was done so the LDA clustering I was using would run multiple mappers. The rowid code (including the mod) still just runs locally.

I have not done much with it lately, but I had modified the code below to support another parameter (-m) that lets you split the data up even further by a maximum number of vectors per output file, so that lots of mappers can run for a program like LDA. Setting -m to -1 makes rowid work the original way (one output matrix file).

Dan
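The -m behavior Dan describes (cap each output file at a maximum number of vectors, with -1 meaning "no split") can be sketched in plain Java without any Hadoop dependencies. The class and method names here are illustrative, not Dan's actual code; the real mod would apply this chunking while writing SequenceFile matrix parts:

```java
import java.util.ArrayList;
import java.util.List;

public class VectorSplitSketch {

  /**
   * Partition row ids into output "files" (chunks) of at most
   * maxVectorsPerFile entries each. A value of -1 disables splitting,
   * mirroring rowid's original single-matrix-file behavior.
   */
  static List<List<Integer>> split(List<Integer> rowIds, int maxVectorsPerFile) {
    List<List<Integer>> chunks = new ArrayList<>();
    if (maxVectorsPerFile == -1) {
      chunks.add(new ArrayList<>(rowIds)); // one output matrix file
      return chunks;
    }
    List<Integer> current = new ArrayList<>();
    for (int id : rowIds) {
      if (current.size() == maxVectorsPerFile) {
        chunks.add(current);               // "close" this output file
        current = new ArrayList<>();       // and start the next one
      }
      current.add(id);
    }
    if (!current.isEmpty()) {
      chunks.add(current);                 // flush the last partial file
    }
    return chunks;
  }

  public static void main(String[] args) {
    List<Integer> ids = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      ids.add(i);
    }
    // With -m 4, ten vectors land in three files: 4 + 4 + 2.
    System.out.println(split(ids, 4).size());   // prints 3
    // With -m -1, everything stays in one matrix file.
    System.out.println(split(ids, -1).size());  // prints 1
  }
}
```

More output files means more input splits, hence more mappers for a downstream job like LDA, which is the whole point of the parameter.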
________________________________
From: Anna Lahoud <annalah...@gmail.com>
To: user@mahout.apache.org
Sent: Friday, August 17, 2012 1:09 PM
Subject: Re: More mappers in RowId

I have spent some time trying out the RowID mod that Dan posted, but I am not seeing any mappers run to produce the output. It does produce multiple outputs, but the work appears to be done locally. Is there another piece to the puzzle that I need in order to distribute this job to the Hadoop nodes as a map task?

On Tue, Jun 5, 2012 at 11:13 AM, Pat Ferrel <p...@occamsmachete.com> wrote:
> RowID+RowSimilarity are by far the slowest of the steps in my particular
> pipeline, which includes crawl, seq2sparse, and cluster.
>
> What is the status of more parallelization of these? Dan offers a mod. Has
> this been tested with rowsimilarity?
>
> I'd be happy to give it a try but am having trouble with clustering in the
> trunk right now.
>
> On 6/4/12 4:20 AM, DAN HELM wrote:
>
>> I also developed a quick and dirty rowid mod so that it generates one
>> matrix output file for each "part" input file from the seq2sparse command,
>> so multiple mappers are used to process output from rowid (such as when
>> running CVB clustering on rowid output). For that I just made a derivative
>> rowid command called rowiddist. The final solution should be this logic
>> rolled into a single rowid command, where the run type (multiple-file
>> output vs. a single matrix file) is driven by parameters. The code below
>> was just a quick "solution".
>> **** code follows ****
>>
>> package home.mahout;
>>
>> import com.google.common.io.Closeables;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FileStatus;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.IntWritable;
>> import org.apache.hadoop.io.SequenceFile;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.util.ToolRunner;
>> import org.apache.mahout.common.AbstractJob;
>> import org.apache.mahout.common.Pair;
>> import org.apache.mahout.common.iterator.sequencefile.PathFilters;
>> import org.apache.mahout.common.iterator.sequencefile.PathType;
>> import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
>> import org.apache.mahout.math.VectorWritable;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.util.Map;
>>
>> public class RowIdJobDistributed extends AbstractJob {
>>
>>   private static final Logger log = LoggerFactory.getLogger(RowIdJobDistributed.class);
>>
>>   @Override
>>   public int run(String[] args) throws Exception {
>>     addInputOption();
>>     addOutputOption();
>>     Map<String,String> parsedArgs = parseArguments(args);
>>     if (parsedArgs == null) {
>>       return -1;
>>     }
>>
>>     Configuration conf = getConf();
>>     FileSystem fs = FileSystem.get(conf);
>>     Path outputPath = getOutputPath();
>>     Path indexPath = new Path(outputPath, "docIndex");
>>     SequenceFile.Writer indexWriter =
>>         SequenceFile.createWriter(fs, conf, indexPath, IntWritable.class, Text.class);
>>     FileStatus[] fstatus = fs.listStatus(getInputPath());
>>     try {
>>       IntWritable docId = new IntWritable();
>>       int i = 0;
>>       int numCols = 0;
>>       int matrixCount = 0;
>>       for (FileStatus f : fstatus) {
>>         if (f.isDir()) {
>>           continue;
>>         }
>>         // One matrix output file per input "part" file.
>>         Path inPath = new Path(f.getPath().toUri().toString());
>>         Path matrixPath = new Path(outputPath, "matrix-" + matrixCount);
>>         SequenceFile.Writer matrixWriter =
>>             SequenceFile.createWriter(fs, conf, matrixPath,
>>                                       IntWritable.class, VectorWritable.class);
>>         for (Pair<Text,VectorWritable> record :
>>             new SequenceFileDirIterable<Text,VectorWritable>(inPath, PathType.LIST,
>>                                                              PathFilters.logsCRCFilter(),
>>                                                              null, true, conf)) {
>>           VectorWritable value = record.getSecond();
>>           docId.set(i);
>>           indexWriter.append(docId, record.getFirst());
>>           matrixWriter.append(docId, value);
>>           i++;
>>           numCols = value.get().size();
>>         }
>>         Closeables.closeQuietly(matrixWriter);
>>         matrixCount++;
>>       }
>>       log.info("Wrote out matrix with {} rows and {} columns to {}",
>>                new Object[] { i, numCols, outputPath });
>>       return 0;
>>     } finally {
>>       Closeables.closeQuietly(indexWriter);
>>     }
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>>     ToolRunner.run(new RowIdJobDistributed(), args);
>>   }
>> }
>>
>> ________________________________
>> From: Suneel Marthi <suneel_mar...@yahoo.com>
>> To: "user@mahout.apache.org" <user@mahout.apache.org>
>> Sent: Monday, June 4, 2012 1:39 AM
>> Subject: Questions on RowId job
>>
>> 3. There was an issue brought up on these forums sometime last week to
>> which Jake's response was that it would be a welcome patch to be able to
>> configure the RowId job to run in a distributed mode with many mappers. I
>> agree that this is a useful change. This change means that the
>> RowSimilarity job, which takes as inputs the matrix and the number of
>> columns generated by the RowId job, needs to be tested to ensure that
>> nothing's broken.
>>
>> Should I go ahead and open a Jira for the above issues?
>>
>> Regards,
>> Suneel