    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
    @@ -139,200 +202,308 @@ class PrefixSpan private (
    +object PrefixSpan extends Logging {
    -   * Find the complete set of sequential patterns in the input sequences. 
This method utilizes
    -   * the internal representation of itemsets as Array[Int] where each 
itemset is represented by
    -   * a contiguous sequence of non-negative integers and delimiters 
represented by [[DELIMITER]].
    -   * @param data ordered sequences of itemsets. Items are represented by 
non-negative integers.
    -   *             Each itemset has one or more items and is delimited by 
    -   * @return a set of sequential pattern pairs,
    -   *         the key of pair is pattern (a list of elements),
    -   *         the value of pair is the pattern's count.
    +   * Find the complete set of frequent sequential patterns in the input sequences.
    +   * @param data ordered sequences of itemsets. We represent a sequence 
internally as Array[Int],
    +   *             where each itemset is represented by a contiguous 
sequence of distinct and ordered
    +   *             positive integers. We use 0 as the delimiter at itemset 
boundaries, including the
    +   *             first and the last position.
    +   * @return an RDD of (frequent sequential pattern, count) pairs,
    +   * @see [[Postfix]]
    -  private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
    +  private[fpm] def genFreqPatterns(
    +      data: RDD[Array[Int]],
    +      minCount: Long,
    +      maxPatternLength: Int,
    +      maxLocalProjDBSize: Long): RDD[(Array[Int], Long)] = {
         val sc = data.sparkContext
         if (data.getStorageLevel == StorageLevel.NONE) {
           logWarning("Input data is not cached.")
    -    // Use List[Set[Item]] for internal computation
    -    val sequences = { seq => splitSequence(seq.toList) }
    -    // Convert min support to a min number of transactions for this dataset
    -    val minCount = if (minSupport == 0) 0L else 
math.ceil(sequences.count() * minSupport).toLong
    -    // (Frequent items -> number of occurrences, all items here satisfy 
the `minSupport` threshold
    -    val freqItemCounts = sequences
    -      .flatMap(seq => seq.flatMap(nonemptySubsets(_)) => 
(item, 1L)))
    -      .reduceByKey(_ + _)
    -      .filter { case (item, count) => (count >= minCount) }
    -      .collect()
    -      .toMap
    -    // Pairs of (length 1 prefix, suffix consisting of frequent items)
    -    val itemSuffixPairs = {
    -      val freqItemSets = freqItemCounts.keys.toSet
    -      val freqItems = freqItemSets.flatten
    -      sequences.flatMap { seq =>
    -        val filteredSeq = => 
    -        freqItemSets.flatMap { item =>
    -          val candidateSuffix = LocalPrefixSpan.getSuffix(item, 
    -          candidateSuffix match {
    -            case suffix if !suffix.isEmpty => Some((List(item), suffix))
    -            case _ => None
    +    val postfixes = => new Postfix(items))
    +    // Local frequent patterns (prefixes) and their counts.
    +    val localFreqPatterns = mutable.ArrayBuffer.empty[(Array[Int], Long)]
    +    // Prefixes whose projected databases are small.
    +    val smallPrefixes = mutable.Map.empty[Int, Prefix]
    +    val emptyPrefix = Prefix.empty
    +    // Prefixes whose projected databases are large.
    +    var largePrefixes = mutable.Map( -> emptyPrefix)
    +    while (largePrefixes.nonEmpty) {
    +      val numLocalFreqPatterns = localFreqPatterns.length
    +      logInfo(s"number of local frequent patterns: $numLocalFreqPatterns")
    +      if (localFreqPatterns.length > 1000000) {
    +        logWarning(
    +          s"""
    +             | Collected $numLocalFreqPatterns local frequent patterns. 
You may want to consider:
    +             |   1. increase minSupport,
    +             |   2. decrease maxPatternLength,
    +             |   3. increase maxLocalProjDBSize.
    +           """.stripMargin)
    +      }
    +      logInfo(s"number of small prefixes: ${smallPrefixes.size}")
    +      logInfo(s"number of large prefixes: ${largePrefixes.size}")
    +      val largePrefixArray = largePrefixes.values.toArray
    +      val freqPrefixes = postfixes.flatMap { postfix =>
    +          largePrefixArray.flatMap { prefix =>
    +            postfix.project(prefix) { case (item, 
postfixSize) =>
    +              ((, item), (1L, postfixSize))
    +            }
    +          }
    +        }.reduceByKey { case ((c0, s0), (c1, s1)) =>
    +          (c0 + c1, s0 + s1)
    +        }.filter { case (_, (c, _)) => c >= minCount }
    +        .collect()
    +      val newLargePrefixes = mutable.Map.empty[Int, Prefix]
    +      freqPrefixes.foreach { case ((id, item), (count, projDBSize)) =>
    +        val newPrefix = largePrefixes(id) :+ item
    +        localFreqPatterns += ((newPrefix.items :+ 0, count))
    +        if (newPrefix.length < maxPatternLength) {
    +          if (projDBSize > maxLocalProjDBSize) {
    +            newLargePrefixes += -> newPrefix
    +          } else {
    +            smallPrefixes += -> newPrefix
    +      largePrefixes = newLargePrefixes
    -    // Accumulator for the computed results to be returned, initialized to 
the frequent items (i.e.
    -    // frequent length-one prefixes)
    -    var resultsAccumulator = { case (item, count) => 
(List(item), count) }.toList
    -    // Remaining work to be locally and distributively processed 
    -    var (pairsForLocal, pairsForDistributed) = 
    -    // Continue processing until no pairs for distributed processing 
remain (i.e. all prefixes have
    -    // projected database sizes <= `maxLocalProjDBSize`) or 
`maxPatternLength` is reached
    -    var patternLength = 1
    -    while (pairsForDistributed.count() != 0 && patternLength < 
maxPatternLength) {
    -      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
    -        extendPrefixes(minCount, pairsForDistributed)
    -      pairsForDistributed.unpersist()
    -      val (smallerPairsPart, largerPairsPart) = 
    -      pairsForDistributed = largerPairsPart
    -      pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
    -      pairsForLocal ++= smallerPairsPart
    -      resultsAccumulator ++= nextPatternAndCounts.collect()
    -      patternLength += 1 // pattern length grows one per iteration
    +    // Switch to local processing.
    +    val bcSmallPrefixes = sc.broadcast(smallPrefixes)
    +    val distributedFreqPattern = postfixes.flatMap { postfix =>
    + { prefix =>
    +        (, postfix.project(prefix).compressed)
    +      }.filter(_._2.nonEmpty)
    +    }.groupByKey().flatMap { case (id, projPostfixes) =>
    +      val prefix = bcSmallPrefixes.value(id)
    +      val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength 
- prefix.length)
    +      // TODO: We collect projected postfixes into memory. We should also 
compare the performance
    +      // TODO: of keeping them on shuffle files.
    + { case (pattern, 
count) =>
    +        (prefix.items ++ pattern, count)
    +      }
    -    // Process the small projected databases locally
    -    val remainingResults = getPatternsInLocal(
    -      minCount, sc.parallelize(pairsForLocal, 1).groupByKey())
    -    (sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
    -      .map { case (pattern, count) => 
(flattenSequence(pattern.reverse).toArray, count) }
    +    // Union local frequent patterns and distributed ones.
    +    val freqPatterns = (sc.parallelize(localFreqPatterns, 1) ++ 
    +      .persist(StorageLevel.MEMORY_AND_DISK)
    +    freqPatterns
    -   * Partitions the prefix-suffix pairs by projected database size.
    -   * @param prefixSuffixPairs prefix (length n) and suffix pairs,
    -   * @return prefix-suffix pairs partitioned by whether their projected 
database size is <= or
    -   *         greater than [[maxLocalProjDBSize]]
    +   * Represents a prefix.
    +   * @param items items in this prefix, using the internal format
    +   * @param length length of this prefix, not counting 0
    -  private def partitionByProjDBSize(prefixSuffixPairs: 
RDD[(List[Set[Int]], List[Set[Int]])])
    -    : (List[(List[Set[Int]], List[Set[Int]])], RDD[(List[Set[Int]], 
List[Set[Int]])]) = {
    -    val prefixToSuffixSize = prefixSuffixPairs
    -      .aggregateByKey(0)(
    -        seqOp = { case (count, suffix) => count + suffix.length },
    -        combOp = { _ + _ })
    -    val smallPrefixes = prefixToSuffixSize
    -      .filter(_._2 <= maxLocalProjDBSize)
    -      .keys
    -      .collect()
    -      .toSet
    -    val small = prefixSuffixPairs.filter { case (prefix, _) => 
smallPrefixes.contains(prefix) }
    -    val large = prefixSuffixPairs.filter { case (prefix, _) => 
!smallPrefixes.contains(prefix) }
    -    (small.collect().toList, large)
    +  private[fpm] class Prefix private (val items: Array[Int], val length: 
Int) extends Serializable {
    +    /** A unique id for this prefix. */
    +    val id: Int = Prefix.nextId
    +    /**
    +     * Expands this prefix by the input item and returns the result as a new [[Prefix]];
    +     * this instance is not modified.
    +     *
    +     * A negative item is assembled into the last itemset of the prefix: its absolute value
    +     * is appended directly after the existing items. A positive item opens a new itemset: a
    +     * 0 delimiter followed by the item is appended. In both cases the length grows by one.
    +     *
    +     * @param item the item to add; must be non-zero (`require` fails otherwise)
    +     * @return the expanded prefix
    +     */
    +    def :+(item: Int): Prefix = {
    +      require(item != 0)
    +      if (item < 0) {
    +        new Prefix(items :+ -item, length + 1)
    +      } else {
    +        new Prefix(items ++ Array(0, item), length + 1)
    +      }
    +    }
    -  /**
    -   * Extends all prefixes by one itemset from their suffix and computes 
the resulting frequent
    -   * prefixes and remaining work.
    -   * @param minCount minimum count
    -   * @param prefixSuffixPairs prefix (length N) and suffix pairs,
    -   * @return (frequent length N+1 extended prefix, count) pairs and 
(frequent length N+1 extended
    -   *         prefix, corresponding suffix) pairs.
    -   */
    -  private def extendPrefixes(
    -      minCount: Long,
    -      prefixSuffixPairs: RDD[(List[Set[Int]], List[Set[Int]])])
    -    : (RDD[(List[Set[Int]], Long)], RDD[(List[Set[Int]], List[Set[Int]])]) 
= {
    -    // (length N prefix, itemset from suffix) pairs and their 
corresponding number of occurrences
    -    // Every (prefix :+ suffix) is guaranteed to have support exceeding 
    -    val prefixItemPairAndCounts = prefixSuffixPairs
    -      .flatMap { case (prefix, suffix) =>
    -      suffix.flatMap(nonemptySubsets(_)) => ((prefix, y), 
1L)) }
    -      .reduceByKey(_ + _)
    -      .filter { case (item, count) => (count >= minCount) }
    -    // Map from prefix to set of possible next items from suffix
    -    val prefixToNextItems = prefixItemPairAndCounts
    -      .keys
    -      .groupByKey()
    -      .mapValues(_.toSet)
    -      .collect()
    -      .toMap
    -    // Frequent patterns with length N+1 and their corresponding counts
    -    val extendedPrefixAndCounts = prefixItemPairAndCounts
    -      .map { case ((prefix, item), count) => (item :: prefix, count) }
    -    // Remaining work, all prefixes will have length N+1
    -    val extendedPrefixAndSuffix = prefixSuffixPairs
    -      .filter(x => prefixToNextItems.contains(x._1))
    -      .flatMap { case (prefix, suffix) =>
    -        val frequentNextItemSets = prefixToNextItems(prefix)
    -        val frequentNextItems = frequentNextItemSets.flatten
    -        val filteredSuffix = suffix
    -          .map(item => frequentNextItems.intersect(item))
    -          .filter(_.nonEmpty)
    -        frequentNextItemSets.flatMap { item =>
    -          LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
    -            case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
    -            case _ => None
    -          }
    -        }
    -      }
    +  private[fpm] object Prefix {
    +    /** Internal counter to generate unique IDs. It starts at -1 so that the first ID is 0. */
    +    private val counter: AtomicInteger = new AtomicInteger(-1)
    +    /** Gets the next unique ID by atomically incrementing the counter and returning it. */
    +    private def nextId: Int = counter.incrementAndGet()
    -    (extendedPrefixAndCounts, extendedPrefixAndSuffix)
    +    /** An empty [[Prefix]] instance. */
    +    val empty: Prefix = new Prefix(Array.empty, 0)
    -   * Calculate the patterns in local.
    -   * @param minCount the absolute minimum count
    -   * @param data prefixes and projected sequences data data
    -   * @return patterns
    +   * An internal representation of a postfix from some projection.
    +   * We use one int array to store the items, which might also contain 
other items from the
    +   * original sequence.
    +   * Items are represented by positive integers, and items in each itemset 
must be distinct and
    +   * ordered.
    +   * We use 0 as the delimiter between itemsets.
    +   * For example, a sequence `<(12)(31)1>` is represented by `[0, 1, 2, 0, 
1, 3, 0, 1, 0]`.
    +   * The postfix of this sequence w.r.t. prefix `<1>` is `<(_2)(13)1>`.
    +   * We may reuse the original items array `[0, 1, 2, 0, 1, 3, 0, 1, 0]` 
to represent the postfix,
    +   * and mark the start index of the postfix, which is `2` in this example.
    +   * So the active items in this postfix are `[2, 0, 1, 3, 0, 1, 0]`.
    +   * We also remember the start indices of partial projections, the ones 
that split an itemset.
    +   * For example, another possible partial projection w.r.t. `<1>` is `<(_3)1>`.
    +   * We remember the start indices of partial projections, which are `[2, 
5]` in this example.
    +   * This data structure makes it easier to do projections.
    +   *
    +   * @param items an int array containing this postfix with 0 as the delimiter
    +   * @param start the start index of this postfix in `items`
    +   * @param partialStarts start indices of possible partial projections, 
strictly increasing
    -  private def getPatternsInLocal(
    -      minCount: Long,
    -      data: RDD[(List[Set[Int]], Iterable[List[Set[Int]]])]): 
RDD[(List[Set[Int]], Long)] = {
    -    data.flatMap {
    -      case (prefix, projDB) =>, 
maxPatternLength, prefix, projDB)
    +  private[fpm] class Postfix(
    +      val items: Array[Int],
    +      val start: Int = 0,
    +      val partialStarts: Array[Int] = Array.empty) extends Serializable {
    +    require(items.last == 0, "The last item in a postfix must be zero.")
    +    /**
    +     * Start index of the first full itemset contained in this postfix, i.e. the index of the
    +     * first 0 delimiter at or after `start`.
    +     */
    +    private[this] def fullStart: Int = {
    +      var i = start
    +      while (items(i) != 0) {
    +        i += 1
    +      }
    +      i
    -  }
    +    /**
    +     * Generates length-1 prefix items of this postfix with the corresponding postfix sizes.
    +     * There are two types of prefix items:
    +     *   a) the item can be assembled to the last itemset of the prefix, where we flip the
    +     *      sign in the output; these are found by scanning from each index in
    +     *      `partialStarts` up to the next 0,
    +     *   b) the item can be appended to the prefix; these are the non-zero entries from
    +     *      `fullStart` onwards.
    +     * Each distinct (signed) item is emitted once, for the first position at which it is
    +     * found. The reported size is `items.length - 1 - i`, where `i` is that position, i.e.
    +     * the number of array slots after it, delimiters included.
    +     * @return an iterator of (prefix item, corresponding postfix size)
    +     */
    +    def genPrefixItems: Iterator[(Int, Long)] = {
    +      val n1 = items.length - 1 // index of the last element, which is required to be 0
    +      // For each unique item (subject to sign) in this sequence, we 
output exact one split.
    +      // TODO: use PrimitiveKeyOpenHashMap
    +      val prefixes = mutable.Map.empty[Int, Long]
    +      // a) items that can be assembled to the last itemset of the prefix
    +      partialStarts.foreach { start =>
    +        var i = start
    +        var x = -items(i) // negated so that partial-projection items get a negative key
    +        while (x != 0) {
    +          if (!prefixes.contains(x)) {
    +            prefixes(x) = n1 - i // only the first occurrence of each key is recorded
    +          }
    +          i += 1
    +          x = -items(i)
    +        }
    +      }
    +      // b) items that can be appended to the prefix
    +      var i = fullStart
    +      while (i < n1) {
    +        val x = items(i)
    +        if (x != 0 && !prefixes.contains(x)) { // skip delimiters and already-seen items
    +          prefixes(x) = n1 - i
    +        }
    +        i += 1
    +      }
    +      prefixes.toIterator
    +    }
    -object PrefixSpan {
    -  private[fpm] val DELIMITER = -1
    +    /**
    +     * Tests whether this postfix is non-empty, i.e. whether at least one element of `items`
    +     * lies after index `start`.
    +     */
    +    def nonEmpty: Boolean = items.length > start + 1
    -  /** Splits an array of itemsets delimited by [[DELIMITER]]. */
    -  private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = {
    -    sequence.span(_ != DELIMITER) match {
    -      case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)
    -      case (x, xs) => List(x.toSet)
    +    /**
    +     * Projects this postfix with respect to the input prefix item.
    +     * @param prefix prefix item. If prefix is positive, we match items in 
any full itemset; if it
    +     *               is negative, we do partial projections.
    +     * @return the projected postfix
    +     */
    +    def project(prefix: Int): Postfix = {
    +      require(prefix != 0)
    +      val n1 = items.length - 1
    +      var matched = false
    +      var newStart = n1
    +      val newPartialStarts = mutable.ArrayBuilder.make[Int]
    +      if (prefix < 0) {
    +        // Search for partial projections.
    +        val target = -prefix
    +        partialStarts.foreach { start =>
    +          var i = start
    +          var x = items(i)
    +          while (x != target && x != 0) {
    +            i += 1
    +            x = items(i)
    +          }
    +          if (x == target) {
    +            i += 1
    +            if (!matched) {
