Github user feynmanliang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7937#discussion_r36239190
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
---
    @@ -139,200 +202,308 @@ class PrefixSpan private (
         run(data.rdd.map(_.asScala.map(_.asScala.toArray).toArray))
       }
     
    +}
    +
    +@Experimental
    +object PrefixSpan extends Logging {
    +
       /**
    -   * Find the complete set of sequential patterns in the input sequences. 
This method utilizes
    -   * the internal representation of itemsets as Array[Int] where each 
itemset is represented by
    -   * a contiguous sequence of non-negative integers and delimiters 
represented by [[DELIMITER]].
    -   * @param data ordered sequences of itemsets. Items are represented by 
non-negative integers.
    -   *             Each itemset has one or more items and is delimited by 
[[DELIMITER]].
    -   * @return a set of sequential pattern pairs,
    -   *         the key of pair is pattern (a list of elements),
    -   *         the value of pair is the pattern's count.
    +   * Find the complete set of frequent sequential patterns in the input 
sequences.
    +   * @param data ordered sequences of itemsets. We represent a sequence 
internally as Array[Int],
    +   *             where each itemset is represented by a contiguous 
sequence of distinct and ordered
    +   *             positive integers. We use 0 as the delimiter at itemset 
boundaries, including the
    +   *             first and the last position.
    +   * @return an RDD of (frequent sequential pattern, count) pairs,
    +   * @see [[Postfix]]
        */
    -  private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
    +  private[fpm] def genFreqPatterns(
    +      data: RDD[Array[Int]],
    +      minCount: Long,
    +      maxPatternLength: Int,
    +      maxLocalProjDBSize: Long): RDD[(Array[Int], Long)] = {
         val sc = data.sparkContext
     
         if (data.getStorageLevel == StorageLevel.NONE) {
           logWarning("Input data is not cached.")
         }
     
    -    // Use List[Set[Item]] for internal computation
    -    val sequences = data.map { seq => splitSequence(seq.toList) }
    -
    -    // Convert min support to a min number of transactions for this dataset
    -    val minCount = if (minSupport == 0) 0L else 
math.ceil(sequences.count() * minSupport).toLong
    -
    -    // (Frequent items -> number of occurrences, all items here satisfy 
the `minSupport` threshold
    -    val freqItemCounts = sequences
    -      .flatMap(seq => seq.flatMap(nonemptySubsets(_)).distinct.map(item => 
(item, 1L)))
    -      .reduceByKey(_ + _)
    -      .filter { case (item, count) => (count >= minCount) }
    -      .collect()
    -      .toMap
    -
    -    // Pairs of (length 1 prefix, suffix consisting of frequent items)
    -    val itemSuffixPairs = {
    -      val freqItemSets = freqItemCounts.keys.toSet
    -      val freqItems = freqItemSets.flatten
    -      sequences.flatMap { seq =>
    -        val filteredSeq = seq.map(item => 
freqItems.intersect(item)).filter(_.nonEmpty)
    -        freqItemSets.flatMap { item =>
    -          val candidateSuffix = LocalPrefixSpan.getSuffix(item, 
filteredSeq)
    -          candidateSuffix match {
    -            case suffix if !suffix.isEmpty => Some((List(item), suffix))
    -            case _ => None
    +    val postfixes = data.map(items => new Postfix(items))
    +
    +    // Local frequent patterns (prefixes) and their counts.
    +    val localFreqPatterns = mutable.ArrayBuffer.empty[(Array[Int], Long)]
    +    // Prefixes whose projected databases are small.
    +    val smallPrefixes = mutable.Map.empty[Int, Prefix]
    +    val emptyPrefix = Prefix.empty
    +    // Prefixes whose projected databases are large.
    +    var largePrefixes = mutable.Map(emptyPrefix.id -> emptyPrefix)
    +    while (largePrefixes.nonEmpty) {
    +      val numLocalFreqPatterns = localFreqPatterns.length
    +      logInfo(s"number of local frequent patterns: $numLocalFreqPatterns")
    +      if (localFreqPatterns.length > 1000000) {
    +        logWarning(
    +          s"""
    +             | Collected $numLocalFreqPatterns local frequent patterns. 
You may want to consider:
    +             |   1. increase minSupport,
    +             |   2. decrease maxPatternLength,
    +             |   3. increase maxLocalProjDBSize.
    +           """.stripMargin)
    +      }
    +      logInfo(s"number of small prefixes: ${smallPrefixes.size}")
    +      logInfo(s"number of large prefixes: ${largePrefixes.size}")
    +      val largePrefixArray = largePrefixes.values.toArray
    +      val freqPrefixes = postfixes.flatMap { postfix =>
    +          largePrefixArray.flatMap { prefix =>
    +            postfix.project(prefix).genPrefixItems.map { case (item, 
postfixSize) =>
    +              ((prefix.id, item), (1L, postfixSize))
    +            }
    +          }
    +        }.reduceByKey { case ((c0, s0), (c1, s1)) =>
    +          (c0 + c1, s0 + s1)
    +        }.filter { case (_, (c, _)) => c >= minCount }
    +        .collect()
    +      val newLargePrefixes = mutable.Map.empty[Int, Prefix]
    +      freqPrefixes.foreach { case ((id, item), (count, projDBSize)) =>
    +        val newPrefix = largePrefixes(id) :+ item
    +        localFreqPatterns += ((newPrefix.items :+ 0, count))
    +        if (newPrefix.length < maxPatternLength) {
    +          if (projDBSize > maxLocalProjDBSize) {
    +            newLargePrefixes += newPrefix.id -> newPrefix
    +          } else {
    +            smallPrefixes += newPrefix.id -> newPrefix
               }
             }
           }
    +      largePrefixes = newLargePrefixes
         }
     
    -    // Accumulator for the computed results to be returned, initialized to 
the frequent items (i.e.
    -    // frequent length-one prefixes)
    -    var resultsAccumulator = freqItemCounts.map { case (item, count) => 
(List(item), count) }.toList
    -
    -    // Remaining work to be locally and distributively processed 
respectfully
    -    var (pairsForLocal, pairsForDistributed) = 
partitionByProjDBSize(itemSuffixPairs)
    -
    -    // Continue processing until no pairs for distributed processing 
remain (i.e. all prefixes have
    -    // projected database sizes <= `maxLocalProjDBSize`) or 
`maxPatternLength` is reached
    -    var patternLength = 1
    -    while (pairsForDistributed.count() != 0 && patternLength < 
maxPatternLength) {
    -      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
    -        extendPrefixes(minCount, pairsForDistributed)
    -      pairsForDistributed.unpersist()
    -      val (smallerPairsPart, largerPairsPart) = 
partitionByProjDBSize(nextPrefixSuffixPairs)
    -      pairsForDistributed = largerPairsPart
    -      pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
    -      pairsForLocal ++= smallerPairsPart
    -      resultsAccumulator ++= nextPatternAndCounts.collect()
    -      patternLength += 1 // pattern length grows one per iteration
    +    // Switch to local processing.
    +    val bcSmallPrefixes = sc.broadcast(smallPrefixes)
    +    val distributedFreqPattern = postfixes.flatMap { postfix =>
    +      bcSmallPrefixes.value.values.map { prefix =>
    +        (prefix.id, postfix.project(prefix).compressed)
    +      }.filter(_._2.nonEmpty)
    +    }.groupByKey().flatMap { case (id, projPostfixes) =>
    +      val prefix = bcSmallPrefixes.value(id)
    +      val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength 
- prefix.length)
    +      // TODO: We collect projected postfixes into memory. We should also 
compare the performance
    +      // TODO: of keeping them on shuffle files.
    +      localPrefixSpan.run(projPostfixes.toArray).map { case (pattern, 
count) =>
    +        (prefix.items ++ pattern, count)
    +      }
         }
     
    -    // Process the small projected databases locally
    -    val remainingResults = getPatternsInLocal(
    -      minCount, sc.parallelize(pairsForLocal, 1).groupByKey())
    -
    -    (sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
    -      .map { case (pattern, count) => 
(flattenSequence(pattern.reverse).toArray, count) }
    +    // Union local frequent patterns and distributed ones.
    +    val freqPatterns = (sc.parallelize(localFreqPatterns, 1) ++ 
distributedFreqPattern)
    +      .persist(StorageLevel.MEMORY_AND_DISK)
    +    freqPatterns
       }
     
    -
       /**
    -   * Partitions the prefix-suffix pairs by projected database size.
    -   * @param prefixSuffixPairs prefix (length n) and suffix pairs,
    -   * @return prefix-suffix pairs partitioned by whether their projected 
database size is <= or
    -   *         greater than [[maxLocalProjDBSize]]
    +   * Represents a prefix.
    +   * @param items: items in this prefix, using the internal format
    +   * @param length: length of this prefix, not counting 0
        */
    -  private def partitionByProjDBSize(prefixSuffixPairs: 
RDD[(List[Set[Int]], List[Set[Int]])])
    -    : (List[(List[Set[Int]], List[Set[Int]])], RDD[(List[Set[Int]], 
List[Set[Int]])]) = {
    -    val prefixToSuffixSize = prefixSuffixPairs
    -      .aggregateByKey(0)(
    -        seqOp = { case (count, suffix) => count + suffix.length },
    -        combOp = { _ + _ })
    -    val smallPrefixes = prefixToSuffixSize
    -      .filter(_._2 <= maxLocalProjDBSize)
    -      .keys
    -      .collect()
    -      .toSet
    -    val small = prefixSuffixPairs.filter { case (prefix, _) => 
smallPrefixes.contains(prefix) }
    -    val large = prefixSuffixPairs.filter { case (prefix, _) => 
!smallPrefixes.contains(prefix) }
    -    (small.collect().toList, large)
    +  private[fpm] class Prefix private (val items: Array[Int], val length: 
Int) extends Serializable {
    +
    +    /** A unique id for this prefix. */
    +    val id: Int = Prefix.nextId
    +
    +    /** Expands this prefix by the input item. */
    +    def :+(item: Int): Prefix = {
    +      require(item != 0)
    +      if (item < 0) {
    +        new Prefix(items :+ -item, length + 1)
    +      } else {
    +        new Prefix(items ++ Array(0, item), length + 1)
    +      }
    +    }
       }
     
    -  /**
    -   * Extends all prefixes by one itemset from their suffix and computes 
the resulting frequent
    -   * prefixes and remaining work.
    -   * @param minCount minimum count
    -   * @param prefixSuffixPairs prefix (length N) and suffix pairs,
    -   * @return (frequent length N+1 extended prefix, count) pairs and 
(frequent length N+1 extended
    -   *         prefix, corresponding suffix) pairs.
    -   */
    -  private def extendPrefixes(
    -      minCount: Long,
    -      prefixSuffixPairs: RDD[(List[Set[Int]], List[Set[Int]])])
    -    : (RDD[(List[Set[Int]], Long)], RDD[(List[Set[Int]], List[Set[Int]])]) 
= {
    -
    -    // (length N prefix, itemset from suffix) pairs and their 
corresponding number of occurrences
    -    // Every (prefix :+ suffix) is guaranteed to have support exceeding 
`minSupport`
    -    val prefixItemPairAndCounts = prefixSuffixPairs
    -      .flatMap { case (prefix, suffix) =>
    -      suffix.flatMap(nonemptySubsets(_)).distinct.map(y => ((prefix, y), 
1L)) }
    -      .reduceByKey(_ + _)
    -      .filter { case (item, count) => (count >= minCount) }
    -
    -    // Map from prefix to set of possible next items from suffix
    -    val prefixToNextItems = prefixItemPairAndCounts
    -      .keys
    -      .groupByKey()
    -      .mapValues(_.toSet)
    -      .collect()
    -      .toMap
    -
    -    // Frequent patterns with length N+1 and their corresponding counts
    -    val extendedPrefixAndCounts = prefixItemPairAndCounts
    -      .map { case ((prefix, item), count) => (item :: prefix, count) }
    -
    -    // Remaining work, all prefixes will have length N+1
    -    val extendedPrefixAndSuffix = prefixSuffixPairs
    -      .filter(x => prefixToNextItems.contains(x._1))
    -      .flatMap { case (prefix, suffix) =>
    -        val frequentNextItemSets = prefixToNextItems(prefix)
    -        val frequentNextItems = frequentNextItemSets.flatten
    -        val filteredSuffix = suffix
    -          .map(item => frequentNextItems.intersect(item))
    -          .filter(_.nonEmpty)
    -        frequentNextItemSets.flatMap { item =>
    -          LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
    -            case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
    -            case _ => None
    -          }
    -        }
    -      }
    +  private[fpm] object Prefix {
    +    /** Internal counter to generate unique IDs. */
    +    private val counter: AtomicInteger = new AtomicInteger(-1)
    +
    +    /** Gets the next unique ID. */
    +    private def nextId: Int = counter.incrementAndGet()
     
    -    (extendedPrefixAndCounts, extendedPrefixAndSuffix)
    +    /** An empty [[Prefix]] instance. */
    +    val empty: Prefix = new Prefix(Array.empty, 0)
       }
     
       /**
    -   * Calculate the patterns in local.
    -   * @param minCount the absolute minimum count
    -   * @param data prefixes and projected sequences data data
    -   * @return patterns
    +   * An internal representation of a postfix from some projection.
    +   * We use one int array to store the items, which might also contain 
other items from the
    +   * original sequence.
    +   * Items are represented by positive integers, and items in each itemset 
must be distinct and
    +   * ordered.
    +   * We use 0 as the delimiter between itemsets.
    +   * For example, a sequence `<(12)(31)1>` is represented by `[0, 1, 2, 0, 
1, 3, 0, 1, 0]`.
    +   * The postfix of this sequence w.r.t. prefix `<1>` is `<(_2)(13)1>`.
    +   * We may reuse the original items array `[0, 1, 2, 0, 1, 3, 0, 1, 0]` 
to represent the postfix,
    +   * and mark the start index of the postfix, which is `2` in this example.
    +   * So the active items in this postfix are `[2, 0, 1, 3, 0, 1, 0]`.
    +   * We also remember the start indices of partial projections, the ones 
that split an itemset.
    +   * For example, another possible partial projection w.r.t. `<1>` is 
`<(_3)1>`.
    +   * We remember the start indices of partial projections, which is `[2, 
5]` in this example.
    +   * This data structure makes it easier to do projections.
    +   *
    +   * @param items an int array containing this postfix with 0 as the 
delimiter
    +   * @param partialStarts start indices of possible partial projections, 
strictly increasing
        */
    -  private def getPatternsInLocal(
    -      minCount: Long,
    -      data: RDD[(List[Set[Int]], Iterable[List[Set[Int]]])]): 
RDD[(List[Set[Int]], Long)] = {
    -    data.flatMap {
    -      case (prefix, projDB) => LocalPrefixSpan.run(minCount, 
maxPatternLength, prefix, projDB)
    +  private[fpm] class Postfix(
    +      val items: Array[Int],
    +      val start: Int = 0,
    +      val partialStarts: Array[Int] = Array.empty) extends Serializable {
    +
    +    require(items.last == 0, "The last item in a postfix must be zero.")
    +
    +    /**
    +     * Start index of the first full itemset contained in this postfix.
    +     */
    +    private[this] def fullStart: Int = {
    +      var i = start
    +      while (items(i) != 0) {
    +        i += 1
    +      }
    +      i
         }
    -  }
     
    -}
    +    /**
    +     * Generates length-1 prefix items of this postfix with the 
corresponding postfix sizes.
    +     * There are two types of prefix items:
    +     *   a) the item can be assembled to the last itemset of the prefix, 
where we flip the sign in
    +     *      the output,
    +     *   b) the item can be appended to the prefix.
    +     * @return an iterator of (prefix item, corresponding postfix size)
    +     */
    +    def genPrefixItems: Iterator[(Int, Long)] = {
    +      val n1 = items.length - 1
    +      // For each unique item (subject to sign) in this sequence, we 
output exactly one split.
    +      // TODO: use PrimitiveKeyOpenHashMap
    +      val prefixes = mutable.Map.empty[Int, Long]
    +      // a) items that can be assembled to the last itemset of the prefix
    +      partialStarts.foreach { start =>
    +        var i = start
    +        var x = -items(i)
    +        while (x != 0) {
    +          if (!prefixes.contains(x)) {
    +            prefixes(x) = n1 - i
    +          }
    +          i += 1
    +          x = -items(i)
    +        }
    +      }
    +      // b) items that can be appended to the prefix
    +      var i = fullStart
    +      while (i < n1) {
    +        val x = items(i)
    +        if (x != 0 && !prefixes.contains(x)) {
    +          prefixes(x) = n1 - i
    +        }
    +        i += 1
    +      }
    +      prefixes.toIterator
    +    }
     
    -object PrefixSpan {
    -  private[fpm] val DELIMITER = -1
    +    /** Tests whether this postfix is non-empty. */
    +    def nonEmpty: Boolean = items.length > start + 1
     
    -  /** Splits an array of itemsets delimited by [[DELIMITER]]. */
    -  private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = {
    -    sequence.span(_ != DELIMITER) match {
    -      case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)
    -      case (x, xs) => List(x.toSet)
    +    /**
    +     * Projects this postfix with respect to the input prefix item.
    +     * @param prefix prefix item. If prefix is positive, we match items in 
any full itemset; if it
    +     *               is negative, we do partial projections.
    +     * @return the projected postfix
    +     */
    +    def project(prefix: Int): Postfix = {
    +      require(prefix != 0)
    +      val n1 = items.length - 1
    +      var matched = false
    +      var newStart = n1
    +      val newPartialStarts = mutable.ArrayBuilder.make[Int]
    +      if (prefix < 0) {
    +        // Search for partial projections.
    +        val target = -prefix
    +        partialStarts.foreach { start =>
    +          var i = start
    +          var x = items(i)
    +          while (x != target && x != 0) {
    +            i += 1
    +            x = items(i)
    +          }
    +          if (x == target) {
    +            i += 1
    +            if (!matched) {
    --- End diff --
    
    This conditional is a no-op in this branch, since `matched` is always 
`false` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to