[GitHub] spark pull request: [SPARK-9540] [MLLIB] optimize PrefixSpan imple...

2015-08-04 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7937#discussion_r36264370
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala ---
@@ -35,83 +35,81 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
 */
 
 val sequences = Array(
-  Array(1, -1, 3, -1, 4, -1, 5),
-  Array(2, -1, 3, -1, 1),
-  Array(2, -1, 4, -1, 1),
-  Array(3, -1, 1, -1, 3, -1, 4, -1, 5),
-  Array(3, -1, 4, -1, 4, -1, 3),
-  Array(6, -1, 5, -1, 3))
+  Array(0, 1, 0, 3, 0, 4, 0, 5, 0),
+  Array(0, 2, 0, 3, 0, 1, 0),
+  Array(0, 2, 0, 4, 0, 1, 0),
+  Array(0, 3, 0, 1, 0, 3, 0, 4, 0, 5, 0),
+  Array(0, 3, 0, 4, 0, 4, 0, 3, 0),
+  Array(0, 6, 0, 5, 0, 3, 0))
 
 val rdd = sc.parallelize(sequences, 2).cache()
 
-val prefixspan = new PrefixSpan()
-  .setMinSupport(0.33)
-  .setMaxPatternLength(50)
-val result1 = prefixspan.run(rdd)
+val result1 = PrefixSpan.genFreqPatterns(
+  rdd, minCount = 2L, maxPatternLength = 50, maxLocalProjDBSize = 16L)
 val expectedValue1 = Array(
-  (Array(1), 4L),
-  (Array(1, -1, 3), 2L),
-  (Array(1, -1, 3, -1, 4), 2L),
-  (Array(1, -1, 3, -1, 4, -1, 5), 2L),
-  (Array(1, -1, 3, -1, 5), 2L),
-  (Array(1, -1, 4), 2L),
-  (Array(1, -1, 4, -1, 5), 2L),
-  (Array(1, -1, 5), 2L),
-  (Array(2), 2L),
-  (Array(2, -1, 1), 2L),
-  (Array(3), 5L),
-  (Array(3, -1, 1), 2L),
-  (Array(3, -1, 3), 2L),
-  (Array(3, -1, 4), 3L),
-  (Array(3, -1, 4, -1, 5), 2L),
-  (Array(3, -1, 5), 2L),
-  (Array(4), 4L),
-  (Array(4, -1, 5), 2L),
-  (Array(5), 3L)
+  (Array(0, 1, 0), 4L),
+  (Array(0, 1, 0, 3, 0), 2L),
+  (Array(0, 1, 0, 3, 0, 4, 0), 2L),
+  (Array(0, 1, 0, 3, 0, 4, 0, 5, 0), 2L),
+  (Array(0, 1, 0, 3, 0, 5, 0), 2L),
+  (Array(0, 1, 0, 4, 0), 2L),
+  (Array(0, 1, 0, 4, 0, 5, 0), 2L),
+  (Array(0, 1, 0, 5, 0), 2L),
+  (Array(0, 2, 0), 2L),
+  (Array(0, 2, 0, 1, 0), 2L),
+  (Array(0, 3, 0), 5L),
+  (Array(0, 3, 0, 1, 0), 2L),
+  (Array(0, 3, 0, 3, 0), 2L),
+  (Array(0, 3, 0, 4, 0), 3L),
+  (Array(0, 3, 0, 4, 0, 5, 0), 2L),
+  (Array(0, 3, 0, 5, 0), 2L),
+  (Array(0, 4, 0), 4L),
+  (Array(0, 4, 0, 5, 0), 2L),
+  (Array(0, 5, 0), 3L)
 )
 compareInternalResults(expectedValue1, result1.collect())
 
-prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
-val result2 = prefixspan.run(rdd)
+val result2 = PrefixSpan.genFreqPatterns(
+  rdd, minCount = 3, maxPatternLength = 50, maxLocalProjDBSize = 32L)
 val expectedValue2 = Array(
-  (Array(1), 4L),
-  (Array(3), 5L),
-  (Array(3, -1, 4), 3L),
-  (Array(4), 4L),
-  (Array(5), 3L)
+  (Array(0, 1, 0), 4L),
+  (Array(0, 3, 0), 5L),
+  (Array(0, 3, 0, 4, 0), 3L),
+  (Array(0, 4, 0), 4L),
+  (Array(0, 5, 0), 3L)
 )
 compareInternalResults(expectedValue2, result2.collect())
 
-prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
-val result3 = prefixspan.run(rdd)
+val result3 = PrefixSpan.genFreqPatterns(
+  rdd, minCount = 2, maxPatternLength = 2, maxLocalProjDBSize = 32L)
 val expectedValue3 = Array(
-  (Array(1), 4L),
-  (Array(1, -1, 3), 2L),
-  (Array(1, -1, 4), 2L),
-  (Array(1, -1, 5), 2L),
-  (Array(2, -1, 1), 2L),
-  (Array(2), 2L),
-  (Array(3), 5L),
-  (Array(3, -1, 1), 2L),
-  (Array(3, -1, 3), 2L),
-  (Array(3, -1, 4), 3L),
-  (Array(3, -1, 5), 2L),
-  (Array(4), 4L),
-  (Array(4, -1, 5), 2L),
-  (Array(5), 3L)
+  (Array(0, 1, 0), 4L),
+  (Array(0, 1, 0, 3, 0), 2L),
+  (Array(0, 1, 0, 4, 0), 2L),
+  (Array(0, 1, 0, 5, 0), 2L),
+  (Array(0, 2, 0, 1, 0), 2L),
+  (Array(0, 2, 0), 2L),
+  (Array(0, 3, 0), 5L),
+  (Array(0, 3, 0, 1, 0), 2L),
+  (Array(0, 3, 0, 3, 0), 2L),
+  (Array(0, 3, 0, 4, 0), 3L),
+  (Array(0, 3, 0, 5, 0), 2L),
+  (Array(0, 4, 0), 4L),
+  (Array(0, 4, 0, 5, 0), 2L),
+  (Array(0, 5, 0), 3L)
 )
 compareInternalResults(expectedValue3, result3.collect())
   }
 
   test("PrefixSpan internal (integer seq, -1 delim) run, variable-size 
itemsets") {
 val sequences = Array(
-  Array(1, -1, 1, 2, 3, -1, 1, 3, -1, 4, -1, 3, 6),
-  Array(1, 4, -1, 3, -1, 2, 3, -1, 1, 5),
-  Array(5, 6, -1

[GitHub] spark pull request: [SPARK-9540] [MLLIB] optimize PrefixSpan imple...

2015-08-04 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7937#discussion_r36263427
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala ---
@@ -35,83 +35,81 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
 */
 
 val sequences = Array(
-  Array(1, -1, 3, -1, 4, -1, 5),
-  Array(2, -1, 3, -1, 1),
-  Array(2, -1, 4, -1, 1),
-  Array(3, -1, 1, -1, 3, -1, 4, -1, 5),
-  Array(3, -1, 4, -1, 4, -1, 3),
-  Array(6, -1, 5, -1, 3))
+  Array(0, 1, 0, 3, 0, 4, 0, 5, 0),
--- End diff --

If we use Array(1, 0, 3, 0, 4, 0, 5), we can save two integers per sequence.
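
As a hedged illustration (toy helper written just for this note, not code
from the PR): both encodings decode to the same itemsets, the trimmed form
only drops the redundant leading and trailing delimiters.

    object EncodingSketch {
      // Current test encoding: a 0 delimiter before and after every itemset.
      val padded: Array[Int] = Array(0, 1, 0, 3, 0, 4, 0, 5, 0)
      // Trimmed encoding: two Ints fewer per sequence.
      val trimmed: Array[Int] = Array(1, 0, 3, 0, 4, 0, 5)

      // Split a delimited sequence back into its itemsets.
      def itemsets(seq: Array[Int]): Seq[Seq[Int]] =
        seq.foldLeft(Seq(Seq.empty[Int])) {
          case (acc, 0) => acc :+ Seq.empty[Int]        // delimiter: start a new itemset
          case (acc, x) => acc.init :+ (acc.last :+ x)  // item: extend the current itemset
        }.filter(_.nonEmpty)

      def main(args: Array[String]): Unit = {
        assert(itemsets(padded) == itemsets(trimmed))
        println(itemsets(trimmed)) // List(List(1), List(3), List(4), List(5))
      }
    }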




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-29 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35828463
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -44,27 +46,43 @@ class PrefixSpan private (
     private var maxPatternLength: Int) extends Logging with Serializable {
 
   /**
+   * The maximum number of items allowed in a projected database before local processing. If a
+   * projected database exceeds this size, another iteration of distributed PrefixSpan is run.
+   */
+  private val maxLocalProjDBSize: Long = 1
+
+  /**
    * Constructs a default instance with default parameters
    * {minSupport: `0.1`, maxPatternLength: `10`}.
    */
   def this() = this(0.1, 10)
 
   /**
+   * Get the minimal support (i.e. the frequency of occurrence before a pattern is considered
+   * frequent).
+   */
+  def getMinSupport(): Double = this.minSupport
+
+  /**
    * Sets the minimal support level (default: `0.1`).
    */
   def setMinSupport(minSupport: Double): this.type = {
-    require(minSupport >= 0 && minSupport <= 1,
-      "The minimum support value must be between 0 and 1, including 0 and 1.")
+    require(minSupport >= 0 && minSupport <= 1, "The minimum support value must be in [0, 1].")
     this.minSupport = minSupport
     this
   }
 
   /**
+   * Gets the maximal pattern length (i.e. the length of the longest sequential pattern to
+   * consider).
+   */
+  def getMaxPatternLength(): Double = this.maxPatternLength
--- End diff --

OK
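
For illustration, the split that this javadoc describes might look like the
following sketch. It is only the idea, not the PR's code; `splitBySize` and
its argument names are invented for this note.

    import org.apache.spark.rdd.RDD

    def splitBySize(
        pairs: RDD[(Seq[Int], Array[Int])],
        maxLocalProjDBSize: Long)
      : (RDD[(Seq[Int], Array[Int])], RDD[(Seq[Int], Array[Int])]) = {
      // Total item count of each prefix's projected database.
      val dbSizes = pairs.mapValues(_.length.toLong).reduceByKey(_ + _)
      // Prefixes whose projected databases are small enough for a local run.
      val smallPrefixes = dbSizes
        .filter { case (_, size) => size <= maxLocalProjDBSize }
        .keys.collect().toSet
      val small = pairs.filter { case (prefix, _) => smallPrefixes(prefix) }
      val large = pairs.filter { case (prefix, _) => !smallPrefixes(prefix) }
      (small, large)
    }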




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-29 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35828436
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -44,27 +46,43 @@ class PrefixSpan private (
     private var maxPatternLength: Int) extends Logging with Serializable {
 
   /**
+   * The maximum number of items allowed in a projected database before local processing. If a
+   * projected database exceeds this size, another iteration of distributed PrefixSpan is run.
+   */
+  private val maxLocalProjDBSize: Long = 1
+
+  /**
    * Constructs a default instance with default parameters
    * {minSupport: `0.1`, maxPatternLength: `10`}.
    */
   def this() = this(0.1, 10)
 
   /**
+   * Get the minimal support (i.e. the frequency of occurrence before a pattern is considered
+   * frequent).
+   */
+  def getMinSupport(): Double = this.minSupport
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-29 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35828391
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -44,27 +46,43 @@ class PrefixSpan private (
     private var maxPatternLength: Int) extends Logging with Serializable {
 
   /**
+   * The maximum number of items allowed in a projected database before local processing. If a
+   * projected database exceeds this size, another iteration of distributed PrefixSpan is run.
+   */
+  private val maxLocalProjDBSize: Long = 1
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-125079131
  
@feynmanliang You are right, it is worth guarding against executor failure; I
fully agree. I tested it according to your suggestion, but the performance
results are not stable. I am trying to find the cause, which is perhaps the
environment, and I will post the numbers after solving this problem.




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-125075528
  
@feynmanliang About splitPrefixSuffixPairs: I compared the two methods. Your
method's running time is longer than mine, and its result is not correct. I
don't know why, so please check it, thank you.




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35504114
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
+      splitPrefixSuffixPairs(prefixSuffixPairs)
+    largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+    var patternLength: Int = 1
+    while (patternLength < maxPatternLength &&
+      largePrefixSuffixPairs.count() != 0) {
+      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+        getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
+      patternsCount = nextPatternAndCounts.count()
+      largePrefixSuffixPairs.unpersist()
+      val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
+      largePrefixSuffixPairs = splitedPrefixSuffixPairs._2
+      largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+      smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+      patternLength = patternLength + 1
+    }
+    if (smallPrefixSuffixPairs.count() > 0) {
+      val projectedDatabase = smallPrefixSuffixPairs
+        .map(x => (x._1.toSeq, x._2))
+        .groupByKey()
+        .map(x => (x._1.toArray, x._2.toArray))
+      val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+    }
+    allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
+  }
+
+
+  /**
+   * Split prefix suffix pairs to two parts:
+   * suffixes' size less than maxSuffixesBeforeLocalProcessing and
+   * suffixes' size more than maxSuffixesBeforeLocalProcessing
+   * @param prefixSuffixPairs prefix (length n) and suffix pairs,
+   * @return small size prefix suffix pairs and big size prefix suffix pairs
+   *         (RDD[prefix, suffix], RDD[prefix, suffix])
+   */
+  private def splitPrefixSuffixPairs(
+      prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]):
+    (RDD[(ArrayBuffer[Int], Array[Int])], RDD[(ArrayBuffer[Int], Array[Int])]) = {
+    val suffixSizeMap = prefixSuffixPairs
+      .map(x => (x._1, x._2.length))
+      .reduceByKey(_ + _)
--- End diff --

@feynmanliang I compared the two methods. Your method's running time is
longer than mine, and its result is not correct. I don't know why, so please
check it, thank you.




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35502442
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
+      splitPrefixSuffixPairs(prefixSuffixPairs)
+    largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

OK, I tested it: persisting prefixSuffixPairs (7s) is better than persisting
largePrefixSuffixPairs (11s).
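
A hedged sketch of the two strategies that were timed, written generically
(the `split` parameter stands in for splitPrefixSuffixPairs; nothing here is
the PR's final code):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Strategy A (7s here): cache the whole pair RDD before splitting, so
    // both halves reuse the same cached partitions.
    def splitWithEarlyPersist[A](pairs: RDD[A])(
        split: RDD[A] => (RDD[A], RDD[A])): (RDD[A], RDD[A]) = {
      pairs.persist(StorageLevel.MEMORY_AND_DISK)
      split(pairs)
    }

    // Strategy B (11s here): split first and cache only the large half; the
    // small half then recomputes `pairs` from its lineage.
    def splitWithLatePersist[A](pairs: RDD[A])(
        split: RDD[A] => (RDD[A], RDD[A])): (RDD[A], RDD[A]) = {
      val (small, large) = split(pairs)
      large.persist(StorageLevel.MEMORY_AND_DISK)
      (small, large)
    }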




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35500886
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -43,6 +45,8 @@ class PrefixSpan private (
     private var minSupport: Double,
     private var maxPatternLength: Int) extends Logging with Serializable {
 
+  private val maxSuffixesBeforeLocalProcessing: Long = 1
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35500812
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
+      splitPrefixSuffixPairs(prefixSuffixPairs)
+    largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+    var patternLength: Int = 1
+    while (patternLength < maxPatternLength &&
+      largePrefixSuffixPairs.count() != 0) {
+      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+        getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
+      patternsCount = nextPatternAndCounts.count()
+      largePrefixSuffixPairs.unpersist()
+      val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
+      largePrefixSuffixPairs = splitedPrefixSuffixPairs._2
+      largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+      smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+      patternLength = patternLength + 1
+    }
+    if (smallPrefixSuffixPairs.count() > 0) {
+      val projectedDatabase = smallPrefixSuffixPairs
+        .map(x => (x._1.toSeq, x._2))
+        .groupByKey()
+        .map(x => (x._1.toArray, x._2.toArray))
+      val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35500699
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
+      splitPrefixSuffixPairs(prefixSuffixPairs)
+    largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+    var patternLength: Int = 1
+    while (patternLength < maxPatternLength &&
+      largePrefixSuffixPairs.count() != 0) {
+      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+        getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
+      patternsCount = nextPatternAndCounts.count()
+      largePrefixSuffixPairs.unpersist()
+      val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35500517
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
+      splitPrefixSuffixPairs(prefixSuffixPairs)
+    largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+    var patternLength: Int = 1
+    while (patternLength < maxPatternLength &&
+      largePrefixSuffixPairs.count() != 0) {
+      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+        getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
+      patternsCount = nextPatternAndCounts.count()
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35500511
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
+      splitPrefixSuffixPairs(prefixSuffixPairs)
+    largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+    var patternLength: Int = 1
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35500442
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35500446
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
+      splitPrefixSuffixPairs(prefixSuffixPairs)
+    largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+    var patternLength: Int = 1
+    while (patternLength < maxPatternLength &&
+      largePrefixSuffixPairs.count() != 0) {
+      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+        getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
+      patternsCount = nextPatternAndCounts.count()
+      largePrefixSuffixPairs.unpersist()
+      val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
+      largePrefixSuffixPairs = splitedPrefixSuffixPairs._2
+      largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+      smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+      patternLength = patternLength + 1
+    }
+    if (smallPrefixSuffixPairs.count() > 0) {
+      val projectedDatabase = smallPrefixSuffixPairs
+        .map(x => (x._1.toSeq, x._2))
+        .groupByKey()
+        .map(x => (x._1.toArray, x._2.toArray))
+      val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+    }
+    allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
+  }
+
+
+  /**
+   * Split prefix suffix pairs to two parts:
+   * suffixes' size less than maxSuffixesBeforeLocalProcessing and
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-26 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r35500444
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,106 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
+      splitPrefixSuffixPairs(prefixSuffixPairs)
+    largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+    var patternLength: Int = 1
+    while (patternLength < maxPatternLength &&
+      largePrefixSuffixPairs.count() != 0) {
+      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+        getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
+      patternsCount = nextPatternAndCounts.count()
+      largePrefixSuffixPairs.unpersist()
+      val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
+      largePrefixSuffixPairs = splitedPrefixSuffixPairs._2
+      largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
+      smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8999][MLlib]Support non-temporal sequen...

2015-07-25 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7594#issuecomment-124829693
  
@mengxr I compared the two approaches (Array[Array[Int]] and Array[Int]) and
found that Array[Array[Int]] performs better than Array[Int]. The dataset I
used is BMSWebView2 (KDD Cup 2000). With support 1500, the running time is
44s for Array[Array[Int]] versus 69s for Array[Int]; each value is the
average of three measurements.
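
For clarity, a toy illustration of the two representations that were compared
(the literals are made-up data, not BMSWebView2):

    // (a) Nested: one inner array per itemset; structure is explicit.
    val nested: Array[Array[Int]] = Array(Array(1), Array(1, 2, 3), Array(1, 3))

    // (b) Flat: itemsets separated by a 0 sentinel; one contiguous array,
    // but every operation must scan for delimiters.
    val flat: Array[Int] = Array(1, 0, 1, 2, 3, 0, 1, 3)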




[GitHub] spark pull request: [SPARK-8999][MLlib]Support non-temporal sequen...

2015-07-24 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7646#issuecomment-124565961
  
@mengxr Can you suggest some benchmark data sets for performance testing?




[GitHub] spark pull request: [SPARK-8999][MLlib]Support non-temporal sequen...

2015-07-24 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7646#issuecomment-124564177
  
@mengxr @feynmanliang Please review. Thx.




[GitHub] spark pull request: [SPARK-8999][MLlib]Support non-temporal sequen...

2015-07-24 Thread zhangjiajin
GitHub user zhangjiajin opened a pull request:

https://github.com/apache/spark/pull/7646

[SPARK-8999][MLlib]Support non-temporal sequence in PrefixSpan (Array[Int])

Support non-temporal sequence in PrefixSpan (Array[Int])

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangjiajin/spark multiItems_2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7646.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7646


commit c6ceb63a557c1d9c3dcccf44a16ab32528b012f2
Author: zhangjiajin 
Date:   2015-07-07T07:30:10Z

Add new algorithm PrefixSpan and test file.

commit d9d8137c157374f9d463c0ee387536ed7448ca5f
Author: zhang jiajin 
Date:   2015-07-08T10:22:16Z

Delete Prefixspan.scala

Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala

commit c1d13d01f218b5f7ad3a68fa29202b2839090f7f
Author: zhang jiajin 
Date:   2015-07-08T10:23:31Z

Delete PrefixspanSuite.scala

Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete 
PrefixspanSuite.scala.

commit a7e50d43fac419e1aba3e668b71b1d08bef0
Author: zhangjiajin 
Date:   2015-07-14T02:21:04Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

commit f06772fc1347fe412a1ccdb9c27df8d0573ca462
Author: zhangjiajin 
Date:   2015-07-14T02:46:05Z

fix a scala style error.

commit b572f54147652077dc198eb6ecf041b0ba8bc63e
Author: zhangjiajin 
Date:   2015-07-15T02:57:41Z

initialize file before rebase.

commit 216ab0cc98bc1f8fd27e97f021640d76c153b860
Author: zhangjiajin 
Date:   2015-07-24T15:39:33Z

Support non-temporal sequence in PrefixSpan






[GitHub] spark pull request: [SPARK-8999][MLlib]Support non-temporal sequen...

2015-07-22 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7594#issuecomment-123922017
  
@mengxr OK. I will test two approaches (Array[Array[Int]] and Array[Int]). 




[GitHub] spark pull request: [SPARK-8999][MLlib]Support non-temporal sequen...

2015-07-22 Thread zhangjiajin
GitHub user zhangjiajin opened a pull request:

https://github.com/apache/spark/pull/7594

[SPARK-8999][MLlib]Support non-temporal sequence in PrefixSpan

Support non-temporal sequence in PrefixSpan

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangjiajin/spark multiItems

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7594.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7594


commit c6ceb63a557c1d9c3dcccf44a16ab32528b012f2
Author: zhangjiajin 
Date:   2015-07-07T07:30:10Z

Add new algorithm PrefixSpan and test file.

commit d9d8137c157374f9d463c0ee387536ed7448ca5f
Author: zhang jiajin 
Date:   2015-07-08T10:22:16Z

Delete Prefixspan.scala

Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala

commit c1d13d01f218b5f7ad3a68fa29202b2839090f7f
Author: zhang jiajin 
Date:   2015-07-08T10:23:31Z

Delete PrefixspanSuite.scala

Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete 
PrefixspanSuite.scala.

commit a7e50d43fac419e1aba3e668b71b1d08bef0
Author: zhangjiajin 
Date:   2015-07-14T02:21:04Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

commit f06772fc1347fe412a1ccdb9c27df8d0573ca462
Author: zhangjiajin 
Date:   2015-07-14T02:46:05Z

fix a scala style error.

commit b572f54147652077dc198eb6ecf041b0ba8bc63e
Author: zhangjiajin 
Date:   2015-07-15T02:57:41Z

initialize file before rebase.

commit 9ed36d57ed82054d2be2fc30e7980aca4a90a0ba
Author: zhangjiajin 
Date:   2015-07-22T09:53:35Z

Support non-temporal sequence in PrefixSpan






[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-20 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-122877168
  
@feynmanliang @mengxr I'm working on the performance tests. Compared with my
first version, performance has greatly improved, but with the most recent
update (adding maxSuffixesBeforeLocalProcessing) it degrades very badly.
Please help me take a look at the current code and see how it can be
optimized. Thank you!




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-17 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34943941
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,69 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var currentPrefixSuffixPairs = prefixSuffixPairs
--- End diff --

OK. Because the pairs may be very large, I use
persist(StorageLevel.MEMORY_AND_DISK).




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-17 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34943942
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +86,69 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var currentPrefixSuffixPairs = prefixSuffixPairs
+    var patternLength: Int = 1
+    while (patternLength < maxPatternLength &&
+      patternsCount <= minPatternsBeforeLocalProcessing &&
+      currentPrefixSuffixPairs.count() != 0) {
+      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+        getPatternCountsAndPrefixSuffixPairs(minCount, currentPrefixSuffixPairs)
+      patternsCount = nextPatternAndCounts.count()
+      currentPrefixSuffixPairs = nextPrefixSuffixPairs
--- End diff --

OK. Because the pairs may be very large, I use
persist(StorageLevel.MEMORY_AND_DISK).




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-17 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-122459599
  
@feynmanliang If the original prefix is "ABC" before calling
LocalPrefixSpan.run, and after the call the (prepended) prefix is "EDABC",
then reversing it gives "CBADE", but the correct prefix is "ABCDE", so the
result is wrong.
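
A minimal repro of the ordering problem (Chars instead of Ints, purely for
readability):

    val original = List('A', 'B', 'C')             // prefix before the local run
    val grown = 'E' :: 'D' :: original             // D then E prepended: "EDABC"
    println(grown.reverse.mkString)                // "CBADE" -- wrong
    println((original ++ List('D', 'E')).mkString) // "ABCDE" -- expected

Reversing only recovers the right order when the starting prefix is empty or
is itself stored in reversed order, which is not the case here.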




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-17 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-122261569
  
@feynmanliang 
In the file "LocalPrefixSpan.scala", I have a question:

L48:   val newPrefixes = item :: prefixes
The elements in newPrefixes are in reversed order.

Why not:   val newPrefixes = prefixes :+ item
What's the difference?
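
For reference, the performance difference between the two forms on an
immutable List (independent of this PR's logic):

    val prefixes = List(3, 2, 1)   // pattern 1, 2, 3 stored in reverse
    val fast = 4 :: prefixes       // O(1): reuses the existing list as its tail
    val slow = prefixes :+ 4       // O(n): copies the whole list

Prepending and reversing once at the end is the usual idiom; the catch, as
discussed above, is getting that final reverse right.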




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-16 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-122143351
  
I'm confused: groupBy just reorganizes the data, it does not generate new
data, so why would an executor be overloaded after the shuffle?

The following diagrams are from the paper "Mining Sequential Patterns by
Pattern-Growth: The PrefixSpan Approach":


![image](https://cloud.githubusercontent.com/assets/13159256/8738656/262676ec-2c65-11e5-9c7e-5e79e5a03b38.png)





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-16 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-121902394
  
@feynmanliang You wrote: "This assumes that all the values (suffixes)
associated to a key (prefix) will fit on an executor, but I don't think that
patternsCount > minPatternsBeforeShuffle will guarantee that. Better to count
the suffixes for each prefix using aggregateByKey before doing local
processing."

minPatternsBeforeLocalProcessing is related to the number of executors, while
the suffixes threshold (call it maxSuffixesThreshold) is related to the input
sequence size. How should the default value of maxSuffixesThreshold be set?

You worry that an executor will be overloaded. If each executor holds
multiple prefixes, that may reduce the impact of this problem: for example,
with 4 executors and minPatternsBeforeLocalProcessing = 20, one executor
holds 5 random prefixes and their suffixes.

The following diagram shows the difference between the two methods:


![234](https://cloud.githubusercontent.com/assets/13159256/8719603/74bf13c6-2bdf-11e5-9f5d-877fae4fc4c8.PNG)
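
For reference, a hedged sketch of the aggregateByKey idea quoted above (the
helper name and threshold are invented for illustration, not this PR's code):

    import org.apache.spark.rdd.RDD

    // Total suffix items per prefix; prefixes under the threshold are safe
    // to group onto one executor for local processing.
    def prefixesSafeForLocal(
        pairs: RDD[(Seq[Int], Array[Int])],
        maxSuffixItems: Long): RDD[Seq[Int]] = {
      pairs
        .aggregateByKey(0L)(
          (acc, suffix) => acc + suffix.length, // fold in one suffix's item count
          _ + _)                                // merge per-partition sums
        .filter { case (_, total) => total <= maxSuffixItems }
        .keys
    }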




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34752412
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -82,20 +84,70 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
-      .groupByKey()
-      .map(x => (x._1.toArray, x._2.toArray))
-    val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase)
-    val lengthOnePatternsAndCountsRdd =
-      sequences.sparkContext.parallelize(
-        lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-    val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-    allPatterns
+    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
+    val prefixSuffixPairs = getPrefixSuffixPairs(
+      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+    var patternsCount: Long = lengthOnePatternsAndCounts.count()
+    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))
+    var currentPrefixSuffixPairs = prefixSuffixPairs
+    while (patternsCount <= minPatternsBeforeShuffle && currentPrefixSuffixPairs.count() != 0) {
+      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+        getPatternCountsAndPrefixSuffixPairs(minCount, currentPrefixSuffixPairs)
+      patternsCount = nextPatternAndCounts.count().toInt
+      currentPrefixSuffixPairs = nextPrefixSuffixPairs
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+    }
+    if (patternsCount > 0) {
+      val projectedDatabase = currentPrefixSuffixPairs
+        .map(x => (x._1.toSeq, x._2))
+        .groupByKey()
+        .map(x => (x._1.toArray, x._2.toArray))
+      val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+    }
+    allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
+   * @return pattern and counts, and prefix suffix pairs
+   *         (Array[pattern, count], RDD[prefix, suffix])
+   */
+  private def getPatternCountsAndPrefixSuffixPairs(
+      minCount: Long,
+      prefixSuffixPairs: RDD[(Array[Int], Array[Int])]):
+    (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+    val prefixAndFreqentItemAndCounts = prefixSuffixPairs
+      .flatMap { case (prefix, suffix) =>
+        suffix.distinct.map(y => ((prefix.toSeq, y), 1L))
+      }.reduceByKey(_ + _)
+      .filter(_._2 >= minCount)
+    val patternAndCounts = prefixAndFreqentItemAndCounts
+      .map { case ((prefix, item), count) => (prefix.toArray :+ item, count) }
--- End diff --

OK




[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34750033
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentPrefixSuffixPairs = nextPrefixSuffixPairs
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val projectedDatabase = currentPrefixSuffixPairs
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
+   * @return pattern and counts, and prefix suffix pairs
+   * (Array[pattern, count], RDD[prefix, suffix ])
+   */
+  private def getPatternCountsAndPrefixSuffixPairs(
+  minCount: Long,
+  prefixSuffixPairs: RDD[(Array[Int], Array[Int])]):
+  (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+val prefixAndFreqentItemAndCounts = prefixSuffixPairs
+  .flatMap { case (prefix, suffix) =>
+  suffix.distinct.map(y => ((prefix.toSeq, y), 1L))
+}.reduceByKey(_ + _)
+  .filter(_._2 >= minCount)
+val patternAndCounts = prefixAndFreqentItemAndCounts
+  .map{ case ((prefix, item), count) => (prefix.toArray :+ item, count) }
+val prefixlength = prefixSuffixPairs.first()._1.length
+if (prefixlength + 1 >= maxPatternLength) {
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34747088
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentPrefixSuffixPairs = nextPrefixSuffixPairs
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val projectedDatabase = currentPrefixSuffixPairs
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
projectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
+   * @return pattern and counts, and prefix suffix pairs
+   * (Array[pattern, count], RDD[prefix, suffix ])
+   */
+  private def getPatternCountsAndPrefixSuffixPairs(
+  minCount: Long,
+  prefixSuffixPairs: RDD[(Array[Int], Array[Int])]):
+  (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+val prefixAndFreqentItemAndCounts = prefixSuffixPairs
+  .flatMap { case (prefix, suffix) =>
+  suffix.distinct.map(y => ((prefix.toSeq, y), 1L))
+}.reduceByKey(_ + _)
+  .filter(_._2 >= minCount)
+val patternAndCounts = prefixAndFreqentItemAndCounts
+  .map{ case ((prefix, item), count) => (prefix.toArray :+ item, 
count) }
+val prefixlength = prefixSuffixPairs.first()._1.length
+if (prefixlength + 1 >= maxPatternLength) {
+  (patternAndCounts, prefixSuffixPairs.filter(x => false))
+} else {
+  val frequentItemsMap = prefixAndFreqentItemAndCounts
+.keys
+.groupByKey()
+.mapValues(_.toSet)
+.collect
+.toMap
+  val nextPrefixSuffixPairs = prefixSuffixPairs
+.filter(x => frequentItemsMap.contains(x._1))
+.flatMap { case (prefix, suffix) =>
+val frequentItemSet = frequentItemsMap(prefix)
+val filteredSuffix = suffix.filter(frequentItemSet.contains(_))
+val nextSuffixes = frequentItemSet.map{ item =>
--- End diff --

OK. Nice technique.





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34746404
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentPrefixSuffixPairs = nextPrefixSuffixPairs
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val projectedDatabase = currentPrefixSuffixPairs
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
projectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
+   * @return pattern and counts, and prefix suffix pairs
+   * (Array[pattern, count], RDD[prefix, suffix ])
+   */
+  private def getPatternCountsAndPrefixSuffixPairs(
+  minCount: Long,
+  prefixSuffixPairs: RDD[(Array[Int], Array[Int])]):
+  (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+val prefixAndFreqentItemAndCounts = prefixSuffixPairs
+  .flatMap { case (prefix, suffix) =>
+  suffix.distinct.map(y => ((prefix.toSeq, y), 1L))
+}.reduceByKey(_ + _)
+  .filter(_._2 >= minCount)
+val patternAndCounts = prefixAndFreqentItemAndCounts
+  .map{ case ((prefix, item), count) => (prefix.toArray :+ item, 
count) }
+val prefixlength = prefixSuffixPairs.first()._1.length
+if (prefixlength + 1 >= maxPatternLength) {
+  (patternAndCounts, prefixSuffixPairs.filter(x => false))
+} else {
+  val frequentItemsMap = prefixAndFreqentItemAndCounts
+.keys
+.groupByKey()
+.mapValues(_.toSet)
+.collect
+.toMap
+  val nextPrefixSuffixPairs = prefixSuffixPairs
+.filter(x => frequentItemsMap.contains(x._1))
+.flatMap { case (prefix, suffix) =>
+val frequentItemSet = frequentItemsMap(prefix)
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34745701
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentPrefixSuffixPairs = nextPrefixSuffixPairs
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val projectedDatabase = currentPrefixSuffixPairs
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
projectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
+   * @return pattern and counts, and prefix suffix pairs
+   * (Array[pattern, count], RDD[prefix, suffix ])
+   */
+  private def getPatternCountsAndPrefixSuffixPairs(
+  minCount: Long,
+  prefixSuffixPairs: RDD[(Array[Int], Array[Int])]):
+  (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+val prefixAndFreqentItemAndCounts = prefixSuffixPairs
+  .flatMap { case (prefix, suffix) =>
+  suffix.distinct.map(y => ((prefix.toSeq, y), 1L))
+}.reduceByKey(_ + _)
+  .filter(_._2 >= minCount)
+val patternAndCounts = prefixAndFreqentItemAndCounts
+  .map{ case ((prefix, item), count) => (prefix.toArray :+ item, 
count) }
+val prefixlength = prefixSuffixPairs.first()._1.length
+if (prefixlength + 1 >= maxPatternLength) {
+  (patternAndCounts, prefixSuffixPairs.filter(x => false))
+} else {
+  val frequentItemsMap = prefixAndFreqentItemAndCounts
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34745388
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentPrefixSuffixPairs = nextPrefixSuffixPairs
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val projectedDatabase = currentPrefixSuffixPairs
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
projectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
+   * @return pattern and counts, and prefix suffix pairs
+   * (Array[pattern, count], RDD[prefix, suffix ])
+   */
+  private def getPatternCountsAndPrefixSuffixPairs(
+  minCount: Long,
+  prefixSuffixPairs: RDD[(Array[Int], Array[Int])]):
+  (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+val prefixAndFreqentItemAndCounts = prefixSuffixPairs
+  .flatMap { case (prefix, suffix) =>
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34745262
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentPrefixSuffixPairs = nextPrefixSuffixPairs
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val projectedDatabase = currentPrefixSuffixPairs
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
projectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
+   * @return pattern and counts, and prefix suffix pairs
+   * (Array[pattern, count], RDD[prefix, suffix ])
+   */
+  private def getPatternCountsAndPrefixSuffixPairs(
+  minCount: Long,
+  prefixSuffixPairs: RDD[(Array[Int], Array[Int])]):
+  (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+val prefixAndFreqentItemAndCounts = prefixSuffixPairs
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34745193
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentPrefixSuffixPairs = nextPrefixSuffixPairs
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val projectedDatabase = currentPrefixSuffixPairs
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
projectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34745225
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentPrefixSuffixPairs = nextPrefixSuffixPairs
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val projectedDatabase = currentPrefixSuffixPairs
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
projectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and prefix suffix pairs
+   * @param minCount minimum count
+   * @param prefixSuffixPairs prefix and suffix pairs,
+   * @return pattern and counts, and prefix suffix pairs
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34745105
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -82,20 +84,70 @@ class PrefixSpan private (
   logWarning("Input data is not cached.")
 }
 val minCount = getMinCount(sequences)
-val lengthOnePatternsAndCounts =
-  getFreqItemAndCounts(minCount, sequences).collect()
-val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-  lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, 
sequences)
+val prefixSuffixPairs = getPrefixSuffixPairs(
+  lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+var patternsCount: Long = lengthOnePatternsAndCounts.count()
+var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => 
(Array(x._1), x._2))
+var currentPrefixSuffixPairs = prefixSuffixPairs
+while (patternsCount <= minPatternsBeforeShuffle && 
currentPrefixSuffixPairs.count() != 0) {
+  val (nextPatternAndCounts, nextPrefixSuffixPairs) =
+getPatternCountsAndPrefixSuffixPairs(minCount, 
currentPrefixSuffixPairs)
+  patternsCount = nextPatternAndCounts.count().toInt
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34745070
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -43,6 +43,8 @@ class PrefixSpan private (
 private var minSupport: Double,
 private var maxPatternLength: Int) extends Logging with Serializable {
 
+  private val minPatternsBeforeShuffle: Int = 20
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-121557045
  
@feynmanliang If we want to get the size of the projected database, we must group the prefix and suffix pairs by prefix. When the prefix length is small, the sequences are very long, and the number of distinct items is small, the size of the projected database equals the number of patterns. Otherwise, the size of the projected database is smaller than the number of patterns.

![123](https://cloud.githubusercontent.com/assets/13159256/8695636/ce1bedda-2b18-11e5-8acd-1f08e161dca5.PNG)
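
To make the relationship concrete, here is a minimal, self-contained sketch (not part of the patch; the toy data and the local SparkContext are assumptions for illustration). It shows that the projected database size is only known after grouping the pairs by prefix, while the candidate patterns come from the distinct (prefix, item) combinations, which can be more numerous:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ProjectedDbSizeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("ProjectedDbSizeSketch").setMaster("local[2]"))
    // Toy prefix-suffix pairs: each key is a prefix, each value is one suffix.
    val prefixSuffixPairs = sc.parallelize(Seq(
      (Seq(1), Array(3, 4)),
      (Seq(1), Array(3, 5)),
      (Seq(2), Array(1, 3))))
    // The projected database size requires grouping the pairs by prefix.
    val projectedDbSize = prefixSuffixPairs.groupByKey().count()   // 2 prefixes
    // Candidate next patterns are the distinct (prefix, item) combinations.
    val patternCandidates = prefixSuffixPairs
      .flatMap { case (prefix, suffix) => suffix.distinct.map(item => (prefix, item)) }
      .distinct()
      .count()                                                     // 5 candidates
    println(s"projected DB size = $projectedDbSize, pattern candidates = $patternCandidates")
    sc.stop()
  }
}
```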






[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34657764
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
+while (patternsCount <= minPatternsBeforeShuffle &&
+  currentProjectedDatabase.count() != 0) {
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34657699
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
   getFreqItemAndCounts(minCount, sequences).collect()
 val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
   lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+
+var patternsCount = lengthOnePatternsAndCounts.length
+var allPatternAndCounts = sequences.sparkContext.parallelize(
+  lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+var currentProjectedDatabase = prefixAndProjectedDatabase
+while (patternsCount <= minPatternsBeforeShuffle &&
+  currentProjectedDatabase.count() != 0) {
+  val (nextPatternAndCounts, nextProjectedDatabase) =
+getPatternCountsAndProjectedDatabase(minCount, currentProjectedDatabase)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentProjectedDatabase = nextProjectedDatabase
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val groupedProjectedDatabase = currentProjectedDatabase
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, groupedProjectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the pattern and counts, and projected database
+   * @param minCount minimum count
+   * @param prefixAndProjectedDatabase prefix and projected database,
+   * @return pattern and counts, and projected database
+   * (Array[pattern, count], RDD[prefix, projected database ])
+   */
+  private def getPatternCountsAndProjectedDatabase(
+  minCount: Long,
+  prefixAndProjectedDatabase: RDD[(Array[Int], Array[Int])]):
+  (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+val prefixAndFreqentItemAndCounts = prefixAndProjectedDatabase.flatMap{ x =>
+  x._2.distinct.map(y => ((x._1.toSeq, y), 1L))
+}.reduceByKey(_ + _)
+  .filter(_._2 >= minCount)
+val patternAndCounts = prefixAndFreqentItemAndCounts
+  .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2))
+val prefixlength = prefixAndProjectedDatabase.take(1)(0)._1.length
--- End diff --

It is not easy to move the prefix-length check into the loop on L96.
The current process is:

1. get the length-1 patterns
2. get the length-1 prefix-suffix pairs
3. while ( (current patterns count < minPatternsBeforeShuffle) and (prefix-suffix pairs is not empty) ) {
4.   get the next patterns and prefix-suffix pairs
5. }
6. shuffle and do the remaining work

If we move the check into the loop, we must split getPatternCountsAndPrefixSuffixPairs() into two methods, getPatternCounts() and getPrefixSuffixPairs().
P.S. The method for getting the length-1 patterns and prefix-suffix pairs differs from the one for getting the length-n (n > 1) patterns and prefix-suffix pairs.
The restructured process would be (a sketch follows below):

1. get the length-1 patterns
2. get the length-1 prefix-suffix pairs
3. get the length-2 patterns ( call the new method getPatternCounts() )
4. while ( (current patterns count < minPatternsBeforeShuffle) and (prefix-suffix pairs is not empty) and (pattern length < maxPatternLength) ) {
5.   get the next prefix-suffix pairs
6.   get the next patterns
7. }
8. get the length-(n+1) prefix-suffix pairs for the shuffle
9. shuffle and do the remaining work
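
For concreteness, a minimal sketch of that restructured flow (not the actual patch): the helper operations are passed in as function values, since their real bodies live in PrefixSpan.scala, and every name here is hypothetical.

```scala
import org.apache.spark.rdd.RDD

object RestructuredLoopSketch {
  // Sketch of steps 1-9 above; the function parameters stand in for the
  // hypothetical getPatternCounts() / getPrefixSuffixPairs() split and for
  // the existing local phase.
  def run(
      minPatternsBeforeShuffle: Int,
      maxPatternLength: Int,
      lengthOnePatterns: RDD[(Int, Long)],                       // step 1
      lengthOnePairs: RDD[(Array[Int], Array[Int])],             // step 2
      getPatternCounts: RDD[(Array[Int], Array[Int])] => RDD[(Array[Int], Long)],
      getPrefixSuffixPairs: RDD[(Array[Int], Array[Int])] => RDD[(Array[Int], Array[Int])],
      getPatternsInLocal: RDD[(Array[Int], Array[Array[Int]])] => RDD[(Array[Int], Long)])
    : RDD[(Array[Int], Long)] = {
    var allPatterns = lengthOnePatterns.map(x => (Array(x._1), x._2))
    var pairs = lengthOnePairs
    var patterns = getPatternCounts(pairs)                       // step 3
    var patternLength = 2
    while (patterns.count() <= minPatternsBeforeShuffle &&
      pairs.count() != 0 && patternLength < maxPatternLength) {  // step 4
      allPatterns = allPatterns ++ patterns
      pairs = getPrefixSuffixPairs(pairs)                        // step 5
      patterns = getPatternCounts(pairs)                         // step 6
      patternLength += 1
    }
    allPatterns = allPatterns ++ patterns
    // Steps 8-9: group the remaining suffixes by prefix and finish locally.
    val projected = pairs
      .map(x => (x._1.toSeq, x._2))
      .groupByKey()
      .map(x => (x._1.toArray, x._2.toArray))
    allPatterns ++ getPatternsInLocal(projected)
  }
}
```

This keeps the maxPatternLength check in the loop condition (step 4) instead of inside getPatternCountsAndPrefixSuffixPairs().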





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34653266
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
+val prefixlength = prefixAndProjectedDatabase.take(1)(0)._1.length
+if (prefixlength + 1 >= maxPatternLength) {
+  (patternAndCounts, prefixAndProjectedDatabase.filter(x => false))
+} else {
+  val frequentItemsMap = prefixAndFreqentItemAndCounts
+.keys.map(x => (x._1, x._2))
+.groupByKey()
+.mapValues(_.toSet)
+.collect
+.toMap
+  val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase
+.filter(x => frequentItemsMap.contains(x._1))
+.flatMap { x =>
+val frequentItemSet = frequentItemsMap(x._1)
+val filteredSequence = x._2.filter(frequentItemSet.contains(_))
+val subProjectedDabase = frequentItemSet.map{ y =>
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34652323
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
+  val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase
+.filter(x => frequentItemsMap.contains(x._1))
+.flatMap { x =>
+val frequentItemSet = frequentItemsMap(x._1)
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34652312
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
+  val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase
+.filter(x => frequentItemsMap.contains(x._1))
+.flatMap { x =>
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34652339
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
+  val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase
+.filter(x => frequentItemsMap.contains(x._1))
+.flatMap { x =>
+val frequentItemSet = frequentItemsMap(x._1)
+val filteredSequence = x._2.filter(frequentItemSet.contains(_))
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34652091
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
+  val frequentItemsMap = prefixAndFreqentItemAndCounts
+.keys.map(x => (x._1, x._2))
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34650869
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
   getFreqItemAndCounts(minCount, sequences).collect()
 val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
   lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+
+var patternsCount = lengthOnePatternsAndCounts.length
+var allPatternAndCounts = sequences.sparkContext.parallelize(
+  lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+var currentProjectedDatabase = prefixAndProjectedDatabase
+while (patternsCount <= minPatternsBeforeShuffle &&
+  currentProjectedDatabase.count() != 0) {
+  val (nextPatternAndCounts, nextProjectedDatabase) =
+getPatternCountsAndProjectedDatabase(minCount, 
currentProjectedDatabase)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentProjectedDatabase = nextProjectedDatabase
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val groupedProjectedDatabase = currentProjectedDatabase
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the new patterns with counts, plus the projected databases for the
+   * next iteration.
+   * @param minCount minimum count
+   * @param prefixAndProjectedDatabase prefix and projected database pairs
+   * @return (RDD[(pattern, count)], RDD[(prefix, projected database)])
+   */
+  private def getPatternCountsAndProjectedDatabase(
+  minCount: Long,
+  prefixAndProjectedDatabase: RDD[(Array[Int], Array[Int])]):
+  (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+val prefixAndFrequentItemAndCounts = prefixAndProjectedDatabase.flatMap { x =>
+  x._2.distinct.map(y => ((x._1.toSeq, y), 1L))
+}.reduceByKey(_ + _)
+  .filter(_._2 >= minCount)
+val patternAndCounts = prefixAndFrequentItemAndCounts
+  .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2))
--- End diff --

OK
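The core of the method quoted above is the counting step: each item is counted
at most once per projected postfix, and only extensions reaching minCount
survive. A self-contained sketch of that step (the toSeq round-trip is needed
because Array keys hash by reference):

    import org.apache.spark.rdd.RDD

    // Sketch of the frequent-extension count, assuming the hunk's data layout:
    // (prefix, postfix) pairs of Int arrays.
    def countFrequentExtensions(
        minCount: Long,
        projected: RDD[(Array[Int], Array[Int])]): RDD[(Array[Int], Long)] = {
      projected
        .flatMap { case (prefix, postfix) =>
          // distinct: an item contributes at most once per postfix
          postfix.distinct.map(item => ((prefix.toSeq, item), 1L))
        }
        .reduceByKey(_ + _)
        .filter(_._2 >= minCount)
        .map { case ((prefix, item), count) => (prefix.toArray :+ item, count) }
    }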





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34650625
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
   getFreqItemAndCounts(minCount, sequences).collect()
 val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
   lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+
+var patternsCount = lengthOnePatternsAndCounts.length
+var allPatternAndCounts = sequences.sparkContext.parallelize(
+  lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+var currentProjectedDatabase = prefixAndProjectedDatabase
+while (patternsCount <= minPatternsBeforeShuffle &&
+  currentProjectedDatabase.count() != 0) {
+  val (nextPatternAndCounts, nextProjectedDatabase) =
+getPatternCountsAndProjectedDatabase(minCount, 
currentProjectedDatabase)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentProjectedDatabase = nextProjectedDatabase
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val groupedProjectedDatabase = currentProjectedDatabase
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-15 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34650635
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
   getFreqItemAndCounts(minCount, sequences).collect()
 val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
   lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+
+var patternsCount = lengthOnePatternsAndCounts.length
+var allPatternAndCounts = sequences.sparkContext.parallelize(
+  lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+var currentProjectedDatabase = prefixAndProjectedDatabase
+while (patternsCount <= minPatternsBeforeShuffle &&
+  currentProjectedDatabase.count() != 0) {
+  val (nextPatternAndCounts, nextProjectedDatabase) =
+getPatternCountsAndProjectedDatabase(minCount, 
currentProjectedDatabase)
+  patternsCount = nextPatternAndCounts.count().toInt
+  currentProjectedDatabase = nextProjectedDatabase
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+if (patternsCount > 0) {
+  val groupedProjectedDatabase = currentProjectedDatabase
+.map(x => (x._1.toSeq, x._2))
+.groupByKey()
+.map(x => (x._1.toArray, x._2.toArray))
+  val nextPatternAndCounts = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
+  allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+}
+allPatternAndCounts
+  }
+
+  /**
+   * Get the new patterns with counts, plus the projected databases for the next iteration.
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34650468
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
   getFreqItemAndCounts(minCount, sequences).collect()
 val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
   lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+
+var patternsCount = lengthOnePatternsAndCounts.length
+var allPatternAndCounts = sequences.sparkContext.parallelize(
+  lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+var currentProjectedDatabase = prefixAndProjectedDatabase
+while (patternsCount <= minPatternsBeforeShuffle &&
+  currentProjectedDatabase.count() != 0) {
+  val (nextPatternAndCounts, nextProjectedDatabase) =
+getPatternCountsAndProjectedDatabase(minCount, 
currentProjectedDatabase)
+  patternsCount = nextPatternAndCounts.count().toInt
--- End diff --

OK
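The loop under discussion here keeps extending prefixes on the cluster until
enough patterns have been collected, and only then falls back to local
processing. A hedged sketch of that control flow, with extendOneLevel and
minPatternsBeforeShuffle as assumed stand-ins for the names in the diff:

    import org.apache.spark.rdd.RDD

    // Sketch only; extendOneLevel plays the role of
    // getPatternCountsAndProjectedDatabase in the diff above.
    def collectEnoughPrefixes(
        minCount: Long,
        minPatternsBeforeShuffle: Int,
        lengthOnePatterns: RDD[(Array[Int], Long)],
        initialProjected: RDD[(Array[Int], Array[Int])])(
        extendOneLevel: (Long, RDD[(Array[Int], Array[Int])]) =>
          (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]))
      : (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
      var patterns = lengthOnePatterns
      var projected = initialProjected
      var patternsCount = patterns.count().toInt
      while (patternsCount <= minPatternsBeforeShuffle && projected.count() != 0) {
        val (next, nextProjected) = extendOneLevel(minCount, projected)
        patternsCount = next.count().toInt   // patterns found at this level
        projected = nextProjected
        patterns = patterns ++ next          // accumulate across levels
      }
      (patterns, projected)                  // caller mines what is left locally
    }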





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34650374
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
   getFreqItemAndCounts(minCount, sequences).collect()
 val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
   lengthOnePatternsAndCounts.map(_._1), sequences)
-val groupedProjectedDatabase = prefixAndProjectedDatabase
-  .map(x => (x._1.toSeq, x._2))
-  .groupByKey()
-  .map(x => (x._1.toArray, x._2.toArray))
-val nextPatterns = getPatternsInLocal(minCount, 
groupedProjectedDatabase)
-val lengthOnePatternsAndCountsRdd =
-  sequences.sparkContext.parallelize(
-lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-allPatterns
+
+var patternsCount = lengthOnePatternsAndCounts.length
+var allPatternAndCounts = sequences.sparkContext.parallelize(
--- End diff --

OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7412#discussion_r34649591
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -86,16 +88,69 @@ class PrefixSpan private (
   getFreqItemAndCounts(minCount, sequences).collect()
 val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
--- End diff --

OK.





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7412#issuecomment-121480109
  
@mengxr This is the new PR, please review it. Thanks.





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
GitHub user zhangjiajin opened a pull request:

https://github.com/apache/spark/pull/7412

[SPARK-8998][MLlib] Collect enough frequent prefixes before projection in 
PrefixSpan (new)

Collect enough frequent prefixes before projection in PrefixSpan

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangjiajin/spark CollectEnoughPrefixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7412


commit 91fd7e66d0c363e68bc9ebe2bf3e03c26ef348d2
Author: zhangjiajin 
Date:   2015-07-07T07:30:10Z

Add new algorithm PrefixSpan and test file.

commit 575995f69dadad825d97f2248599eb62c1743fe7
Author: zhangjiajin 
Date:   2015-07-08T09:07:37Z

Modified the code according to the review comments.

commit 951fd424ff189f9bf5619a84f3f19e942f592396
Author: zhang jiajin 
Date:   2015-07-08T10:22:16Z

Delete Prefixspan.scala

Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala

commit a2eb14c7fb6abb70eaa046baf78da205c7a4ca7d
Author: zhang jiajin 
Date:   2015-07-08T10:23:31Z

Delete PrefixspanSuite.scala

Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete 
PrefixspanSuite.scala.

commit 89bc368f76c40ad0090a928cec49cd9d28ce666e
Author: zhangjiajin 
Date:   2015-07-08T10:50:38Z

Fixed a Scala style error.

commit 1dd33ad82499b9ad1b446b96f2f88519ffbe9a1b
Author: zhangjiajin 
Date:   2015-07-09T14:40:29Z

Modified the code according to the review comments.

commit 4c60fb36148206abd67fe51cea667ee3d63e490e
Author: zhangjiajin 
Date:   2015-07-09T15:01:45Z

Fix some Scala style errors.

commit ba5df346543e9aee119bd781b257860b65bbe7df
Author: zhangjiajin 
Date:   2015-07-09T15:10:25Z

Fix a Scala style error.

commit 574e56ccfb271d0ed86c3eba95d1a11a8688495d
Author: zhangjiajin 
Date:   2015-07-10T11:49:06Z

Add new object LocalPrefixSpan, and do some optimization.

commit ca9c4c8fa84202d8d533c51c277138461ba096a7
Author: zhangjiajin 
Date:   2015-07-11T02:40:24Z

Modified the code according to the review comments.

commit 22b0ef463beb0e0fe9cc696989245da79722a3a6
Author: zhangjiajin 
Date:   2015-07-14T02:21:04Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

commit 078d4101f56c68c6f191de57f9e542a80f2c89b5
Author: zhangjiajin 
Date:   2015-07-14T02:46:05Z

fix a scala style error.

commit 4dd1c8a2393b91dc1841c3b01dad7163371dd434
Author: zhangjiajin 
Date:   2015-07-15T02:57:41Z

initialize file before rebase.

commit a8fde870aae9f5fe31ac04a50da20ec906626826
Author: zhangjiajin 
Date:   2015-07-15T03:25:34Z

Merge branch 'master' of https://github.com/apache/spark

Initialize local master branch.

commit 6560c6916edeff900e54c6b5ee5b7c44cac87724
Author: zhangjiajin 
Date:   2015-07-15T03:44:42Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixeSpan







[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin closed the pull request at:

https://github.com/apache/spark/pull/7383





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7383#issuecomment-121478653
  
@mengxr OK





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
GitHub user zhangjiajin reopened a pull request:

https://github.com/apache/spark/pull/7383

[SPARK-8998][MLlib] Collect enough frequent prefixes before projection in 
PrefixSpan

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangjiajin/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7383.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7383


commit 91fd7e66d0c363e68bc9ebe2bf3e03c26ef348d2
Author: zhangjiajin 
Date:   2015-07-07T07:30:10Z

Add new algorithm PrefixSpan and test file.

commit 575995f69dadad825d97f2248599eb62c1743fe7
Author: zhangjiajin 
Date:   2015-07-08T09:07:37Z

Modified the code according to the review comments.

commit 951fd424ff189f9bf5619a84f3f19e942f592396
Author: zhang jiajin 
Date:   2015-07-08T10:22:16Z

Delete Prefixspan.scala

Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala

commit a2eb14c7fb6abb70eaa046baf78da205c7a4ca7d
Author: zhang jiajin 
Date:   2015-07-08T10:23:31Z

Delete PrefixspanSuite.scala

Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete 
PrefixspanSuite.scala.

commit 89bc368f76c40ad0090a928cec49cd9d28ce666e
Author: zhangjiajin 
Date:   2015-07-08T10:50:38Z

Fixed a Scala style error.

commit 1dd33ad82499b9ad1b446b96f2f88519ffbe9a1b
Author: zhangjiajin 
Date:   2015-07-09T14:40:29Z

Modified the code according to the review comments.

commit 4c60fb36148206abd67fe51cea667ee3d63e490e
Author: zhangjiajin 
Date:   2015-07-09T15:01:45Z

Fix some Scala style errors.

commit ba5df346543e9aee119bd781b257860b65bbe7df
Author: zhangjiajin 
Date:   2015-07-09T15:10:25Z

Fix a Scala style error.

commit 574e56ccfb271d0ed86c3eba95d1a11a8688495d
Author: zhangjiajin 
Date:   2015-07-10T11:49:06Z

Add new object LocalPrefixSpan, and do some optimization.

commit ca9c4c8fa84202d8d533c51c277138461ba096a7
Author: zhangjiajin 
Date:   2015-07-11T02:40:24Z

Modified the code according to the review comments.

commit 22b0ef463beb0e0fe9cc696989245da79722a3a6
Author: zhangjiajin 
Date:   2015-07-14T02:21:04Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

commit 078d4101f56c68c6f191de57f9e542a80f2c89b5
Author: zhangjiajin 
Date:   2015-07-14T02:46:05Z

fix a scala style error.

commit 4dd1c8a2393b91dc1841c3b01dad7163371dd434
Author: zhangjiajin 
Date:   2015-07-15T02:57:41Z

initialize file before rebase.

commit a8fde870aae9f5fe31ac04a50da20ec906626826
Author: zhangjiajin 
Date:   2015-07-15T03:25:34Z

Merge branch 'master' of https://github.com/apache/spark

Initialize local master branch.







[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin closed the pull request at:

https://github.com/apache/spark/pull/7383





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
GitHub user zhangjiajin reopened a pull request:

https://github.com/apache/spark/pull/7383

[SPARK-8998][MLlib] Collect enough frequent prefixes before projection in 
PrefixSpan

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangjiajin/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7383.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7383


commit 91fd7e66d0c363e68bc9ebe2bf3e03c26ef348d2
Author: zhangjiajin 
Date:   2015-07-07T07:30:10Z

Add new algorithm PrefixSpan and test file.

commit 575995f69dadad825d97f2248599eb62c1743fe7
Author: zhangjiajin 
Date:   2015-07-08T09:07:37Z

Modified the code according to the review comments.

commit 951fd424ff189f9bf5619a84f3f19e942f592396
Author: zhang jiajin 
Date:   2015-07-08T10:22:16Z

Delete Prefixspan.scala

Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala

commit a2eb14c7fb6abb70eaa046baf78da205c7a4ca7d
Author: zhang jiajin 
Date:   2015-07-08T10:23:31Z

Delete PrefixspanSuite.scala

Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete 
PrefixspanSuite.scala.

commit 89bc368f76c40ad0090a928cec49cd9d28ce666e
Author: zhangjiajin 
Date:   2015-07-08T10:50:38Z

Fixed a Scala style error.

commit 1dd33ad82499b9ad1b446b96f2f88519ffbe9a1b
Author: zhangjiajin 
Date:   2015-07-09T14:40:29Z

Modified the code according to the review comments.

commit 4c60fb36148206abd67fe51cea667ee3d63e490e
Author: zhangjiajin 
Date:   2015-07-09T15:01:45Z

Fix some Scala style errors.

commit ba5df346543e9aee119bd781b257860b65bbe7df
Author: zhangjiajin 
Date:   2015-07-09T15:10:25Z

Fix a Scala style error.

commit 574e56ccfb271d0ed86c3eba95d1a11a8688495d
Author: zhangjiajin 
Date:   2015-07-10T11:49:06Z

Add new object LocalPrefixSpan, and do some optimization.

commit ca9c4c8fa84202d8d533c51c277138461ba096a7
Author: zhangjiajin 
Date:   2015-07-11T02:40:24Z

Modified the code according to the review comments.

commit 22b0ef463beb0e0fe9cc696989245da79722a3a6
Author: zhangjiajin 
Date:   2015-07-14T02:21:04Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

commit 078d4101f56c68c6f191de57f9e542a80f2c89b5
Author: zhangjiajin 
Date:   2015-07-14T02:46:05Z

fix a scala style error.







[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin closed the pull request at:

https://github.com/apache/spark/pull/7258





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-14 Thread zhangjiajin
GitHub user zhangjiajin reopened a pull request:

https://github.com/apache/spark/pull/7258

[SPARK-6487][MLlib] Add sequential pattern mining algorithm PrefixSpan to 
Spark MLlib

Add parallel PrefixSpan algorithm and test file.
Support non-temporal sequences.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangjiajin/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7258.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7258


commit 91fd7e66d0c363e68bc9ebe2bf3e03c26ef348d2
Author: zhangjiajin 
Date:   2015-07-07T07:30:10Z

Add new algorithm PrefixSpan and test file.

commit 575995f69dadad825d97f2248599eb62c1743fe7
Author: zhangjiajin 
Date:   2015-07-08T09:07:37Z

Modified the code according to the review comments.

commit 951fd424ff189f9bf5619a84f3f19e942f592396
Author: zhang jiajin 
Date:   2015-07-08T10:22:16Z

Delete Prefixspan.scala

Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala

commit a2eb14c7fb6abb70eaa046baf78da205c7a4ca7d
Author: zhang jiajin 
Date:   2015-07-08T10:23:31Z

Delete PrefixspanSuite.scala

Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete 
PrefixspanSuite.scala.

commit 89bc368f76c40ad0090a928cec49cd9d28ce666e
Author: zhangjiajin 
Date:   2015-07-08T10:50:38Z

Fixed a Scala style error.

commit 1dd33ad82499b9ad1b446b96f2f88519ffbe9a1b
Author: zhangjiajin 
Date:   2015-07-09T14:40:29Z

Modified the code according to the review comments.

commit 4c60fb36148206abd67fe51cea667ee3d63e490e
Author: zhangjiajin 
Date:   2015-07-09T15:01:45Z

Fix some Scala style errors.

commit ba5df346543e9aee119bd781b257860b65bbe7df
Author: zhangjiajin 
Date:   2015-07-09T15:10:25Z

Fix a Scala style error.

commit 574e56ccfb271d0ed86c3eba95d1a11a8688495d
Author: zhangjiajin 
Date:   2015-07-10T11:49:06Z

Add new object LocalPrefixSpan, and do some optimization.

commit ca9c4c8fa84202d8d533c51c277138461ba096a7
Author: zhangjiajin 
Date:   2015-07-11T02:40:24Z

Modified the code according to the review comments.

commit 22b0ef463beb0e0fe9cc696989245da79722a3a6
Author: zhangjiajin 
Date:   2015-07-14T02:21:04Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

commit 078d4101f56c68c6f191de57f9e542a80f2c89b5
Author: zhangjiajin 
Date:   2015-07-14T02:46:05Z

fix a scala style error.







[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
GitHub user zhangjiajin reopened a pull request:

https://github.com/apache/spark/pull/7383

[SPARK-8998][MLlib] Collect enough frequent prefixes before projection in 
PrefixSpan

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangjiajin/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7383.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7383


commit 91fd7e66d0c363e68bc9ebe2bf3e03c26ef348d2
Author: zhangjiajin 
Date:   2015-07-07T07:30:10Z

Add new algorithm PrefixSpan and test file.

commit 575995f69dadad825d97f2248599eb62c1743fe7
Author: zhangjiajin 
Date:   2015-07-08T09:07:37Z

Modified the code according to the review comments.

commit 951fd424ff189f9bf5619a84f3f19e942f592396
Author: zhang jiajin 
Date:   2015-07-08T10:22:16Z

Delete Prefixspan.scala

Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala

commit a2eb14c7fb6abb70eaa046baf78da205c7a4ca7d
Author: zhang jiajin 
Date:   2015-07-08T10:23:31Z

Delete PrefixspanSuite.scala

Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete 
PrefixspanSuite.scala.

commit 89bc368f76c40ad0090a928cec49cd9d28ce666e
Author: zhangjiajin 
Date:   2015-07-08T10:50:38Z

Fixed a Scala style error.

commit 1dd33ad82499b9ad1b446b96f2f88519ffbe9a1b
Author: zhangjiajin 
Date:   2015-07-09T14:40:29Z

Modified the code according to the review comments.

commit 4c60fb36148206abd67fe51cea667ee3d63e490e
Author: zhangjiajin 
Date:   2015-07-09T15:01:45Z

Fix some Scala style errors.

commit ba5df346543e9aee119bd781b257860b65bbe7df
Author: zhangjiajin 
Date:   2015-07-09T15:10:25Z

Fix a Scala style error.

commit 574e56ccfb271d0ed86c3eba95d1a11a8688495d
Author: zhangjiajin 
Date:   2015-07-10T11:49:06Z

Add new object LocalPrefixSpan, and do some optimization.

commit ca9c4c8fa84202d8d533c51c277138461ba096a7
Author: zhangjiajin 
Date:   2015-07-11T02:40:24Z

Modified the code according to the review comments.

commit 22b0ef463beb0e0fe9cc696989245da79722a3a6
Author: zhangjiajin 
Date:   2015-07-14T02:21:04Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

commit 078d4101f56c68c6f191de57f9e542a80f2c89b5
Author: zhangjiajin 
Date:   2015-07-14T02:46:05Z

fix a scala style error.







[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin closed the pull request at:

https://github.com/apache/spark/pull/7383








[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-14 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7383#issuecomment-121149967
  
@mengxr This PR includes the previous PR. Maybe the previous PR was not
properly closed.





[GitHub] spark pull request: [SPARK-8998][MLlib] Collect enough frequent pr...

2015-07-13 Thread zhangjiajin
GitHub user zhangjiajin opened a pull request:

https://github.com/apache/spark/pull/7383

[SPARK-8998][MLlib] Collect enough frequent prefixes before projection in 
PrefixSpan

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangjiajin/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7383.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7383


commit 91fd7e66d0c363e68bc9ebe2bf3e03c26ef348d2
Author: zhangjiajin 
Date:   2015-07-07T07:30:10Z

Add new algorithm PrefixSpan and test file.

commit 575995f69dadad825d97f2248599eb62c1743fe7
Author: zhangjiajin 
Date:   2015-07-08T09:07:37Z

Modified the code according to the review comments.

commit 951fd424ff189f9bf5619a84f3f19e942f592396
Author: zhang jiajin 
Date:   2015-07-08T10:22:16Z

Delete Prefixspan.scala

Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala

commit a2eb14c7fb6abb70eaa046baf78da205c7a4ca7d
Author: zhang jiajin 
Date:   2015-07-08T10:23:31Z

Delete PrefixspanSuite.scala

Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete 
PrefixspanSuite.scala.

commit 89bc368f76c40ad0090a928cec49cd9d28ce666e
Author: zhangjiajin 
Date:   2015-07-08T10:50:38Z

Fixed a Scala style error.

commit 1dd33ad82499b9ad1b446b96f2f88519ffbe9a1b
Author: zhangjiajin 
Date:   2015-07-09T14:40:29Z

Modified the code according to the review comments.

commit 4c60fb36148206abd67fe51cea667ee3d63e490e
Author: zhangjiajin 
Date:   2015-07-09T15:01:45Z

Fix some Scala style errors.

commit ba5df346543e9aee119bd781b257860b65bbe7df
Author: zhangjiajin 
Date:   2015-07-09T15:10:25Z

Fix a Scala style error.

commit 574e56ccfb271d0ed86c3eba95d1a11a8688495d
Author: zhangjiajin 
Date:   2015-07-10T11:49:06Z

Add new object LocalPrefixSpan, and do some optimization.

commit ca9c4c8fa84202d8d533c51c277138461ba096a7
Author: zhangjiajin 
Date:   2015-07-11T02:40:24Z

Modified the code according to the review comments.

commit 22b0ef463beb0e0fe9cc696989245da79722a3a6
Author: zhangjiajin 
Date:   2015-07-14T02:21:04Z

Add feature: Collect enough frequent prefixes before projection in 
PrefixSpan.







[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-12 Thread zhangjiajin
Github user zhangjiajin closed the pull request at:

https://github.com/apache/spark/pull/7258





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7258#issuecomment-120572678
  
OK





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34409302
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine frequent sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of the sequential pattern; any
+ *   pattern that appears more than (minSupport * size-of-the-dataset) times
+ *   will be output
+ * @param maxPatternLength the maximal length of the sequential pattern; any
+ *   pattern whose length is no greater than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining 
Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set; each sequence is an ordered list of elements.
+   * @return a set of sequential pattern pairs, where the key is the pattern
+   * (a list of elements) and the value is the pattern's support value.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val repartitionedRdd = 
makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd)
+val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), 
x._2)) ++ nextPatterns
+allPatterns
+  }
+
+  /**
+   * Get the absolute minimum support value (sequences count * minSupport).
+   * @param sequences input data set containing a set of sequences
+   * @return the absolute minimum support value
+   */
+  private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = {
+if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong
+  }
+
+  /**
+   * Generates frequent items by filtering the input data using minimal 
support level.
+   * @param minCount the absolute minimum support
+   * @param sequences original sequences data
+   * @return frequent items with their counts
+   */
+  private def getFreqItemAndCounts(
+  minCount: Long,
+  sequences: RDD[Array[Int]]): RDD[(Int, Long)] = {
+sequences.flatMap(_.distinct.map((_, 
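As a usage sketch of the builder-style API quoted above (the SparkContext sc
and the input data are assumptions for illustration, not part of the patch):

    import org.apache.spark.mllib.fpm.PrefixSpan
    import org.apache.spark.rdd.RDD

    // sc: SparkContext is assumed to exist; the sequences are illustrative.
    val sequences: RDD[Array[Int]] = sc.parallelize(Seq(
      Array(1, 3, 4, 5),
      Array(2, 3, 1),
      Array(3, 4, 5))).cache()  // cache to avoid the "Input data is not cached" warning
    val patterns = new PrefixSpan()
      .setMinSupport(0.5)
      .setMaxPatternLength(5)
      .run(sequences)           // RDD[(Array[Int], Long)] of patterns with counts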

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34408696
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine frequent sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of the sequential pattern; any
+ *   pattern that appears more than (minSupport * size-of-the-dataset) times
+ *   will be output
+ * @param maxPatternLength the maximal length of the sequential pattern; any
+ *   pattern whose length is no greater than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining 
Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1,
+  "The minimum support value must be between 0 and 1, including 0 and 
1.")
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1,
+  "The maximum pattern length value must be greater than 0.")
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set; each sequence is an ordered list of elements.
+   * @return a set of sequential pattern pairs, where the key is the pattern
+   * (a list of elements) and the value is the pattern's count.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getMinCount(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val projectedDatabase = 
makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(minCount, projectedDatabase)
+val lengthOnePatternsAndCountsRdd =
+  sequences.sparkContext.parallelize(
+lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
+allPatterns
+  }
+
+  /**
+   * Get the minimum count (sequences count * minSupport).
+   * @param sequences input data set containing a set of sequences
+   * @return the minimum count
+   */
+  private def getMinCount(sequences: RDD[Array[Int]]): Long = {
+if (minSupport == 0) 0L else math.ceil(sequences.count() * 
minSupport).toLong
+  }
+
+  /**
+   * Generates frequent items by filtering the input data using minimal 
count level.
+   * @param minCount the absolute minimum count
+   * @param sequences original sequences data
 

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34408693
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine frequent sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of the sequential pattern; any
+ *   pattern that appears more than (minSupport * size-of-the-dataset) times
+ *   will be output
+ * @param maxPatternLength the maximal length of the sequential pattern; any
+ *   pattern whose length is no greater than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining 
Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1,
+  "The minimum support value must be between 0 and 1, including 0 and 
1.")
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1,
+  "The maximum pattern length value must be greater than 0.")
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set; each sequence is an ordered list of elements.
+   * @return a set of sequential pattern pairs, where the key is the pattern
+   * (a list of elements) and the value is the pattern's count.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getMinCount(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
--- End diff --

fixed.
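A quick check of the getMinCount arithmetic quoted above, with illustrative
numbers (the values are assumptions, not taken from the patch):

    // With 6 input sequences and minSupport = 0.33:
    val minCount = math.ceil(6 * 0.33).toLong  // ceil(1.98) = 2
    // So a pattern must occur in at least 2 of the 6 sequences to be frequent;
    // minSupport == 0 is special-cased to 0L so nothing is filtered out.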





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7258#issuecomment-120564465
  
@feynmanliang comments: Delete makePrefixProjectedDatabases, move the 
groupByKey() to the last call in this method (no need to include the two map()s 
on L161 and L163 since they don't do anything)

Because the pair's key is an Array, groupByKey() doesn't work well (arrays
compare by reference rather than by contents), so the Array must be converted
to a Seq before groupByKey and converted back afterwards.
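The point about Array keys can be seen in two lines of the Scala REPL: arrays
use reference equality, so two equal prefixes would never land in the same
group.

    val a = Array(1, 2)
    val b = Array(1, 2)
    a == b              // false: arrays compare by reference
    a.toSeq == b.toSeq  // true: WrappedArray compares elementwise
    // hence the pattern used in the PR:
    // rdd.map { case (k, v) => (k.toSeq, v) }.groupByKey()
    //    .map { case (k, v) => (k.toArray, v.toArray) }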





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34408636
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine frequent sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of the sequential pattern; any
+ *   pattern that appears more than (minSupport * size-of-the-dataset) times
+ *   will be output
+ * @param maxPatternLength the maximal length of the sequential pattern; any
+ *   pattern whose length is no greater than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining 
Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1,
+  "The minimum support value must be between 0 and 1, including 0 and 
1.")
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1,
+  "The maximum pattern length value must be greater than 0.")
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set; each sequence is an ordered list of elements.
+   * @return a set of sequential pattern pairs, where the key is the pattern
+   * (a list of elements) and the value is the pattern's count.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getMinCount(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val projectedDatabase = 
makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(minCount, projectedDatabase)
+val lengthOnePatternsAndCountsRdd =
+  sequences.sparkContext.parallelize(
+lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
+allPatterns
+  }
+
+  /**
+   * Get the minimum count (sequences count * minSupport).
+   * @param sequences input data set containing a set of sequences
+   * @return the minimum count
+   */
+  private def getMinCount(sequences: RDD[Array[Int]]): Long = {
+if (minSupport == 0) 0L else math.ceil(sequences.count() * 
minSupport).toLong
+  }
+
+  /**
+   * Generates frequent items by filtering the input data using minimal 
count level.
+   * @param minCount the absolute minimum count
+   * @param sequences original sequences data
 

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34408260
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * Calculate all patterns of a projected database locally.
+ */
+@Experimental
+private[fpm] object LocalPrefixSpan extends Logging with Serializable {
+
+  /**
+   * Calculate all patterns of a projected database locally.
+   * @param minCount minimum count
+   * @param maxPatternLength maximum pattern length
+   * @param prefix prefix
+   * @param projectedDatabase the projected database
+   * @return a set of sequential pattern pairs, where the key is the pattern
+   * (a list of elements) and the value is the pattern's count.
+   */
+  def run(
+  minCount: Long,
+  maxPatternLength: Int,
+  prefix: Array[Int],
+  projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = {
+getPatternsWithPrefix(minCount, maxPatternLength, prefix, 
projectedDatabase)
--- End diff --

fixed.





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34408256
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * Calculate all patterns of a projected database locally.
+ */
+@Experimental
+private[fpm] object LocalPrefixSpan extends Logging with Serializable {
+
+  /**
+   * Calculate all patterns of a projected database locally.
+   * @param minCount minimum count
+   * @param maxPatternLength maximum pattern length
+   * @param prefix prefix
+   * @param projectedDatabase the projected database
+   * @return a set of sequential pattern pairs;
+   * the key of each pair is a pattern (a list of elements),
--- End diff --

fixed





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34344040
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set: a collection of sequences,
+   *  where a sequence is an ordered list of elements
+   * @return a set of sequential pattern pairs;
+   * the key of each pair is a pattern (a list of elements),
+   * the value is the pattern's support value.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd)
+val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns
+allPatterns
+  }
+
+  /**
+   * Get the absolute minimum support value (sequences count * minSupport).
+   * @param sequences input data set, a collection of sequences
+   * @return the absolute minimum support value
+   */
+  private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = {
+if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong
+  }
+
+  /**
+   * Generates frequent items by filtering the input data using the minimal support level.
+   * @param minCount the absolute minimum support
+   * @param sequences original sequences data
+   * @return frequent patterns ordered by their frequencies
+   */
+  private def getFreqItemAndCounts(
+  minCount: Long,
+  sequences: RDD[Array[Int]]): RDD[(Int, Long)] = {
+sequences.flatMap(_.distinct.map((_, 

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34343241
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set: a collection of sequences,
+   *  where a sequence is an ordered list of elements
+   * @return a set of sequential pattern pairs;
+   * the key of each pair is a pattern (a list of elements),
+   * the value is the pattern's support value.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd)
+val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns
+allPatterns
+  }
+
+  /**
+   * Get the absolute minimum support value (sequences count * minSupport).
+   * @param sequences input data set, a collection of sequences
+   * @return the absolute minimum support value
+   */
+  private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = {
+if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong
+  }
+
+  /**
+   * Generates frequent items by filtering the input data using the minimal support level.
+   * @param minCount the absolute minimum support
+   * @param sequences original sequences data
+   * @return frequent patterns ordered by their frequencies
+   */
+  private def getFreqItemAndCounts(
+  minCount: Long,
+  sequences: RDD[Array[Int]]): RDD[(Int, Long)] = {
+sequences.flatMap(_.distinct.map((_, 

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34334197
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set: a collection of sequences,
+   *  where a sequence is an ordered list of elements
+   * @return a set of sequential pattern pairs;
+   * the key of each pair is a pattern (a list of elements),
+   * the value is the pattern's support value.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd)
+val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns
+allPatterns
+  }
+
+  /**
+   * Get the absolute minimum support value (sequences count * minSupport).
+   * @param sequences input data set, a collection of sequences
+   * @return the absolute minimum support value
+   */
+  private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = {
+if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong
+  }
+
+  /**
+   * Generates frequent items by filtering the input data using the minimal support level.
+   * @param minCount the absolute minimum support
--- End diff --

fixed



[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34333999
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set: a collection of sequences,
+   *  where a sequence is an ordered list of elements
+   * @return a set of sequential pattern pairs;
+   * the key of each pair is a pattern (a list of elements),
+   * the value is the pattern's support value.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd)
+val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns
+allPatterns
+  }
+
+  /**
+   * Get the absolute minimum support value (sequences count * minSupport).
+   * @param sequences input data set, a collection of sequences
+   * @return the absolute minimum support value
+   */
+  private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = {
+if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong
--- End diff --

fixed.
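
Note that the `.toLong` in the quoted getAbsoluteMinSupport truncates, while the
revision at the top of this digest rounds up with math.ceil; the difference shows
at the boundary:

    // 6 sequences, minSupport = 0.33
    (6 * 0.33).toLong            // 1 -- truncation admits patterns below the support level
    math.ceil(6 * 0.33).toLong   // 2 -- requires support in at least 33% of the sequences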



[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34333634
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set: a collection of sequences,
+   *  where a sequence is an ordered list of elements
+   * @return a set of sequential pattern pairs;
+   * the key of each pair is a pattern (a list of elements),
+   * the value is the pattern's support value.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd)
+val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns
+allPatterns
+  }
+
+  /**
+   * Get the absolute minimum support value (sequences count * minSupport).
--- End diff --

fixed





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34333569
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set: a collection of sequences,
+   *  where a sequence is an ordered list of elements
+   * @return a set of sequential pattern pairs;
+   * the key of each pair is a pattern (a list of elements),
+   * the value is the pattern's support value.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(minCount, sequences)
+val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates)
--- End diff --

fixed.





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34333492
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences.
+   * @param sequences input data set: a collection of sequences,
+   *  where a sequence is an ordered list of elements
+   * @return a set of sequential pattern pairs;
+   * the key of each pair is a pattern (a list of elements),
+   * the value is the pattern's support value.
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+if (sequences.getStorageLevel == StorageLevel.NONE) {
+  logWarning("Input data is not cached.")
+}
+val minCount = getAbsoluteMinSupport(sequences)
--- End diff --

fixed





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34333437
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length (default: `10`).
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+require(maxPatternLength >= 1)
--- End diff --

fixed





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-10 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34333429
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan private (
+private var minSupport: Double,
+private var maxPatternLength: Int) extends Logging with Serializable {
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: `10`}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+require(minSupport >= 0 && minSupport <= 1)
--- End diff --

fixed





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on the pull request:

https://github.com/apache/spark/pull/7258#issuecomment-120057018
  
@mengxr I don't know why method 2 does projection before filtering. I think 
method 2 is exactly what you want. The only functionality that needs to be added 
to the current code is step 4 (do we have enough candidates to distribute the 
work? If not, go back to step 1 and generate candidates with length + 1); a 
sketch of that loop follows the images below.


![image](https://cloud.githubusercontent.com/assets/13159256/8600512/e4478ea4-2698-11e5-9631-80d26807c03f.png)


![image](https://cloud.githubusercontent.com/assets/13159256/8599886/6fbbca30-2695-11e5-91c4-98272003d303.png)
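
In code, step 4 is just a loop around steps 1-3. A self-contained sketch of that
control flow in plain Scala collections (no Spark); minCandidates and both helper
names are illustrative, not this PR's API:

    // Grow frequent prefixes one item at a time until there are enough
    // (prefix, projected database) candidates to distribute the local mining.
    def candidatesForDistribution(
        db: Array[Array[Int]],
        minCount: Long,
        maxPatternLength: Int,
        minCandidates: Int): Seq[(List[Int], Array[Array[Int]])] = {
      var candidates = frequentExtensions(List.empty, db, minCount)
      var length = 1
      // Step 4: too few candidates to spread the work? Extend by one item.
      while (candidates.size < minCandidates && length < maxPatternLength) {
        candidates = candidates.flatMap { case (prefix, proj) =>
          frequentExtensions(prefix, proj, minCount)
        }
        length += 1
      }
      candidates
    }

    // One extension step: the frequent items of proj, each with its projection.
    def frequentExtensions(
        prefix: List[Int],
        proj: Array[Array[Int]],
        minCount: Long): Seq[(List[Int], Array[Array[Int]])] = {
      proj.flatMap(_.distinct).groupBy(identity).toSeq
        .collect { case (item, occurrences) if occurrences.length >= minCount =>
          val projected = proj.flatMap { seq =>
            val i = seq.indexOf(item)
            if (i >= 0) Some(seq.drop(i + 1)) else None
          }
          (prefix :+ item, projected)
        }
    }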





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34242017
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan(
+private var minSupport: Double,
+private var maxPatternLength: Int) extends java.io.Serializable {
+
+private var absMinSupport: Int = 0
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: 10}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length.
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Calculate sequential patterns:
+   * a) find and collect length-one patterns
+   * b) for each length-one pattern and each sequence,
+   *emit (pattern (prefix), suffix sequence) as key-value pairs
+   * c) group by key and then map value iterator to array
+   * d) local PrefixSpan on each prefix
+   * @return sequential patterns
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Seq[Int], Int)] = {
+absMinSupport = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(sequences)
+val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(repartitionedRdd)
+val allPatterns = lengthOnePatternsAndCounts.map(x => (Seq(x._1), x._2)) ++ nextPatterns
+allPatterns
+  }
+
+  private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = {
+val result = if (minSupport <= 0) {
+  0
+} else {
+  val count = sequences.count()
+  val support = if (minSupport <= 1) minSupport else 1
+  (support * count).toInt
+}
+result
+  }
+
+  /**
+   * Find the patterns whose length is one
+   * @param sequences original sequences data
+   * @return length-one patterns and projection table
+   */
+  private def findLengthOnePatterns(
+  sequences: RDD[Array[Int]]): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = {
+val LengthOnePatternAndCounts = sequences
+  .flatMap(_.distinct.map((_, 1)))
+  .reduceByKey(_ + _)
+val infrequentLengthOnePatterns: Array[Int] = LengthOnePatternAndCounts
+  .filter(_._2 < absMinSupport)
+  .map(_._1)
+  .collect()
+val frequentLengthOnePatterns = LengthOnePatternAndCounts
+  .filter(_._2 >= absMinSupport)
+val frequentLengthOnePatternsArray = frequentLengthOnePatterns
+  .map(_._1)
+  .collect()
+val filteredSequences =
+ 
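
The `_.distinct` before counting is what makes these sequence-level supports rather
than raw occurrence counts. A quick illustration with the suite's data, using plain
Scala collections in place of the RDD:

    // Item 3 occurs 7 times in total but in only 5 of the 6 sequences;
    // distinct-per-sequence counting yields the support value 5.
    val seqs = Seq(
      Array(3, 1, 3, 4, 5), Array(2, 3, 1), Array(3, 4, 4, 3),
      Array(1, 3, 4, 5), Array(2, 4, 1), Array(6, 5, 3))
    val counts = seqs
      .flatMap(_.distinct.map((_, 1)))
      .groupBy(_._1)
      .map { case (item, pairs) => (item, pairs.map(_._2).sum) }
    println(counts(3))  // 5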

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34241848
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.rdd.RDD
+
+class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+  test("Prefixspan sequences mining using Integer type") {
--- End diff --

Fixed.





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34237940
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.rdd.RDD
+
+class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+  test("Prefixspan sequences mining using Integer type") {
+val sequences = Array(
+  Array(3, 1, 3, 4, 5),
+  Array(2, 3, 1),
+  Array(3, 4, 4, 3),
+  Array(1, 3, 4, 5),
+  Array(2, 4, 1),
+  Array(6, 5, 3))
+
+val rdd = sc.parallelize(sequences, 2).cache()
+
+def formatResultString(data: RDD[(Seq[Int], Int)]): String = {
+  data.map(x => x._1.mkString(",") + ": " + x._2)
+.collect()
+.sortWith(_<_)
+.mkString("; ")
+}
+
+val prefixspan = new PrefixSpan()
+  .setMinSupport(0.34)
+  .setMaxPatternLength(50)
+val result1 = prefixspan.run(rdd)
+val len1 = result1.count().toInt
+val actualValue1 = formatResultString(result1)
+val expectedValue1 =
+  "1,3,4,5: 2; 1,3,4: 2; 1,3,5: 2; 1,3: 2; 1,4,5: 2;" +
--- End diff --

Fixed; the results are now compared as Array[(Array[Int], Long)] instead of formatted strings.
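
A set-based comparison in that spirit might look as follows (a sketch only; the
suite's actual helper is not shown in this excerpt). Arrays compare by reference,
so each pattern is wrapped in a Seq to get structural equality:

    def compareResults(
        expected: Array[(Array[Int], Long)],
        actual: Array[(Array[Int], Long)]): Unit = {
      val expectedSet = expected.map { case (pattern, count) => (pattern.toSeq, count) }.toSet
      val actualSet = actual.map { case (pattern, count) => (pattern.toSeq, count) }.toSet
      assert(expectedSet == actualSet)
    }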





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34237882
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.rdd.RDD
+
+class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+  test("Prefixspan sequences mining using Integer type") {
+val sequences = Array(
+  Array(3, 1, 3, 4, 5),
+  Array(2, 3, 1),
+  Array(3, 4, 4, 3),
+  Array(1, 3, 4, 5),
+  Array(2, 4, 1),
+  Array(6, 5, 3))
+
+val rdd = sc.parallelize(sequences, 2).cache()
+
+def formatResultString(data: RDD[(Seq[Int], Int)]): String = {
+  data.map(x => x._1.mkString(",") + ": " + x._2)
+.collect()
+.sortWith(_<_)
--- End diff --

Fixed, removed this code.





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34237868
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.rdd.RDD
+
+class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+  test("Prefixspan sequences mining using Integer type") {
+val sequences = Array(
+  Array(3, 1, 3, 4, 5),
+  Array(2, 3, 1),
+  Array(3, 4, 4, 3),
+  Array(1, 3, 4, 5),
+  Array(2, 4, 1),
+  Array(6, 5, 3))
+
+val rdd = sc.parallelize(sequences, 2).cache()
+
+def formatResultString(data: RDD[(Seq[Int], Int)]): String = {
+  data.map(x => x._1.mkString(",") + ": " + x._2)
--- End diff --

Fixed.





[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34230466
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+
+/**
+ *
+ * :: Experimental ::
+ *
+ * A parallel PrefixSpan algorithm to mine sequential patterns.
+ * The PrefixSpan algorithm is described in
+ * [[http://doi.org/10.1109/ICDE.2001.914830]].
+ *
+ * @param minSupport the minimal support level of a sequential pattern; any pattern
+ *   that appears more than (minSupport * size-of-the-dataset) times will be output
+ * @param maxPatternLength the maximal length of a sequential pattern; any frequent
+ *   pattern no longer than maxPatternLength will be output
+ *
+ * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining
+ *   (Wikipedia)]]
+ */
+@Experimental
+class PrefixSpan(
+private var minSupport: Double,
+private var maxPatternLength: Int) extends java.io.Serializable {
+
+private var absMinSupport: Int = 0
+
+  /**
+   * Constructs a default instance with default parameters
+   * {minSupport: `0.1`, maxPatternLength: 10}.
+   */
+  def this() = this(0.1, 10)
+
+  /**
+   * Sets the minimal support level (default: `0.1`).
+   */
+  def setMinSupport(minSupport: Double): this.type = {
+this.minSupport = minSupport
+this
+  }
+
+  /**
+   * Sets maximal pattern length.
+   */
+  def setMaxPatternLength(maxPatternLength: Int): this.type = {
+this.maxPatternLength = maxPatternLength
+this
+  }
+
+  /**
+   * Calculate sequential patterns:
+   * a) find and collect length-one patterns
+   * b) for each length-one pattern and each sequence,
+   *emit (pattern (prefix), suffix sequence) as key-value pairs
+   * c) group by key and then map value iterator to array
+   * d) local PrefixSpan on each prefix
+   * @return sequential patterns
+   */
+  def run(sequences: RDD[Array[Int]]): RDD[(Seq[Int], Int)] = {
+absMinSupport = getAbsoluteMinSupport(sequences)
+val (lengthOnePatternsAndCounts, prefixAndCandidates) =
+  findLengthOnePatterns(sequences)
+val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates)
+val nextPatterns = getPatternsInLocal(repartitionedRdd)
+val allPatterns = lengthOnePatternsAndCounts.map(x => (Seq(x._1), x._2)) ++ nextPatterns
+allPatterns
+  }
+
+  private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = {
+val result = if (minSupport <= 0) {
+  0
+} else {
+  val count = sequences.count()
+  val support = if (minSupport <= 1) minSupport else 1
+  (support * count).toInt
+}
+result
+  }
+
+  /**
+   * Find the patterns whose length is one
+   * @param sequences original sequences data
+   * @return length-one patterns and projection table
+   */
+  private def findLengthOnePatterns(
+      sequences: RDD[Array[Int]]): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = {
+    val LengthOnePatternAndCounts = sequences
+      .flatMap(_.distinct.map((_, 1)))
+      .reduceByKey(_ + _)
+    val infrequentLengthOnePatterns: Array[Int] = LengthOnePatternAndCounts
+      .filter(_._2 < absMinSupport)
+      .map(_._1)
+      .collect()
+    val frequentLengthOnePatterns = LengthOnePatternAndCounts
+      .filter(_._2 >= absMinSupport)
+    val frequentLengthOnePatternsArray = frequentLengthOnePatterns
+      .map(_._1)
+      .collect()
+    val filteredSequences =
+ 
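A minimal, self-contained sketch of how steps (a)-(c) above map onto RDD operations. This is not part of the patch; the helper names and the Set-based handoff of frequent items are assumptions made here for illustration only:

    import org.apache.spark.rdd.RDD

    // Step (a): count, per item, the number of sequences containing it, and
    // keep the items meeting the absolute support threshold.
    def frequentItems(sequences: RDD[Array[Int]], minCount: Int): Array[Int] =
      sequences
        .flatMap(_.distinct.map(item => (item, 1)))
        .reduceByKey(_ + _)
        .filter(_._2 >= minCount)
        .map(_._1)
        .collect()

    // Steps (b)-(c): for each frequent length-one prefix occurring in a
    // sequence, emit (prefix, suffix after its first occurrence), then group
    // the suffixes into one projected database per prefix.
    def projectedDatabases(
        sequences: RDD[Array[Int]],
        frequent: Set[Int]): RDD[(Int, Iterable[Array[Int]])] =
      sequences.flatMap { seq =>
        frequent.iterator.flatMap { item =>
          val i = seq.indexOf(item)
          if (i >= 0) Some((item, seq.drop(i + 1))) else None
        }
      }.groupByKey()

Step (d) would then run the sequential PrefixSpan routine over each grouped projected database, prepending the prefix to every locally mined pattern.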

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34230381
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -0,0 +1,209 @@
[... same PrefixSpan.scala hunk as quoted above ...]
+    val filteredSequences =

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34230317
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -0,0 +1,209 @@
[... same PrefixSpan.scala hunk as quoted above ...]
+    val filteredSequences =

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34230322
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -0,0 +1,209 @@
[... same PrefixSpan.scala hunk as quoted above ...]
+    val infrequentLengthOnePatterns: Array[Int] = LengthOnePatternAndCounts
+      .filter(_._2 < absMinSupport)
+      .map(_._1)
+      .collect()
+    val frequentLengthOnePatterns = LengthOnePatternAndCounts
--- End diff --

Fixed
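
As a worked illustration of the absolute-threshold conversion in getAbsoluteMinSupport quoted earlier (a sketch, not code from the patch): with the 6-sequence test database, minSupport = 0.5 yields (0.5 * 6).toInt == 3, so item 5, which occurs in 3 of the sequences, passes the >= absMinSupport filter above. Note that .toInt truncates rather than rounds up:

    val n = 6L
    assert((0.5 * n).toInt == 3)
    assert((0.33 * n).toInt == 1)          // truncation: 1.98 becomes 1
    assert(math.ceil(0.33 * n).toInt == 2) // a ceil-based threshold would differ

Whether truncation or math.ceil is appropriate depends on whether the documented "more than (minSupport * size-of-the-dataset) times" bound is meant to be strict.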


[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34228983
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -0,0 +1,209 @@
[... same PrefixSpan.scala hunk as quoted above ...]
+    val LengthOnePatternAndCounts = sequences
+      .flatMap(_.distinct.map((_, 1)))
+      .reduceByKey(_ + _)
+    val infrequentLengthOnePatterns: Array[Int] = LengthOnePatternAndCounts
--- End diff --

Fixed, removed infrequent items.
If the set of infrequent items is empty, we don't need to filter the
sequences, which may give better performance. When the support threshold is
small, the infrequent item set is usually empty.

  val filteredSequences =
    if (infrequentLengthOnePatterns.isEmpty) {
      sequences
    } else {
      sequences.map(_.filter(item => !infrequentLengthOnePatterns.contains(item)))
    }
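
A hedged variant of that filter (a sketch reusing the names from the quoted code; converting the collected Array[Int] to a Set is an addition made here so that each membership test is O(1) instead of a linear scan of the array):

    val infrequentSet: Set[Int] = infrequentLengthOnePatterns.toSet
    val filteredSequences =
      if (infrequentSet.isEmpty) {
        sequences
      } else {
        sequences.map(_.filter(item => !infrequentSet.contains(item)))
      }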

[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-09 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34228787
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -0,0 +1,209 @@
[... same PrefixSpan.scala hunk as quoted above ...]
+  private def findLengthOnePatterns(
+      sequences: RDD[Array[Int]]): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = {
+    val LengthOnePatternAndCounts = sequences
--- End diff --

Fixed, added a new method.
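
One possible shape for such a helper (hypothetical; the actual method name and signature in the updated patch are not shown in this thread):

    // Counts, per item, the number of sequences containing it. Both the
    // frequent and the infrequent subsets can then be derived from one pass.
    private def getLengthOnePatternCounts(
        sequences: RDD[Array[Int]]): RDD[(Int, Int)] =
      sequences
        .flatMap(_.distinct.map(item => (item, 1)))
        .reduceByKey(_ + _)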



[GitHub] spark pull request: [SPARK-6487][MLlib] Add sequential pattern min...

2015-07-08 Thread zhangjiajin
Github user zhangjiajin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7258#discussion_r34221305
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -0,0 +1,209 @@
[... same PrefixSpan.scala hunk as quoted above ...]
+  /**
+   * Find the patterns that it's length is one
--- End diff --

Fixed.
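
Presumably the corrected doc comment reads along these lines (a guess at the wording; the committed text is not quoted in this thread):

    /**
     * Finds all length-one patterns, i.e. the frequent individual items.
     * @param sequences original sequences data
     * @return length-one patterns and the projection table
     */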


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


