[ https://issues.apache.org/jira/browse/SPARK-20486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-20486:
---------------------------------
    Labels: bulk-closed  (was: )

> Encapsulate ALS in-block and out-block data structures and methods into a separate class
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-20486
>                 URL: https://issues.apache.org/jira/browse/SPARK-20486
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML, MLlib
>    Affects Versions: 2.1.0
>            Reporter: Daniel Li
>            Priority: Trivial
>              Labels: bulk-closed
>
> The in-block and out-block data structures in the ALS code are currently
> computed within the {{ALS.train}} method itself. I propose moving this
> code, along with its helper functions, into a separate class that
> encapsulates the creation of the blocks. This has the added benefit of
> letting us attach a comprehensive Scaladoc to the new class that explains
> in detail how this core part of the algorithm works.
> Proposal:
> {code}
> private[recommendation] final case class RatingBlocks[ID](
>   userIn: RDD[(Int, InBlock[ID])],
>   userOut: RDD[(Int, OutBlock)],
>   itemIn: RDD[(Int, InBlock[ID])],
>   itemOut: RDD[(Int, OutBlock)]
> )
>
> private[recommendation] object RatingBlocks {
>   def create[ID: ClassTag: Ordering](
>       ratings: RDD[Rating[ID]],
>       numUserBlocks: Int,
>       numItemBlocks: Int,
>       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RatingBlocks[ID] = {
>     val userPart = new ALSPartitioner(numUserBlocks)
>     val itemPart = new ALSPartitioner(numItemBlocks)
>
>     val blockRatings =
>       partitionRatings(ratings, userPart, itemPart)
>         .persist(storageLevel)
>
>     val (userInBlocks, userOutBlocks) =
>       makeBlocks("user", blockRatings, userPart, itemPart, storageLevel)
>     userOutBlocks.count()  // materialize `blockRatings` and user blocks
>
>     val swappedBlockRatings = blockRatings.map {
>       case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
>         ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
>     }
>
>     val (itemInBlocks, itemOutBlocks) =
>       makeBlocks("item", swappedBlockRatings, itemPart, userPart, storageLevel)
>     itemOutBlocks.count()  // materialize item blocks
>
>     blockRatings.unpersist()
>
>     new RatingBlocks(userInBlocks, userOutBlocks, itemInBlocks, itemOutBlocks)
>   }
>
>   private[this] def partitionRatings[ID: ClassTag](...) = {
>     // existing code goes here verbatim
>   }
>
>   private[this] def makeBlocks[ID: ClassTag](...) = {
>     // existing code goes here verbatim
>   }
> }
> {code}
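The proposal leaves `partitionRatings` and `makeBlocks` elided, so the following is only a dependency-free sketch of the kind of explanation the proposed Scaladoc could give, under stated assumptions. It mirrors the three steps in `RatingBlocks.create` (group ratings by block pair, build user in/out-blocks, swap roles and build item in/out-blocks) using plain Scala collections with `Int` IDs, no RDDs, and no attempt at the memory layout of the real `InBlock`/`OutBlock`. All names in it (`Rating`, `SimpleInBlock`, `SimpleOutBlock`, `blockOf`, `makeBlocks`, `ratingBlocks`, `resolveItem`) are hypothetical simplifications, not Spark's private API.

```scala
// Dependency-free sketch of ALS in-block/out-block construction.
// Every name below is hypothetical and simplified; this is NOT Spark's private API.
object AlsBlocksSketch {
  final case class Rating(user: Int, item: Int, rating: Float)

  // In-block: the sorted source IDs owned by one block and, for each of them,
  // its (dstBlockId, dstLocalIndex, rating) links.
  final case class SimpleInBlock(srcIds: Vector[Int], links: Vector[Vector[(Int, Int, Float)]])

  // Out-block: indexed by destination block; each entry lists the local indices
  // (into srcIds) of the source IDs whose factors that destination block needs.
  type SimpleOutBlock = Vector[Vector[Int]]

  // Non-negative modulo, in the spirit of a hash partitioner over Int IDs.
  def blockOf(id: Int, numBlocks: Int): Int = {
    val m = id % numBlocks
    if (m < 0) m + numBlocks else m
  }

  // Builds in-blocks and out-blocks for the "source" side of (srcId, dstId, rating) triples.
  def makeBlocks(
      triples: Seq[(Int, Int, Float)],
      numSrcBlocks: Int,
      numDstBlocks: Int): (Map[Int, SimpleInBlock], Map[Int, SimpleOutBlock]) = {
    // Group ratings by (srcBlockId, dstBlockId), the role partitionRatings plays in the proposal.
    val byPair = triples.groupBy { case (s, d, _) =>
      (blockOf(s, numSrcBlocks), blockOf(d, numDstBlocks))
    }
    // Within one block pair, a dst ID's local index is its rank among the sorted distinct dst IDs.
    val dstLocal: Map[(Int, Int), Map[Int, Int]] = byPair.map { case (pair, ts) =>
      pair -> ts.map(_._2).distinct.sorted.zipWithIndex.toMap
    }
    val inBlocks: Map[Int, SimpleInBlock] =
      triples.groupBy(t => blockOf(t._1, numSrcBlocks)).map { case (srcBlock, ts) =>
        val srcIds = ts.map(_._1).distinct.sorted.toVector
        val links = srcIds.map { s =>
          ts.filter(_._1 == s).map { case (_, d, r) =>
            val dstBlock = blockOf(d, numDstBlocks)
            (dstBlock, dstLocal((srcBlock, dstBlock))(d), r)
          }.sortBy(l => (l._1, l._2)).toVector
        }
        srcBlock -> SimpleInBlock(srcIds, links)
      }
    // A source ID is "active" for a dst block if it has at least one rating there.
    val outBlocks: Map[Int, SimpleOutBlock] = inBlocks.map { case (srcBlock, blk) =>
      srcBlock -> Vector.tabulate(numDstBlocks) { dstBlock =>
        blk.links.indices.filter(i => blk.links(i).exists(_._1 == dstBlock)).toVector
      }
    }
    (inBlocks, outBlocks)
  }

  // Builds the user blocks, then swaps (user, item) roles to build the item blocks.
  def ratingBlocks(ratings: Seq[Rating], numUserBlocks: Int, numItemBlocks: Int) = {
    val (userIn, userOut) =
      makeBlocks(ratings.map(r => (r.user, r.item, r.rating)), numUserBlocks, numItemBlocks)
    val (itemIn, itemOut) =
      makeBlocks(ratings.map(r => (r.item, r.user, r.rating)), numItemBlocks, numUserBlocks)
    (userIn, userOut, itemIn, itemOut)
  }

  // Recovers the item ID a user link points at: the k-th factor that item block b
  // sends to user block u belongs to itemIn(b).srcIds(itemOut(b)(u)(k)).
  def resolveItem(
      itemIn: Map[Int, SimpleInBlock],
      itemOut: Map[Int, SimpleOutBlock],
      userBlock: Int,
      link: (Int, Int, Float)): Int = {
    val (itemBlock, localIndex, _) = link
    itemIn(itemBlock).srcIds(itemOut(itemBlock)(userBlock)(localIndex))
  }

  def main(args: Array[String]): Unit = {
    val ratings = Seq(
      Rating(0, 10, 4.0f), Rating(0, 12, 3.0f), Rating(1, 12, 5.0f), Rating(1, 13, 2.0f),
      Rating(2, 11, 1.0f), Rating(2, 13, 4.0f), Rating(3, 10, 2.0f))
    val (userIn, userOut, itemIn, itemOut) = ratingBlocks(ratings, 2, 2)
    println(s"user in-blocks:  $userIn")
    println(s"user out-blocks: $userOut")
    println(s"item in-blocks:  $itemIn")
    println(s"item out-blocks: $itemOut")
  }
}
```

The `resolveItem` helper shows why ordering matters in this sketch: because each in-block's source IDs are sorted and each out-block lists local indices in ascending order, the k-th factor that item block b ships to user block u belongs to the k-th smallest item ID that block u rates in block b, which is exactly the local index the user side assigned to it. No global ID lookup is needed during a solve step.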