Repository: spark Updated Branches: refs/heads/master d2d5e7fe2 -> d7053bea9
[SPARK-9903] [MLLIB] skip local processing in PrefixSpan if there are no small prefixes There exists a chance that the prefixes keep growing to the maximum pattern length. Then the final local processing step becomes unnecessary. feynmanliang Author: Xiangrui Meng <m...@databricks.com> Closes #8136 from mengxr/SPARK-9903. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7053bea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7053bea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7053bea Branch: refs/heads/master Commit: d7053bea985679c514b3add029631ea23e1730ce Parents: d2d5e7f Author: Xiangrui Meng <m...@databricks.com> Authored: Wed Aug 12 20:44:40 2015 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Wed Aug 12 20:44:40 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 37 +++++++++++--------- 1 file changed, 21 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d7053bea/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index ad6715b5..dc4ae1d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -282,25 +282,30 @@ object PrefixSpan extends Logging { largePrefixes = newLargePrefixes } - // Switch to local processing. - val bcSmallPrefixes = sc.broadcast(smallPrefixes) - val distributedFreqPattern = postfixes.flatMap { postfix => - bcSmallPrefixes.value.values.map { prefix => - (prefix.id, postfix.project(prefix).compressed) - }.filter(_._2.nonEmpty) - }.groupByKey().flatMap { case (id, projPostfixes) => - val prefix = bcSmallPrefixes.value(id) - val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength - prefix.length) - // TODO: We collect projected postfixes into memory. We should also compare the performance - // TODO: of keeping them on shuffle files. - localPrefixSpan.run(projPostfixes.toArray).map { case (pattern, count) => - (prefix.items ++ pattern, count) + var freqPatterns = sc.parallelize(localFreqPatterns, 1) + + val numSmallPrefixes = smallPrefixes.size + logInfo(s"number of small prefixes for local processing: $numSmallPrefixes") + if (numSmallPrefixes > 0) { + // Switch to local processing. + val bcSmallPrefixes = sc.broadcast(smallPrefixes) + val distributedFreqPattern = postfixes.flatMap { postfix => + bcSmallPrefixes.value.values.map { prefix => + (prefix.id, postfix.project(prefix).compressed) + }.filter(_._2.nonEmpty) + }.groupByKey().flatMap { case (id, projPostfixes) => + val prefix = bcSmallPrefixes.value(id) + val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength - prefix.length) + // TODO: We collect projected postfixes into memory. We should also compare the performance + // TODO: of keeping them on shuffle files. + localPrefixSpan.run(projPostfixes.toArray).map { case (pattern, count) => + (prefix.items ++ pattern, count) + } } + // Union local frequent patterns and distributed ones. + freqPatterns = freqPatterns ++ distributedFreqPattern } - // Union local frequent patterns and distributed ones. - val freqPatterns = (sc.parallelize(localFreqPatterns, 1) ++ distributedFreqPattern) - .persist(StorageLevel.MEMORY_AND_DISK) freqPatterns } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org