[ https://issues.apache.org/jira/browse/MAHOUT-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147373#comment-14147373 ]
ASF GitHub Bot commented on MAHOUT-1615:
----------------------------------------

Github user dlyubimov commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/52#discussion_r18014902

    --- Diff: spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---
    @@ -127,33 +131,41 @@ object SparkEngine extends DistributedEngine {
        */
       def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {

    -    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
    -        // Get rid of VectorWritable
    -        .map(t => (t._1, t._2.get()))
    +    // HDFS Paramaters
    +    val hConf= new Configuration()
    +    val hPath= new Path(path)
    +    val fs= FileSystem.get(hConf)

    -    def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) = implicitly[ClassTag[K]]
    +    /** Get the Key Class For the Sequence File */
    +    def getKeyClassTag[K:ClassTag] = ClassTag(new SequenceFile.Reader(fs, hPath, hConf).getKeyClass)
    +    /** Get the Value Class For the Sequence File */
    +//    def getValueClassTag[V:ClassTag] = ClassTag(new SequenceFile.Reader(fs, hPath, hConf).getValueClass)

    -    // Spark should've loaded the type info from the header, right?
    -    val keyTag = getKeyClassTag(rdd)
    +    // Spark doesn't check the Sequence File Header so we have to.
    +    val keyTag = getKeyClassTag
    +//    val ct= ClassTag(keyTag.getClass)
    +
    +    // ClassTag to match on not lost by erasure
    +    val ct= ClassTag(classOf[Writable])

         val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match {

    -      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[IntWritable]]) => (
    +      case ct if (keyTag == implicitly[ClassTag[IntWritable]]) => (
               (v: AnyRef) => v.asInstanceOf[IntWritable].get,
    -          (x: Any) => new IntWritable(x.asInstanceOf[Int]),
    +          (x: Any) => new Integer(x.asInstanceOf[IntWritable].get),
               implicitly[ClassTag[Int]])

    -      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) => (
    +      case ct if (keyTag == implicitly[ClassTag[Text]]) => (
               (v: AnyRef) => v.asInstanceOf[Text].toString,
               (x: Any) => new Text(x.toString),
               implicitly[ClassTag[String]])

    -      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[LongWritable]]) => (
    +      case ct if (keyTag == implicitly[ClassTag[LongWritable]]) => (
               (v: AnyRef) => v.asInstanceOf[LongWritable].get,
    -          (x: Any) => new LongWritable(x.asInstanceOf[Int]),
    +          (x: Any) => new LongWritable(x.asInstanceOf[LongWritable].get),
    --- End diff --

    Perhaps naming is to blame. key2val was meant to be the transformation
    from the file key (i.e. the Writable) to an actual non-reused type such
    as Int. val2key was meant to be the inverse (and in the non-edited code
    it is), but it is not used in the context of this method and therefore
    should be omitted, i.e. it should simply be

        val (key2val, exactTag) = .... match ...

    On Wed, Sep 24, 2014 at 2:23 PM, Andrew Palumbo <notificati...@github.com>
    wrote:

    > In spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala:
    >
    > >          (v: AnyRef) => v.asInstanceOf[LongWritable].get,
    > > -        (x: Any) => new LongWritable(x.asInstanceOf[Int]),
    > > +        (x: Any) => new LongWritable(x.asInstanceOf[LongWritable].get),
    >
    > I'm not sure here; we need to remove one of the functions, but I think
    > we should be using val2key here, not key2val, correct?
    >
    > key2val(v: AnyRef) => v.asInstanceOf[IntWritable].get
    > val2key(x: Any) => new Integer(x.asInstanceOf[IntWritable].get)
    >
    > so later when we map to the RDD:
    >
    > val drmRdd = rdd.map { t => val2keyFunc(t._1) -> t._2.get()}
    >
    > they will be of form [Integer][Vector] rather than [IntWritable][Vector]
    >
    > --
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/mahout/pull/52/files#r18001536>.
    >

> SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for Text-Keyed SequenceFiles
> -------------------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1615
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1615
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>             Fix For: 1.0
>
>
> When reading in seq2sparse output from HDFS in the spark-shell, of form
> <Text,VectorWritable>, SparkEngine's drmFromHDFS method creates RDDs
> with the same Key for all Pairs:
> {code}
> mahout> val drmTFIDF= drmFromHDFS( path =
> "/tmp/mahout-work-andy/20news-test-vectors/part-r-00000")
> {code}
> Has keys:
> {...}
>     key: /talk.religion.misc/84570
>     key: /talk.religion.misc/84570
>     key: /talk.religion.misc/84570
> {...}
> for the entire set. This is the last Key in the set.
> The problem can be traced to the first line of drmFromHDFS(...) in
> SparkEngine.scala:
> {code}
>  val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable],
> minPartitions = parMin)
>         // Get rid of VectorWritable
>         .map(t => (t._1, t._2.get()))
> {code}
> which gives the same key for all t._1.
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
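For readers of this thread: the repeated-key symptom in the report is consistent with a reader that hands back one reused key object for every record, which is what the "non-reused type" remark in the review comment points to. The sketch below reproduces that effect and shows the two-element (key2val, exactTag) shape suggested in the comment. It is self-contained and makes several assumptions: FakeText, FakeIntWritable, unwrapperFor and reusedKeys are hypothetical stand-ins invented for illustration, not Hadoop, Spark or Mahout APIs, and this is not the code from the pull request.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical stand-ins for mutable, reusable key holders (NOT Hadoop classes).
final class FakeText(var value: String) { override def toString: String = value }
final class FakeIntWritable(var value: Int) { def get: Int = value }

object KeyUnwrapSketch {

  // Choose the unwrapping function and the tag of the unwrapped key type
  // from the key class, returning only the pair that is actually needed.
  def unwrapperFor(keyClass: Class[_]): (AnyRef => Any, ClassTag[_]) =
    if (keyClass == classOf[FakeIntWritable])
      ((k: AnyRef) => k.asInstanceOf[FakeIntWritable].get, classTag[Int])
    else if (keyClass == classOf[FakeText])
      ((k: AnyRef) => k.toString, classTag[String])
    else
      throw new IllegalArgumentException(s"Unsupported key class: $keyClass")

  // Simulates a reader that overwrites and returns the same key instance
  // for every record (an assumption about the cause of the reported bug).
  def reusedKeys(names: Seq[String]): Iterator[FakeText] = {
    val shared = new FakeText("")
    names.iterator.map { n => shared.value = n; shared }
  }

  def main(args: Array[String]): Unit = {
    val names = Seq("/a/1", "/b/2", "/c/3")
    val (key2val, exactTag) = unwrapperFor(classOf[FakeText])

    // Holding references to the reused object: every entry ends up
    // showing the last key that was read.
    val buggy = reusedKeys(names).toList.map(_.toString)

    // Unwrapping to an immutable value as each record is read keeps
    // the keys distinct.
    val fixed = reusedKeys(names).map(k => key2val(k)).toList

    println(buggy)    // List(/c/3, /c/3, /c/3)
    println(fixed)    // List(/a/1, /b/2, /c/3)
    println(exactTag) // tag of the unwrapped key type
  }
}
```

The point of the design is that the conversion to a plain value happens before anything else can retain a reference to the shared holder. The inverse (val2key) direction plays no part in reading, which matches the suggestion to drop it from this method.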