[ 
https://issues.apache.org/jira/browse/MAHOUT-923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13167443#comment-13167443
 ] 

jirapos...@reviews.apache.org commented on MAHOUT-923:
------------------------------------------------------



bq.  On 2011-12-12 02:10:01, Dmitriy Lyubimov wrote:
bq.  > Hm. I hope I did not misread the code or miss something. 
bq.  > 
bq.  > 1 -- I am not sure this will actually work as intended unless the # of 
reducers is coerced to 1, of which I see no mention in the code. 
bq.  > 2 -- mappers do nothing, passing all the row pressure on to the sort, 
which is absolutely not necessary, even if you use combiners. This is going to 
be especially the case if you coerce 1 reducer and no combiners. IMO the mean 
computation should be pushed up to the mappers to avoid the sort pressure of 
map reduce. Then the reduction becomes largely symbolic (but you do need to 
pass the # of rows each mapper has seen on to the reducer, in order for that 
operation to apply correctly).
bq.  > 3 -- I am not sure -- is NullWritable as a key legit? In my experience 
the sequence file reader cannot instantiate it, because NullWritable is a 
singleton and its creation is prohibited by making the constructor private.
bq.  
bq.  Raphael Cendrillon wrote:
bq.      Thanks Dmitriy.
bq.      
bq.      Regarding 1, if I understand correctly the number of reducers depends 
on the number of unique keys. Since all keys are set to the same value (null), 
then all of the mapper outputs should arrive at the same reducer. This seems to 
work in the unit test, but I may be missing something?
bq.      
bq.      Regarding 2, that makes a lot of sense. I'm wondering how many rows 
should be processed per mapper?  I guess there is a trade-off between 
scalability (processing more rows within a single map job means that each row 
must have fewer columns) and speed?  Is there someplace in the SSVD code where 
the matrix is split into slices of rows that I could use as a reference?
bq.      
bq.      Regarding 3, I believe NullWritable is OK. It's used pretty 
extensively in TimesSquaredJob in DistributedRowMatrix. However, if you feel 
there is some disadvantage to this I could replace "NullWritable.get()" with 
"new IntWritable(1)" (that is, set all of the keys to 1). Would that be more 
suitable?
bq.      
bq.
bq.  
bq.  Dmitriy Lyubimov wrote:
bq.      NullWritable objection is withdrawn. Apparently I haven't looked into 
Hadoop for too long; amazingly, it seems to work now.
bq.      
bq.      
bq.      1 -- I don't think your statement about # of reduce tasks is true. 
bq.      
bq.      The job (or, rather, the user) sets the number of reduce tasks via a 
config property. All users will follow the Hadoop recommendation to set that to 
95% of the capacity they want to take (usually the whole cluster). So in a 
production environment you are virtually _guaranteed_ to have a number of 
reducers of something like 75 on a 40-node cluster, and consequently 75 output 
files (unless users really want to read the details of your job and figure out 
you meant it to be just 1). 
bq.      Now, it is true that only one file will actually end up containing 
anything, and the rest of the task slots will just be occupied doing nothing. 
bq.      
bq.      So there are two problems with that scheme: a) a job that allocates 
so many task slots that do nothing is not a good citizen, since a real 
production cluster is always shared among multiple jobs; b) your code assumes 
the result will end up in partition 0, whereas contractually it may end up in 
any of the 75 files (in reality, with the default hash partitioner, key 1 will 
wind up in partition 0001 unless there's only one reducer, as I guess there was 
in your test). 
bq.      
bq.      2 -- it is simple. When you send n rows to reducers, they are 
shuffled and sorted. Sending massive sets to reducers has 2 effects. First, 
even if they all group under the same key, they are still sorted at a cost of 
~ n log (n/p), where p is the number of partitions, assuming a uniform 
distribution (which it is not, because you are sending everything to the same 
place). Just because we can run a distributed sort doesn't mean we should. 
Secondly, all these rows are physically moved to reduce tasks, which is still 
~n rows. Finally, what makes your case especially problematic is that you are 
sending everything to the same reducer, i.e. you are not actually doing the 
sort in a distributed way but rather as a simple single-threaded sort at the 
one reducer that happens to get all the input. 
bq.      
bq.      So that would allocate a lot of task slots that are not used, do a 
sort that is not needed, and do it in a single reducer thread for the entire 
input, which is not parallel at all. 
bq.      
bq.      Instead, consider this: the mapper keeps a state consisting of 
(sum(X), k). It keeps updating it with sum += x, k++ for every new row x. At 
the end of the cycle (in cleanup) it writes only 1 tuple (sum(X), k) as output. 
So we just reduced the complexity of the sort and IO from millions of elements 
to just the # of maps (which is perhaps just a handful, and in reality rarely 
exceeds 500 mappers). That is at least about 4 orders of magnitude. 
bq.      
bq.      Now, we send that handful of tuples to a single reducer and just do 
the combining (sum(X) += sum_i(X); n += k_i), where i indexes the tuples at the 
reducer. And because it is only a handful, the reducer also runs very quickly, 
so the fact that we coerced it to be 1 is pretty benign. The volume of anywhere 
between 1 and 500 vectors that it sums up doesn't warrant distributed 
computation. 
bq.      
bq.      But you have to make sure there's only 1 reducer no matter what the 
user puts into the config, and you have to make sure you do all the heavy 
lifting in the mappers.
bq.      
bq.      Finally, you don't even need to coerce to 1 reducer. You can still 
have several (but uniformly distributed) and do the final combine in the front 
end of the method. However, given the small size and triviality of the 
reduction processing, it is probably not warranted. Coercing to 1 reducer is OK 
in this case IMO.
bq.      
bq.      3 -- I guess any writable is OK except NullWritable. Maybe something 
has changed. I remember falling into that pitfall several generations of Hadoop 
ago. You can verify by staging a simple experiment: write a sequence file with 
NullWritable as either key or value and try to read it back. In my test long 
ago it would write OK but not read back. I believe a similar approach is used 
with keys in shuffle and sort. There is a reflection-based writable factory 
inside which tries to use the default constructor of the class to instantiate 
it, which is (was) not available for NullWritable.
bq.      
bq.      
bq.
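
To make point 1 b) above concrete, here is a minimal sketch, in plain Java with no Hadoop dependency, of the arithmetic that Hadoop's default HashPartitioner applies: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The class and method names (PartitionSketch, partitionFor) are hypothetical, and the key hash of 1 assumes an IntWritable(1) key, whose hashCode() is its int value.

```java
public class PartitionSketch {

    // Same arithmetic as Hadoop's default HashPartitioner.getPartition():
    // (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
    static int partitionFor(int keyHash, int numReduceTasks) {
        return (keyHash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Assumption: the constant key is IntWritable(1), so its hash is 1.
        int keyHash = 1;

        // One reducer, as in a local unit test: everything lands in partition 0.
        System.out.println(partitionFor(keyHash, 1));   // prints 0

        // 75 reducers, as in the cluster example above: partition 1, not 0.
        System.out.println(partitionFor(keyHash, 75));  // prints 1
    }
}
```

So code that reads only the first output file can pass with a single reducer and still miss the result once the reducer count comes from the cluster configuration.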

Thanks Dmitriy. I've updated the diff to push the row summation into the mapper 
as you suggested, force the number of reducers to 1, and make the final output 
key IntWritable. Could you please take a look?
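
For readers following along, the scheme discussed above (accumulate (sum, k) per mapper, emit one tuple in cleanup, merge the handful of tuples in a single reducer) can be sketched roughly as follows. This is a plain-Java simulation rather than the patch itself: the names RowMeanSketch, Partial, mapTask and rowMean are hypothetical, rows are plain double[] instead of Mahout vectors, and each "split" stands in for the input of one map task. In a real Hadoop job the reducer count would additionally be pinned, for example with Job.setNumReduceTasks(1).

```java
import java.util.Arrays;
import java.util.List;

public class RowMeanSketch {

    /** Partial result of one map task: column-wise sum plus number of rows seen. */
    static final class Partial {
        final double[] sum;
        long rows;

        Partial(int columns) {
            sum = new double[columns];
        }

        // Mapper side: called once per input row (sum += x, k++).
        void add(double[] row) {
            for (int j = 0; j < sum.length; j++) {
                sum[j] += row[j];
            }
            rows++;
        }

        // Reducer side: called once per mapper output tuple.
        void merge(Partial other) {
            for (int j = 0; j < sum.length; j++) {
                sum[j] += other.sum[j];
            }
            rows += other.rows;
        }

        // Column-wise mean; yields NaN entries if no rows were seen.
        double[] mean() {
            double[] m = new double[sum.length];
            for (int j = 0; j < sum.length; j++) {
                m[j] = sum[j] / rows;
            }
            return m;
        }
    }

    // Simulates one map task: the single returned tuple is what cleanup() would emit.
    static Partial mapTask(List<double[]> split, int columns) {
        Partial p = new Partial(columns);
        for (double[] row : split) {
            p.add(row);
        }
        return p;
    }

    // Simulates the single reducer: merges one tuple per map task.
    static double[] rowMean(List<List<double[]>> splits, int columns) {
        Partial total = new Partial(columns);
        for (List<double[]> split : splits) {
            total.merge(mapTask(split, columns));
        }
        return total.mean();
    }

    public static void main(String[] args) {
        List<double[]> splitA = Arrays.asList(new double[] {1, 2}, new double[] {3, 4});
        List<double[]> splitB = Arrays.asList(new double[] {5, 6});
        double[] mean = rowMean(Arrays.asList(splitA, splitB), 2);
        System.out.println(Arrays.toString(mean)); // prints [3.0, 4.0]
    }
}
```

Only one tuple per map task crosses the shuffle, so the sort and the reducer see a number of records equal to the number of mappers rather than the number of matrix rows.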


- Raphael


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3147/#review3838
-----------------------------------------------------------


On 2011-12-12 10:41:46, Raphael Cendrillon wrote:
bq.  
bq.  -----------------------------------------------------------
bq.  This is an automatically generated e-mail. To reply, visit:
bq.  https://reviews.apache.org/r/3147/
bq.  -----------------------------------------------------------
bq.  
bq.  (Updated 2011-12-12 10:41:46)
bq.  
bq.  
bq.  Review request for mahout.
bq.  
bq.  
bq.  Summary
bq.  -------
bq.  
bq.  Here's a patch with a simple job to calculate the row mean (column-wise 
mean). One outstanding issue is the combiner: this requires a writable class 
IntVectorTupleWritable, where the Int stores the number of rows, and the Vector 
stores the column-wise sum.
bq.  
bq.  
bq.  This addresses bug MAHOUT-923.
bq.      https://issues.apache.org/jira/browse/MAHOUT-923
bq.  
bq.  
bq.  Diffs
bq.  -----
bq.  
bq.    
/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
 1213095 
bq.    
/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixRowMeanJob.java 
PRE-CREATION 
bq.    
/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
 1213095 
bq.  
bq.  Diff: https://reviews.apache.org/r/3147/diff
bq.  
bq.  
bq.  Testing
bq.  -------
bq.  
bq.  Junit test
bq.  
bq.  
bq.  Thanks,
bq.  
bq.  Raphael
bq.  
bq.


                
> Row mean job for PCA
> --------------------
>
>                 Key: MAHOUT-923
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-923
>             Project: Mahout
>          Issue Type: Improvement
>          Components: Math
>    Affects Versions: 0.6
>            Reporter: Raphael Cendrillon
>            Assignee: Raphael Cendrillon
>             Fix For: Backlog
>
>         Attachments: MAHOUT-923.patch
>
>
> Add map reduce job for calculating mean row (column-wise mean) of a 
> Distributed Row Matrix for use in PCA.


        
