[
https://issues.apache.org/jira/browse/MAHOUT-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14145908#comment-14145908
]
ASF GitHub Bot commented on MAHOUT-1615:
----------------------------------------
Github user dlyubimov commented on a diff in the pull request:
https://github.com/apache/mahout/pull/52#discussion_r17954424
--- Diff:
spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---
@@ -127,33 +131,41 @@ object SparkEngine extends DistributedEngine {
*/
def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc:
DistributedContext): CheckpointedDrm[_] = {
- val rdd = sc.sequenceFile(path, classOf[Writable],
classOf[VectorWritable], minPartitions = parMin)
- // Get rid of VectorWritable
- .map(t => (t._1, t._2.get()))
+ // HDFS Paramaters
+ val hConf= new Configuration()
+ val hPath= new Path(path)
+ val fs= FileSystem.get(hConf)
- def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) =
implicitly[ClassTag[K]]
+ /** Get the Key Class For the Sequence File */
+ def getKeyClassTag[K:ClassTag] = ClassTag(new SequenceFile.Reader(fs,
hPath, hConf).getKeyClass)
+ /** Get the Value Class For the Sequence File */
+// def getValueClassTag[V:ClassTag] = ClassTag(new
SequenceFile.Reader(fs, hPath, hConf).getValueClass)
- // Spark should've loaded the type info from the header, right?
- val keyTag = getKeyClassTag(rdd)
+ // Spark doesn't check the Sequence File Header so we have to.
+ val keyTag = getKeyClassTag
+// val ct= ClassTag(keyTag.getClass)
+
+ // ClassTag to match on not lost by erasure
+ val ct= ClassTag(classOf[Writable])
val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match {
- case xx: ClassTag[Writable] if (xx ==
implicitly[ClassTag[IntWritable]]) => (
+ case ct if (keyTag == implicitly[ClassTag[IntWritable]]) => (
(v: AnyRef) => v.asInstanceOf[IntWritable].get,
- (x: Any) => new IntWritable(x.asInstanceOf[Int]),
+ (x: Any) => new Integer(x.asInstanceOf[IntWritable].get),
implicitly[ClassTag[Int]])
- case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) =>
(
+ case ct if (keyTag == implicitly[ClassTag[Text]]) => (
(v: AnyRef) => v.asInstanceOf[Text].toString,
(x: Any) => new Text(x.toString),
implicitly[ClassTag[String]])
- case xx: ClassTag[Writable] if (xx ==
implicitly[ClassTag[LongWritable]]) => (
+ case ct if (keyTag == implicitly[ClassTag[LongWritable]]) => (
(v: AnyRef) => v.asInstanceOf[LongWritable].get,
- (x: Any) => new LongWritable(x.asInstanceOf[Int]),
+ (x: Any) => new LongWritable(x.asInstanceOf[LongWritable].get),
--- End diff --
this can be removed entirely. use key2val instead.
> SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for
> Text-Keyed SequenceFiles
> -------------------------------------------------------------------------------------------------
>
> Key: MAHOUT-1615
> URL: https://issues.apache.org/jira/browse/MAHOUT-1615
> Project: Mahout
> Issue Type: Bug
> Reporter: Andrew Palumbo
> Fix For: 1.0
>
>
> When reading in seq2sparse output from HDFS in the spark-shell of form
> <Text,VectorWriteable> SparkEngine's drmFromHDFS method is creating rdds
> with the same Key for all Pairs:
> {code}
> mahout> val drmTFIDF= drmFromHDFS( path =
> "/tmp/mahout-work-andy/20news-test-vectors/part-r-00000")
> {code}
> Has keys:
> {...}
> key: /talk.religion.misc/84570
> key: /talk.religion.misc/84570
> key: /talk.religion.misc/84570
> {...}
> for the entire set. This is the last Key in the set.
> The problem can be traced to the first line of drmFromHDFS(...) in
> SparkEngine.scala:
> {code}
> val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable],
> minPartitions = parMin)
> // Get rid of VectorWritable
> .map(t => (t._1, t._2.get()))
> {code}
> which gives the same key for all t._1.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)