[ https://issues.apache.org/jira/browse/SPARK-20486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Daniel Li updated SPARK-20486:
------------------------------
Description:
The in-block and out-block data structures in the ALS code are currently calculated within the {{ALS.train}} method itself. I propose moving this code, along with its helper functions, into a separate class that encapsulates the creation of the blocks. This has the added benefit of letting us include comprehensive Scaladoc in the new class that explains in detail how this core part of the algorithm works.

Proposal:

{code}
private[recommendation] final case class RatingBlocks[ID](
  userIn: RDD[(Int, InBlock[ID])],
  userOut: RDD[(Int, OutBlock)],
  itemIn: RDD[(Int, InBlock[ID])],
  itemOut: RDD[(Int, OutBlock)]
)

private[recommendation] object RatingBlocks {
  def create[ID: ClassTag: Ordering](
      ratings: RDD[Rating[ID]],
      numUserBlocks: Int,
      numItemBlocks: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RatingBlocks[ID] = {

    val userPart = new ALSPartitioner(numUserBlocks)
    val itemPart = new ALSPartitioner(numItemBlocks)

    val blockRatings =
      partitionRatings(ratings, userPart, itemPart)
        .persist(storageLevel)

    val (userInBlocks, userOutBlocks) =
      makeBlocks("user", blockRatings, userPart, itemPart, storageLevel)

    userOutBlocks.count()  // materialize `blockRatings` and user blocks

    // Swap the user and item roles so that the same `makeBlocks` logic builds the item blocks.
    val swappedBlockRatings = blockRatings.map {
      case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
        ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
    }

    val (itemInBlocks, itemOutBlocks) =
      makeBlocks("item", swappedBlockRatings, itemPart, userPart, storageLevel)

    itemOutBlocks.count()  // materialize item blocks

    // Both sides are built and materialized, so the intermediate ratings can be released.
    blockRatings.unpersist()

    new RatingBlocks(userInBlocks, userOutBlocks, itemInBlocks, itemOutBlocks)
  }

  private[this] def partitionRatings[ID: ClassTag](...) = {
    // existing code goes here verbatim
  }

  private[this] def makeBlocks[ID: ClassTag](...) = {
    // existing code goes here verbatim
  }
}
{code}

was:
The in-block and out-block data structures in the ALS code are currently calculated within the {{ALS.train}} method itself. I propose moving this code, along with its helper functions, into a separate class that encapsulates the creation of the blocks. This has the added benefit of letting us include comprehensive Scaladoc in the new class that explains in detail how this core part of the algorithm works.

Proposal:

{code}
private[recommendation] final case class RatingBlocks[ID](
  userIn: RDD[(Int, InBlock[ID])],
  userOut: RDD[(Int, OutBlock)],
  itemIn: RDD[(Int, InBlock[ID])],
  itemOut: RDD[(Int, OutBlock)]
)

private[recommendation] object RatingBlocks {
  def create[ID: ClassTag: Ordering](
      ratings: RDD[Rating[ID]],
      numUserBlocks: Int,
      numItemBlocks: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RatingBlocks[ID] = {
    // In-block and out-block code currently in `ALS.train` goes here
  }

  private[this] def partitionRatings[ID: ClassTag](...) = { ... }

  private[this] def makeBlocks[ID: ClassTag](...) = { ... }
}
{code}

> Encapsulate ALS in-block and out-block data structures and methods into a separate class
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-20486
>                 URL: https://issues.apache.org/jira/browse/SPARK-20486
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML, MLlib
>    Affects Versions: 2.1.0
>            Reporter: Daniel Li
>            Priority: Trivial
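To make the layout that the proposed Scaladoc would document a little more concrete, here is a minimal, Spark-free sketch using plain Scala collections. It mirrors the steps of the proposed `RatingBlocks.create`: group ratings by (user block, item block), build the user-side in-block and out-block, then swap roles to build the item side. This is not the Spark implementation. `BlockLayoutSketch`, `InBlockSketch`, `blockOf`, `partition`, `swap`, `inBlock` and `outBlock` are hypothetical names. `blockOf` assumes a non-negative hash-modulo assignment like `HashPartitioner`, and the in-block stores raw destination ids rather than the encoded block-plus-local-index values used by the ALS code.

```scala
// Hypothetical, Spark-free sketch of the ALS block layout; these are not Spark APIs.
object BlockLayoutSketch {
  // One rating from a "source" entity to a "destination" entity
  // (users -> items on the user side, items -> users after swapping).
  final case class Rating(src: Int, dst: Int, rating: Float)

  // Simplified CSC-style in-block: the ratings of srcIds(k) occupy positions
  // dstPtrs(k) until dstPtrs(k + 1) of dstIds and ratings.
  // Assumption: raw dst ids are stored instead of encoded (block, local index) values.
  final case class InBlockSketch(
      srcIds: Array[Int],
      dstPtrs: Array[Int],
      dstIds: Array[Int],
      ratings: Array[Float])

  // Assumption: block assignment by non-negative hash modulo, like HashPartitioner.
  def blockOf(id: Int, numBlocks: Int): Int = Math.floorMod(id.hashCode, numBlocks)

  // Group ratings by (source block, destination block).
  def partition(
      ratings: Seq[Rating],
      numSrcBlocks: Int,
      numDstBlocks: Int): Map[(Int, Int), Seq[Rating]] =
    ratings.groupBy(r => (blockOf(r.src, numSrcBlocks), blockOf(r.dst, numDstBlocks)))

  // Swap roles so the same block-building code can produce the other side.
  def swap(blocks: Map[(Int, Int), Seq[Rating]]): Map[(Int, Int), Seq[Rating]] =
    blocks.map { case ((srcBlock, dstBlock), rs) =>
      ((dstBlock, srcBlock), rs.map(r => Rating(r.dst, r.src, r.rating)))
    }

  // Collect every rating whose source falls in `srcBlock`, sorted by (src, dst).
  def inBlock(srcBlock: Int, blocks: Map[(Int, Int), Seq[Rating]]): InBlockSketch = {
    val rs = blocks.toSeq
      .collect { case ((s, _), r) if s == srcBlock => r }
      .flatten
      .sortBy(r => (r.src, r.dst))
    val srcIds = rs.map(_.src).distinct.toArray
    val counts = srcIds.map(id => rs.count(_.src == id))
    InBlockSketch(srcIds, counts.scanLeft(0)(_ + _), rs.map(_.dst).toArray, rs.map(_.rating).toArray)
  }

  // For every destination block, the sorted local indices (positions in srcIds)
  // of the source entities whose factors that destination block needs.
  def outBlock(
      srcBlock: Int,
      blocks: Map[(Int, Int), Seq[Rating]],
      numDstBlocks: Int): Array[Array[Int]] = {
    val localIndex = inBlock(srcBlock, blocks).srcIds.zipWithIndex.toMap
    Array.tabulate(numDstBlocks) { d =>
      blocks.getOrElse((srcBlock, d), Seq.empty[Rating])
        .map(r => localIndex(r.src)).distinct.sorted.toArray
    }
  }

  def main(args: Array[String]): Unit = {
    val ratings = Seq(Rating(0, 0, 1f), Rating(1, 0, 2f), Rating(2, 1, 3f), Rating(3, 1, 4f))
    val userBlocks = partition(ratings, numSrcBlocks = 2, numDstBlocks = 2)
    val itemBlocks = swap(userBlocks)
    // prints "[0] [1]": user block 0 holds users 0 and 2; item block 0 needs user 0, item block 1 needs user 2
    println(outBlock(0, userBlocks, 2).map(_.mkString("[", ",", "]")).mkString(" "))
    println(inBlock(0, itemBlocks).dstIds.mkString(","))
  }
}
```

In the proposal itself these steps run on RDDs, with `partitionRatings` and `makeBlocks` doing the grouping and block construction, and the `count()` calls forcing each side to materialize before `blockRatings` is unpersisted.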