GitHub user feynmanliang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7937#discussion_r36237987

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---
@@ -139,200 +202,308 @@ class PrefixSpan private (
     run(data.rdd.map(_.asScala.map(_.asScala.toArray).toArray))
   }
+}
+
+@Experimental
+object PrefixSpan extends Logging {
+  /**
-   * Find the complete set of sequential patterns in the input sequences. This method utilizes
-   * the internal representation of itemsets as Array[Int] where each itemset is represented by
-   * a contiguous sequence of non-negative integers and delimiters represented by [[DELIMITER]].
-   * @param data ordered sequences of itemsets. Items are represented by non-negative integers.
-   *             Each itemset has one or more items and is delimited by [[DELIMITER]].
-   * @return a set of sequential pattern pairs,
-   *         the key of pair is pattern (a list of elements),
-   *         the value of pair is the pattern's count.
+   * Find the complete set of frequent sequential patterns in the input sequences.
+   * @param data ordered sequences of itemsets. We represent a sequence internally as Array[Int],
+   *             where each itemset is represented by a contiguous sequence of distinct and ordered
+   *             positive integers. We use 0 as the delimiter at itemset boundaries, including the
+   *             first and the last position.
+   * @return an RDD of (frequent sequential pattern, count) pairs.
+   * @see [[Postfix]]
    */
-  private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+  private[fpm] def genFreqPatterns(
+      data: RDD[Array[Int]],
+      minCount: Long,
+      maxPatternLength: Int,
+      maxLocalProjDBSize: Long): RDD[(Array[Int], Long)] = {
     val sc = data.sparkContext
     if (data.getStorageLevel == StorageLevel.NONE) {
       logWarning("Input data is not cached.")
     }
-    // Use List[Set[Item]] for internal computation
-    val sequences = data.map { seq => splitSequence(seq.toList) }
-
-    // Convert min support to a min number of transactions for this dataset
-    val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
-
-    // (Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
-    val freqItemCounts = sequences
-      .flatMap(seq => seq.flatMap(nonemptySubsets(_)).distinct.map(item => (item, 1L)))
-      .reduceByKey(_ + _)
-      .filter { case (item, count) => (count >= minCount) }
-      .collect()
-      .toMap
-
-    // Pairs of (length 1 prefix, suffix consisting of frequent items)
-    val itemSuffixPairs = {
-      val freqItemSets = freqItemCounts.keys.toSet
-      val freqItems = freqItemSets.flatten
-      sequences.flatMap { seq =>
-        val filteredSeq = seq.map(item => freqItems.intersect(item)).filter(_.nonEmpty)
-        freqItemSets.flatMap { item =>
-          val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
-          candidateSuffix match {
-            case suffix if !suffix.isEmpty => Some((List(item), suffix))
-            case _ => None
+    val postfixes = data.map(items => new Postfix(items))
+
+    // Local frequent patterns (prefixes) and their counts.
+    val localFreqPatterns = mutable.ArrayBuffer.empty[(Array[Int], Long)]
+    // Prefixes whose projected databases are small.
+    val smallPrefixes = mutable.Map.empty[Int, Prefix]
+    val emptyPrefix = Prefix.empty
+    // Prefixes whose projected databases are large.
+    var largePrefixes = mutable.Map(emptyPrefix.id -> emptyPrefix)
+    while (largePrefixes.nonEmpty) {
+      val numLocalFreqPatterns = localFreqPatterns.length
+      logInfo(s"number of local frequent patterns: $numLocalFreqPatterns")
+      if (localFreqPatterns.length > 1000000) {
+        logWarning(
+          s"""
+             | Collected $numLocalFreqPatterns local frequent patterns. You may want to consider:
+             | 1. increasing minSupport,
+             | 2. decreasing maxPatternLength,
+             | 3. increasing maxLocalProjDBSize.
+           """.stripMargin)
+      }
+      logInfo(s"number of small prefixes: ${smallPrefixes.size}")
+      logInfo(s"number of large prefixes: ${largePrefixes.size}")
+      val largePrefixArray = largePrefixes.values.toArray
+      val freqPrefixes = postfixes.flatMap { postfix =>
+          largePrefixArray.flatMap { prefix =>
+            postfix.project(prefix).genPrefixItems.map { case (item, postfixSize) =>
+              ((prefix.id, item), (1L, postfixSize))
+            }
+          }
+        }.reduceByKey { case ((c0, s0), (c1, s1)) =>
+          (c0 + c1, s0 + s1)
+        }.filter { case (_, (c, _)) => c >= minCount }
+        .collect()
+      val newLargePrefixes = mutable.Map.empty[Int, Prefix]
+      freqPrefixes.foreach { case ((id, item), (count, projDBSize)) =>
+        val newPrefix = largePrefixes(id) :+ item
+        localFreqPatterns += ((newPrefix.items :+ 0, count))
+        if (newPrefix.length < maxPatternLength) {
+          if (projDBSize > maxLocalProjDBSize) {
+            newLargePrefixes += newPrefix.id -> newPrefix
+          } else {
+            smallPrefixes += newPrefix.id -> newPrefix
          }
        }
      }
+      largePrefixes = newLargePrefixes
    }

-    // Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
-    // frequent length-one prefixes)
-    var resultsAccumulator = freqItemCounts.map { case (item, count) => (List(item), count) }.toList
-
-    // Remaining work to be locally and distributively processed respectfully
-    var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
-
-    // Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
-    // projected database sizes <= `maxLocalProjDBSize`) or `maxPatternLength` is reached
-    var patternLength = 1
-    while (pairsForDistributed.count() != 0 && patternLength < maxPatternLength) {
-      val (nextPatternAndCounts, nextPrefixSuffixPairs) =
-        extendPrefixes(minCount, pairsForDistributed)
-      pairsForDistributed.unpersist()
-      val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
-      pairsForDistributed = largerPairsPart
-      pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
-      pairsForLocal ++= smallerPairsPart
-      resultsAccumulator ++= nextPatternAndCounts.collect()
-      patternLength += 1 // pattern length grows one per iteration
+    // Switch to local processing.
+    val bcSmallPrefixes = sc.broadcast(smallPrefixes)
+    val distributedFreqPattern = postfixes.flatMap { postfix =>
+      bcSmallPrefixes.value.values.map { prefix =>
+        (prefix.id, postfix.project(prefix).compressed)
+      }.filter(_._2.nonEmpty)
+    }.groupByKey().flatMap { case (id, projPostfixes) =>
+      val prefix = bcSmallPrefixes.value(id)
+      val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength - prefix.length)
+      // TODO: We collect projected postfixes into memory. We should also compare the performance
+      // TODO: of keeping them on shuffle files.
+      localPrefixSpan.run(projPostfixes.toArray).map { case (pattern, count) =>
+        (prefix.items ++ pattern, count)
+      }
    }

-    // Process the small projected databases locally
-    val remainingResults = getPatternsInLocal(
-      minCount, sc.parallelize(pairsForLocal, 1).groupByKey())
-
-    (sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
-      .map { case (pattern, count) => (flattenSequence(pattern.reverse).toArray, count) }
+    // Union local frequent patterns and distributed ones.
+    val freqPatterns = (sc.parallelize(localFreqPatterns, 1) ++ distributedFreqPattern)
+      .persist(StorageLevel.MEMORY_AND_DISK)
+    freqPatterns
  }
-
  /**
-   * Partitions the prefix-suffix pairs by projected database size.
-   * @param prefixSuffixPairs prefix (length n) and suffix pairs,
-   * @return prefix-suffix pairs partitioned by whether their projected database size is <= or
-   *         greater than [[maxLocalProjDBSize]]
+   * Represents a prefix.
+   * @param items items in this prefix, using the internal format
+   * @param length length of this prefix, not counting 0
   */
-  private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Set[Int]], List[Set[Int]])])
-    : (List[(List[Set[Int]], List[Set[Int]])], RDD[(List[Set[Int]], List[Set[Int]])]) = {
-    val prefixToSuffixSize = prefixSuffixPairs
-      .aggregateByKey(0)(
-        seqOp = { case (count, suffix) => count + suffix.length },
-        combOp = { _ + _ })
-    val smallPrefixes = prefixToSuffixSize
-      .filter(_._2 <= maxLocalProjDBSize)
-      .keys
-      .collect()
-      .toSet
-    val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
-    val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) }
-    (small.collect().toList, large)
+  private[fpm] class Prefix private (val items: Array[Int], val length: Int) extends Serializable {
+
+    /** A unique id for this prefix. */
+    val id: Int = Prefix.nextId
+
+    /** Expands this prefix by the input item. */
+    def :+(item: Int): Prefix = {
+      require(item != 0)
+      if (item < 0) {
+        new Prefix(items :+ -item, length + 1)
+      } else {
+        new Prefix(items ++ Array(0, item), length + 1)
+      }
+    }
  }

-  /**
-   * Extends all prefixes by one itemset from their suffix and computes the resulting frequent
-   * prefixes and remaining work.
-   * @param minCount minimum count
-   * @param prefixSuffixPairs prefix (length N) and suffix pairs,
-   * @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended
-   *         prefix, corresponding suffix) pairs.
-   */
-  private def extendPrefixes(
-      minCount: Long,
-      prefixSuffixPairs: RDD[(List[Set[Int]], List[Set[Int]])])
-    : (RDD[(List[Set[Int]], Long)], RDD[(List[Set[Int]], List[Set[Int]])]) = {
-
-    // (length N prefix, itemset from suffix) pairs and their corresponding number of occurrences
-    // Every (prefix :+ suffix) is guaranteed to have support exceeding `minSupport`
-    val prefixItemPairAndCounts = prefixSuffixPairs
-      .flatMap { case (prefix, suffix) =>
-        suffix.flatMap(nonemptySubsets(_)).distinct.map(y => ((prefix, y), 1L)) }
-      .reduceByKey(_ + _)
-      .filter { case (item, count) => (count >= minCount) }
-
-    // Map from prefix to set of possible next items from suffix
-    val prefixToNextItems = prefixItemPairAndCounts
-      .keys
-      .groupByKey()
-      .mapValues(_.toSet)
-      .collect()
-      .toMap
-
-    // Frequent patterns with length N+1 and their corresponding counts
-    val extendedPrefixAndCounts = prefixItemPairAndCounts
-      .map { case ((prefix, item), count) => (item :: prefix, count) }
-
-    // Remaining work, all prefixes will have length N+1
-    val extendedPrefixAndSuffix = prefixSuffixPairs
-      .filter(x => prefixToNextItems.contains(x._1))
-      .flatMap { case (prefix, suffix) =>
-        val frequentNextItemSets = prefixToNextItems(prefix)
-        val frequentNextItems = frequentNextItemSets.flatten
-        val filteredSuffix = suffix
-          .map(item => frequentNextItems.intersect(item))
-          .filter(_.nonEmpty)
-        frequentNextItemSets.flatMap { item =>
-          LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
-            case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
-            case _ => None
-          }
-        }
-      }
+  private[fpm] object Prefix {
+    /** Internal counter to generate unique IDs. */
+    private val counter: AtomicInteger = new AtomicInteger(-1)
+
+    /** Gets the next unique ID. */
+    private def nextId: Int = counter.incrementAndGet()

-    (extendedPrefixAndCounts, extendedPrefixAndSuffix)
+    /** An empty [[Prefix]] instance. */
+    val empty: Prefix = new Prefix(Array.empty, 0)
  }

  /**
-   * Calculate the patterns in local.
-   * @param minCount the absolute minimum count
-   * @param data prefixes and projected sequences data data
-   * @return patterns
+   * An internal representation of a postfix from some projection.
+   * We use one int array to store the items, which might also contain other items from the
+   * original sequence.
+   * Items are represented by positive integers, and items in each itemset must be distinct and
+   * ordered.
+   * We use 0 as the delimiter between itemsets.
+   * For example, a sequence `<(12)(31)1>` is represented by `[0, 1, 2, 0, 1, 3, 0, 1, 0]`.
+   * The postfix of this sequence w.r.t. prefix `<1>` is `<(_2)(13)1>`.
+   * We may reuse the original items array `[0, 1, 2, 0, 1, 3, 0, 1, 0]` to represent the postfix,
+   * and mark the start index of the postfix, which is `2` in this example.
+   * So the active items in this postfix are `[2, 0, 1, 3, 0, 1, 0]`.
+   * We also remember the start indices of partial projections, the ones that split an itemset.
+   * For example, another possible partial projection w.r.t. `<1>` is `<(_3)1>`.
+   * The start indices of these partial projections are `[2, 5]` in this example.
+   * This data structure makes it easier to do projections.
+   *
+   * @param items an int array containing this postfix with 0 as the delimiter
+   * @param partialStarts start indices of possible partial projections, strictly increasing
   */
-  private def getPatternsInLocal(
-      minCount: Long,
-      data: RDD[(List[Set[Int]], Iterable[List[Set[Int]]])]): RDD[(List[Set[Int]], Long)] = {
-    data.flatMap {
-      case (prefix, projDB) => LocalPrefixSpan.run(minCount, maxPatternLength, prefix, projDB)
+  private[fpm] class Postfix(
+      val items: Array[Int],
+      val start: Int = 0,
+      val partialStarts: Array[Int] = Array.empty) extends Serializable {
+
+    require(items.last == 0, "The last item in a postfix must be zero.")
+
+    /**
+     * Start index of the first full itemset contained in this postfix.
+     */
+    private[this] def fullStart: Int = {
+      var i = start
+      while (items(i) != 0) {
+        i += 1
+      }
+      i
    }
-  }
-}

+    /**
+     * Generates length-1 prefix items of this postfix with the corresponding postfix sizes.
+     * There are two types of prefix items:
+     *   a) the item can be assembled to the last itemset of the prefix, where we flip the sign in
+     *      the output,
+     *   b) the item can be appended to the prefix.
--- End diff --

"the item is not part of the first itemset in the postfix and can..."
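
To make the encoding described in the `Postfix` scaladoc concrete: the example sequence `<(12)(31)1>` maps to `[0, 1, 2, 0, 1, 3, 0, 1, 0]`. A minimal, standalone Scala sketch of that encoding (not part of the patch; `encode` is a hypothetical helper shown only for illustration):

    object EncodingSketch {
      // Encode a sequence of itemsets as one Int array, with 0 as the delimiter
      // at every itemset boundary, including the first and the last position.
      // Items within an itemset are distinct, positive, and ordered.
      def encode(sequence: Seq[Set[Int]]): Array[Int] =
        sequence.foldLeft(Array(0)) { (acc, itemset) =>
          (acc ++ itemset.toArray.sorted) :+ 0
        }

      def main(args: Array[String]): Unit = {
        // The sequence <(12)(31)1> from the scaladoc example:
        val encoded = encode(Seq(Set(1, 2), Set(3, 1), Set(1)))
        println(encoded.mkString("[", ", ", "]")) // [0, 1, 2, 0, 1, 3, 0, 1, 0]
      }
    }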
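
The sign convention in `Prefix.:+` is also easy to misread: a negative item extends the last itemset of the prefix (its absolute value is appended directly), while a positive item starts a new itemset behind a 0 delimiter. A small sketch restating just those two branches under that reading (names are hypothetical; the id bookkeeping is omitted):

    object PrefixSignSketch {
      // Mirrors the two branches of Prefix.:+ above.
      def extendPrefix(items: Array[Int], item: Int): Array[Int] = {
        require(item != 0)
        if (item < 0) items :+ -item    // assemble into the last itemset
        else items ++ Array(0, item)    // open a new itemset after a 0 delimiter
      }

      def main(args: Array[String]): Unit = {
        val p1 = extendPrefix(Array.empty[Int], 1) // [0, 1]       = <1>
        val p2 = extendPrefix(p1, -2)              // [0, 1, 2]    = <(12)>
        val p3 = extendPrefix(p1, 2)               // [0, 1, 0, 2] = <(1)(2)>
        Seq(p1, p2, p3).foreach(p => println(p.mkString("[", ", ", "]")))
      }
    }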
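
Finally, the counting step in the driver loop can be read on its own: each postfix emits one `((prefixId, item), (1L, postfixSize))` record per candidate extension of a large prefix, records are summed per key, and only keys reaching `minCount` survive; the summed postfix sizes then decide whether each extended prefix stays distributed or is handed to the local pass. A Spark-free sketch of that aggregation over plain Scala collections (hypothetical helper, for intuition only):

    object CountSketch {
      // One ((prefixId, item), (1L, postfixSize)) record per candidate occurrence.
      def countExtensions(
          candidates: Seq[((Int, Int), (Long, Long))],
          minCount: Long): Map[(Int, Int), (Long, Long)] =
        candidates
          .groupBy(_._1)
          .map { case (key, records) =>
            // Same fold as the reduceByKey above: sum counts and projected DB sizes.
            key -> records.map(_._2).reduce { case ((c0, s0), (c1, s1)) => (c0 + c1, s0 + s1) }
          }
          .filter { case (_, (count, _)) => count >= minCount }
    }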