Github user feynmanliang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7937#discussion_r36243986
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
    @@ -139,200 +202,308 @@ class PrefixSpan private (
         run(data.rdd.map(_.asScala.map(_.asScala.toArray).toArray))
       }
     
    +}
    +
    +@Experimental
    +object PrefixSpan extends Logging {
    +
       /**
    -   * Find the complete set of sequential patterns in the input sequences. 
This method utilizes
    -   * the internal representation of itemsets as Array[Int] where each 
itemset is represented by
    -   * a contiguous sequence of non-negative integers and delimiters 
represented by [[DELIMITER]].
    -   * @param data ordered sequences of itemsets. Items are represented by 
non-negative integers.
    -   *             Each itemset has one or more items and is delimited by 
[[DELIMITER]].
    -   * @return a set of sequential pattern pairs,
    -   *         the key of pair is pattern (a list of elements),
    -   *         the value of pair is the pattern's count.
    +   * Find the complete set of frequent sequential patterns in the input 
sequences.
    +   * @param data ordered sequences of itemsets. We represent a sequence 
internally as Array[Int],
    +   *             where each itemset is represented by a contiguous 
sequence of distinct and ordered
    +   *             positive integers. We use 0 as the delimiter at itemset 
boundaries, including the
    +   *             first and the last position.
    +   * @return an RDD of (frequent sequential pattern, count) pairs,
    +   * @see [[Postfix]]
        */
    -  private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
    +  private[fpm] def genFreqPatterns(
    +      data: RDD[Array[Int]],
    +      minCount: Long,
    +      maxPatternLength: Int,
    +      maxLocalProjDBSize: Long): RDD[(Array[Int], Long)] = {
         val sc = data.sparkContext
     
         if (data.getStorageLevel == StorageLevel.NONE) {
           logWarning("Input data is not cached.")
         }
     
    -    // Use List[Set[Item]] for internal computation
    -    val sequences = data.map { seq => splitSequence(seq.toList) }
    -
    -    // Convert min support to a min number of transactions for this dataset
    -    val minCount = if (minSupport == 0) 0L else 
math.ceil(sequences.count() * minSupport).toLong
    -
    -    // (Frequent items -> number of occurrences, all items here satisfy 
the `minSupport` threshold
    -    val freqItemCounts = sequences
    -      .flatMap(seq => seq.flatMap(nonemptySubsets(_)).distinct.map(item => 
(item, 1L)))
    -      .reduceByKey(_ + _)
    -      .filter { case (item, count) => (count >= minCount) }
    -      .collect()
    -      .toMap
    -
    -    // Pairs of (length 1 prefix, suffix consisting of frequent items)
    -    val itemSuffixPairs = {
    -      val freqItemSets = freqItemCounts.keys.toSet
    -      val freqItems = freqItemSets.flatten
    -      sequences.flatMap { seq =>
    -        val filteredSeq = seq.map(item => 
freqItems.intersect(item)).filter(_.nonEmpty)
    -        freqItemSets.flatMap { item =>
    -          val candidateSuffix = LocalPrefixSpan.getSuffix(item, 
filteredSeq)
    -          candidateSuffix match {
    -            case suffix if !suffix.isEmpty => Some((List(item), suffix))
    -            case _ => None
    +    val postfixes = data.map(items => new Postfix(items))
    +
    +    // Local frequent patterns (prefixes) and their counts.
    +    val localFreqPatterns = mutable.ArrayBuffer.empty[(Array[Int], Long)]
    +    // Prefixes whose projected databases are small.
    +    val smallPrefixes = mutable.Map.empty[Int, Prefix]
    +    val emptyPrefix = Prefix.empty
    +    // Prefixes whose projected databases are large.
    +    var largePrefixes = mutable.Map(emptyPrefix.id -> emptyPrefix)
    +    while (largePrefixes.nonEmpty) {
    +      val numLocalFreqPatterns = localFreqPatterns.length
    +      logInfo(s"number of local frequent patterns: $numLocalFreqPatterns")
    +      if (localFreqPatterns.length > 1000000) {
    +        logWarning(
    +          s"""
    +             | Collected $numLocalFreqPatterns local frequent patterns. 
You may want to consider:
    +             |   1. increase minSupport,
    +             |   2. decrease maxPatternLength,
    +             |   3. increase maxLocalProjDBSize.
    +           """.stripMargin)
    +      }
    +      logInfo(s"number of small prefixes: ${smallPrefixes.size}")
    +      logInfo(s"number of large prefixes: ${largePrefixes.size}")
    +      val largePrefixArray = largePrefixes.values.toArray
    +      val freqPrefixes = postfixes.flatMap { postfix =>
    --- End diff --
    
    nit: for comprehension over 3-nested (flat)maps?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to