[ 
https://issues.apache.org/jira/browse/MAHOUT-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147373#comment-14147373
 ] 

ASF GitHub Bot commented on MAHOUT-1615:
----------------------------------------

Github user dlyubimov commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/52#discussion_r18014902
  
    --- Diff: spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---
    @@ -127,33 +131,41 @@ object SparkEngine extends DistributedEngine {
        */
       def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
     
    -    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
    -        // Get rid of VectorWritable
    -        .map(t => (t._1, t._2.get()))
    +    // HDFS Paramaters
    +    val hConf= new Configuration()
    +    val hPath= new Path(path)
    +    val fs= FileSystem.get(hConf)
     
    -    def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) = implicitly[ClassTag[K]]
    +    /** Get the Key Class For the Sequence File */
    +    def getKeyClassTag[K:ClassTag] = ClassTag(new SequenceFile.Reader(fs, hPath, hConf).getKeyClass)
    +    /** Get the Value Class For the Sequence File */
    +//    def getValueClassTag[V:ClassTag] = ClassTag(new SequenceFile.Reader(fs, hPath, hConf).getValueClass)
     
    -    // Spark should've loaded the type info from the header, right?
    -    val keyTag = getKeyClassTag(rdd)
    +    // Spark doesn't check the Sequence File Header so we have to.
    +    val keyTag = getKeyClassTag
    +//    val ct= ClassTag(keyTag.getClass)
    +
    +    // ClassTag to match on not lost by erasure
    +    val ct= ClassTag(classOf[Writable])
     
         val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match {
     
    -      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[IntWritable]]) => (
    +      case ct if (keyTag == implicitly[ClassTag[IntWritable]]) => (
               (v: AnyRef) => v.asInstanceOf[IntWritable].get,
    -          (x: Any) => new IntWritable(x.asInstanceOf[Int]),
    +          (x: Any) => new Integer(x.asInstanceOf[IntWritable].get),
               implicitly[ClassTag[Int]])
     
    -      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) => (
    +      case ct if (keyTag == implicitly[ClassTag[Text]]) => (
               (v: AnyRef) => v.asInstanceOf[Text].toString,
               (x: Any) => new Text(x.toString),
               implicitly[ClassTag[String]])
     
    -      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[LongWritable]]) => (
    +      case ct if (keyTag == implicitly[ClassTag[LongWritable]]) => (
               (v: AnyRef) => v.asInstanceOf[LongWritable].get,
    -          (x: Any) => new LongWritable(x.asInstanceOf[Int]),
    +          (x: Any) => new LongWritable(x.asInstanceOf[LongWritable].get),
    --- End diff --
    
    Perhaps naming is to blame. key2val was meant to be the transformation from
    the file key (i.e. a Writable) to an actual non-reused type such as Int,
    etc. val2key was meant to be the inverse (and in the non-edited code it
    is), but it is not used in the context of this method and therefore should
    be omitted, i.e. it should simply be
    
           val (key2val, exactTag) = .... match ...
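
    A minimal sketch of that shape, reusing keyTag from the diff above (the
    names and the error branch are illustrative, not part of the patch):

        // Assumes: import scala.reflect.ClassTag and
        // import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable};
        // keyTag is the ClassTag recovered from the sequence file header, as above.
        val (key2val, exactTag): (Writable => Any, ClassTag[_]) = keyTag match {
          case xx if xx == implicitly[ClassTag[IntWritable]] =>
            ((w: Writable) => w.asInstanceOf[IntWritable].get, implicitly[ClassTag[Int]])
          case xx if xx == implicitly[ClassTag[Text]] =>
            ((w: Writable) => w.asInstanceOf[Text].toString, implicitly[ClassTag[String]])
          case xx if xx == implicitly[ClassTag[LongWritable]] =>
            ((w: Writable) => w.asInstanceOf[LongWritable].get, implicitly[ClassTag[Long]])
          case _ => throw new IllegalArgumentException("Unsupported key type: " + keyTag)
        }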
    
    
    
    
    On Wed, Sep 24, 2014 at 2:23 PM, Andrew Palumbo <notificati...@github.com>
    wrote:
    
    > In spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala:
    >
    > >            (v: AnyRef) => v.asInstanceOf[LongWritable].get,
    > > -          (x: Any) => new LongWritable(x.asInstanceOf[Int]),
    > > +          (x: Any) => new LongWritable(x.asInstanceOf[LongWritable].get),
    >
    > I'm not sure here - we need to remove one of the functions then, but I
    > think we should be using val2key here, not key2val, correct?
    >
    >      key2val(v: AnyRef) => v.asInstanceOf[IntWritable].get
    >      val2key(x: Any) => new Integer(x.asInstanceOf[IntWritable].get)
    >
    > so later when we map to the RDD:
    >
    > val drmRdd = rdd.map { t => val2keyFunc(t._1) -> t._2.get()}
    >
    > they will be of the form [Integer][Vector] rather than [IntWritable][Vector]
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/mahout/pull/52/files#r18001536>.
    >
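
    For reference, a minimal sketch of how the single remaining conversion
    could then be applied when building the RDD. It assumes, as in the quoted
    snippet, that rdd is the raw RDD[(Writable, VectorWritable)] returned by
    sc.sequenceFile and that key2valFunc is the kept Writable-to-value
    function; this is illustrative only, not the patch itself:

        // Copy each (reused) Writable key out into an immutable value and drop
        // the VectorWritable wrapper in the same pass, e.g. Text keys become
        // plain Strings, so every row keeps its own key.
        val drmRdd = rdd.map { case (w, vw) => key2valFunc(w) -> vw.get() }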


> SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for 
> Text-Keyed SequenceFiles
> -------------------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1615
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1615
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>             Fix For: 1.0
>
>
> When reading seq2sparse output of the form <Text,VectorWritable> from HDFS in 
> the spark-shell, SparkEngine's drmFromHDFS method creates RDDs with the same 
> key for all pairs:
> {code}
> mahout> val drmTFIDF= drmFromHDFS( path = "/tmp/mahout-work-andy/20news-test-vectors/part-r-00000")
> {code}
> Has keys:
> {...} 
>     key: /talk.religion.misc/84570
>     key: /talk.religion.misc/84570
>     key: /talk.religion.misc/84570
> {...}
> for the entire set.  This is the last Key in the set.
> The problem can be traced to the first line of drmFromHDFS(...) in 
> SparkEngine.scala: 
> {code}
>  val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
>         // Get rid of VectorWritable
>         .map(t => (t._1, t._2.get()))
> {code}
> which gives the same key for all t._1.
>   
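
The reason every pair ends up with the same key is that Hadoop's SequenceFile
record reader reuses one Writable instance for all records, so an RDD built
directly on those objects holds many references to a single Text key (Spark's
sequenceFile/hadoopFile scaladoc notes the same caveat). A minimal sketch of
the kind of copy that avoids this for Text-keyed input, with illustrative
names and not the patch under review:

    // Materialize each reused Text key as an immutable String before the
    // record reader overwrites it, and unwrap the VectorWritable.
    val rdd = sc.sequenceFile(path, classOf[Text], classOf[VectorWritable], minPartitions = parMin)
      .map { case (k, v) => k.toString -> v.get() }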



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
