[ 
https://issues.apache.org/jira/browse/MAHOUT-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134174#comment-14134174
 ] 

ASF GitHub Bot commented on MAHOUT-1615:
----------------------------------------

Github user dlyubimov commented on the pull request:

    https://github.com/apache/mahout/pull/52#issuecomment-55627685
  
    Well, there are three solutions to the problem I see. One is easy, another 
is pretty easy but perhaps more expensive operationally.
    
    First, let's clarify what is going on there. 
    
    (1) since the key type is having a ContextTag as context bound: DrmLike 
type is defined as DrmLike[K:ClassTag], it means that returning 
CheckpointedDrm[_] (which is subtype) from drmFromHDFS in fact implicitly means 
returning two things: CheckpointedDrm and evidence:ContextTag[K]. The evidence 
is essentially a run time class information (including all generics hierarchies 
possibly attached to it).
    
    Thinking is, since key class name is stored in the sequence file headers, 
it makes sense to expect for this routine to read out the header and infer 
ContextTag[K] on its own. As comment indicates there, it actually expected 
Spark to do that. But simple examination of sequenceFile() reveals that Spark 
actually does not do that. Instead, it is just propagates the evidence it 
already got when this method is called, and doesn't read any headers (or 
anything for that matter) until actual execution action occurs. As a result, 
intended conversion from writable to its wrapped type (String, Int or Long) 
does not really happen as it stands; instead, Writable is just propagated up 
the food chain, and we of course already know that writable instances are 
reused and would not make it thru strict Scala collections. 
    
    So, based on that, there are two ways to address that. 
    
    (1) One, simple way, is to escape this problem the same way Spark has 
escaped it: namely, require actual key type evidence from the user. That would 
require changing method signature to 
    
       def drmFromHDFS[K:ClassTag] (path: String, parMin:Int = 0)(implicit sc: 
DistributedContext): CheckpointedDrm[K] 
    
    
    Then you'd have to implicitly supply evidence by doing something like this:
    
        val drmA:DrmLike[String] = drmFromHdfs(...)
    
    I don't like this too much. 
    
    First, you have to remember to explicitly give it a type like above instead 
of implicit type inference which is expected in Scala 
    
         val drmA=...
    
    Another reason i don't like it is that you potentially are supplying a 
redundant information (since this information is already available form 
sequence file headers). Not only that, you _have_ to be right, or your delayed 
execution will fail in the tasks (so it is late error catching, which is fairly 
hairy for most users, so i'd expect a wall of questions of the type "why my 
tasks fail" for this very basic operation. 
    
    So, not so user-friendly (although those who know will cope). 
    
    (2) Another way to fix it is to keep current method contract, and fill in 
this functionality that I believe is missing in Spark, namely, examine input 
headers and load in key class type and thus infer proper ClassTag[_]. 
    
    There practically no downsides to this, except for one. Up until now we 
pretty much avoided direct dependencies on any Hadoop libraries. Spark goes to 
great length (mostly, reflection and pluggable Strategies based on Hadoop 
version) in order to retain compatibility with various HDFS flavors and 
versions. I just don't want to join this game. Very much so.
    
    (3) Another way is similar to (2) in the sense that we keep the signature 
and idea of filling the gap of inferring ClassTag[_] ourselves, but we do it by 
examining first Writeable key instance in the dataset. 
    
    This is even better than (2) since it keeps us from dealing with HDFS apis 
directly. But obvious downside is that we probably now will have to load the 
entire partition in spark instead of just reading sequence file header. So much 
for delayed action and memory allocation. 
    
    
    ---- 
    my personal preference is probably going with (2) while exploring (3) in 
terms of how bad this practice actually may get. Of course (1) is also an 
option since it is the easiest to implement.


> SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for 
> Text-Keyed SequenceFiles
> -------------------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1615
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1615
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>             Fix For: 1.0
>
>
> When reading in seq2sparse output from HDFS in the spark-shell of form 
> <Text,VectorWriteable>  SparkEngine's drmFromHDFS method is creating rdds 
> with the same Key for all Pairs:  
> {code}
> mahout> val drmTFIDF= drmFromHDFS( path = 
> "/tmp/mahout-work-andy/20news-test-vectors/part-r-00000")
> {code}
> Has keys:
> {...} 
>     key: /talk.religion.misc/84570
>     key: /talk.religion.misc/84570
>     key: /talk.religion.misc/84570
> {...}
> for the entire set.  This is the last Key in the set.
> The problem can be traced to the first line of drmFromHDFS(...) in 
> SparkEngine.scala: 
> {code}
>  val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], 
> minPartitions = parMin)
>         // Get rid of VectorWritable
>         .map(t => (t._1, t._2.get()))
> {code}
> which gives the same key for all t._1.
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to