Repository: spark
Updated Branches:
  refs/heads/master d2d5e7fe2 -> d7053bea9


[SPARK-9903] [MLLIB] skip local processing in PrefixSpan if there are no small prefixes

It is possible for all prefixes to keep growing until they reach the maximum
pattern length. In that case no small prefixes remain, so the final local
processing step is unnecessary and is now skipped. feynmanliang

Author: Xiangrui Meng <m...@databricks.com>

Closes #8136 from mengxr/SPARK-9903.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7053bea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7053bea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7053bea

Branch: refs/heads/master
Commit: d7053bea985679c514b3add029631ea23e1730ce
Parents: d2d5e7f
Author: Xiangrui Meng <m...@databricks.com>
Authored: Wed Aug 12 20:44:40 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Wed Aug 12 20:44:40 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 37 +++++++++++---------
 1 file changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d7053bea/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index ad6715b5..dc4ae1d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -282,25 +282,30 @@ object PrefixSpan extends Logging {
       largePrefixes = newLargePrefixes
     }
 
-    // Switch to local processing.
-    val bcSmallPrefixes = sc.broadcast(smallPrefixes)
-    val distributedFreqPattern = postfixes.flatMap { postfix =>
-      bcSmallPrefixes.value.values.map { prefix =>
-        (prefix.id, postfix.project(prefix).compressed)
-      }.filter(_._2.nonEmpty)
-    }.groupByKey().flatMap { case (id, projPostfixes) =>
-      val prefix = bcSmallPrefixes.value(id)
-      val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength - 
prefix.length)
-      // TODO: We collect projected postfixes into memory. We should also 
compare the performance
-      // TODO: of keeping them on shuffle files.
-      localPrefixSpan.run(projPostfixes.toArray).map { case (pattern, count) =>
-        (prefix.items ++ pattern, count)
+    var freqPatterns = sc.parallelize(localFreqPatterns, 1)
+
+    val numSmallPrefixes = smallPrefixes.size
+    logInfo(s"number of small prefixes for local processing: 
$numSmallPrefixes")
+    if (numSmallPrefixes > 0) {
+      // Switch to local processing.
+      val bcSmallPrefixes = sc.broadcast(smallPrefixes)
+      val distributedFreqPattern = postfixes.flatMap { postfix =>
+        bcSmallPrefixes.value.values.map { prefix =>
+          (prefix.id, postfix.project(prefix).compressed)
+        }.filter(_._2.nonEmpty)
+      }.groupByKey().flatMap { case (id, projPostfixes) =>
+        val prefix = bcSmallPrefixes.value(id)
+        val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength - 
prefix.length)
+        // TODO: We collect projected postfixes into memory. We should also 
compare the performance
+        // TODO: of keeping them on shuffle files.
+        localPrefixSpan.run(projPostfixes.toArray).map { case (pattern, count) 
=>
+          (prefix.items ++ pattern, count)
+        }
       }
+      // Union local frequent patterns and distributed ones.
+      freqPatterns = freqPatterns ++ distributedFreqPattern
     }
 
-    // Union local frequent patterns and distributed ones.
-    val freqPatterns = (sc.parallelize(localFreqPatterns, 1) ++ 
distributedFreqPattern)
-      .persist(StorageLevel.MEMORY_AND_DISK)
     freqPatterns
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to