Repository: spark Updated Branches: refs/heads/master e5749a134 -> dc0865bc7
[SPARK-2511][MLLIB] add HashingTF and IDF This is roughly the TF-IDF implementation used in the Databricks Cloud Demo: http://databricks.com/cloud/ . Both `HashingTF` and `IDF` are implemented as transformers, similar to scikit-learn. Author: Xiangrui Meng <m...@databricks.com> Closes #1671 from mengxr/tfidf and squashes the following commits: 7d65888 [Xiangrui Meng] use JavaConverters._ 5fe9ec4 [Xiangrui Meng] fix unit test 6e214ec [Xiangrui Meng] add apache header cfd9aed [Xiangrui Meng] add Java-friendly methods move classes to mllib.feature 3814440 [Xiangrui Meng] add HashingTF and IDF Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc0865bc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc0865bc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc0865bc Branch: refs/heads/master Commit: dc0865bc7e119fe507061c27069c17523b87dfea Parents: e5749a1 Author: Xiangrui Meng <m...@databricks.com> Authored: Thu Jul 31 12:55:00 2014 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Thu Jul 31 12:55:00 2014 -0700 ---------------------------------------------------------------------- .../apache/spark/mllib/feature/HashingTF.scala | 79 ++++++++ .../org/apache/spark/mllib/feature/IDF.scala | 194 +++++++++++++++++++ .../spark/mllib/feature/JavaTfIdfSuite.java | 66 +++++++ .../spark/mllib/feature/HashingTFSuite.scala | 52 +++++ .../apache/spark/mllib/feature/IDFSuite.scala | 63 ++++++ 5 files changed, 454 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dc0865bc/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala new file mode 100644 index 0000000..0f6d580 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import java.lang.{Iterable => JavaIterable} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +/** + * :: Experimental :: + * Maps a sequence of terms to their term frequencies using the hashing trick. + * + * @param numFeatures number of features (default: 1000000) + */ +@Experimental +class HashingTF(val numFeatures: Int) extends Serializable { + + def this() = this(1000000) + + /** + * Returns the index of the input term. + */ + def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures) + + /** + * Transforms the input document into a sparse term frequency vector. + */ + def transform(document: Iterable[_]): Vector = { + val termFrequencies = mutable.HashMap.empty[Int, Double] + document.foreach { term => + val i = indexOf(term) + termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0) + } + Vectors.sparse(numFeatures, termFrequencies.toSeq) + } + + /** + * Transforms the input document into a sparse term frequency vector (Java version). + */ + def transform(document: JavaIterable[_]): Vector = { + transform(document.asScala) + } + + /** + * Transforms the input document to term frequency vectors. + */ + def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = { + dataset.map(this.transform) + } + + /** + * Transforms the input document to term frequency vectors (Java version). + */ + def transform[D <: JavaIterable[_]](dataset: JavaRDD[D]): JavaRDD[Vector] = { + dataset.rdd.map(this.transform).toJavaRDD() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/dc0865bc/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala new file mode 100644 index 0000000..7ed611a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import breeze.linalg.{DenseVector => BDV} + +import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.mllib.rdd.RDDFunctions._ +import org.apache.spark.rdd.RDD + +/** + * :: Experimental :: + * Inverse document frequency (IDF). + * The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`, where `m` is the total + * number of documents and `d(t)` is the number of documents that contain term `t`. + */ +@Experimental +class IDF { + + // TODO: Allow different IDF formulations. + + private var brzIdf: BDV[Double] = _ + + /** + * Computes the inverse document frequency. + * @param dataset an RDD of term frequency vectors + */ + def fit(dataset: RDD[Vector]): this.type = { + brzIdf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)( + seqOp = (df, v) => df.add(v), + combOp = (df1, df2) => df1.merge(df2) + ).idf() + this + } + + /** + * Computes the inverse document frequency. + * @param dataset a JavaRDD of term frequency vectors + */ + def fit(dataset: JavaRDD[Vector]): this.type = { + fit(dataset.rdd) + } + + /** + * Transforms term frequency (TF) vectors to TF-IDF vectors. + * @param dataset an RDD of term frequency vectors + * @return an RDD of TF-IDF vectors + */ + def transform(dataset: RDD[Vector]): RDD[Vector] = { + if (!initialized) { + throw new IllegalStateException("Haven't learned IDF yet. Call fit first.") + } + val theIdf = brzIdf + val bcIdf = dataset.context.broadcast(theIdf) + dataset.mapPartitions { iter => + val thisIdf = bcIdf.value + iter.map { v => + val n = v.size + v match { + case sv: SparseVector => + val nnz = sv.indices.size + val newValues = new Array[Double](nnz) + var k = 0 + while (k < nnz) { + newValues(k) = sv.values(k) * thisIdf(sv.indices(k)) + k += 1 + } + Vectors.sparse(n, sv.indices, newValues) + case dv: DenseVector => + val newValues = new Array[Double](n) + var j = 0 + while (j < n) { + newValues(j) = dv.values(j) * thisIdf(j) + j += 1 + } + Vectors.dense(newValues) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } + } + } + } + + /** + * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version). + * @param dataset a JavaRDD of term frequency vectors + * @return a JavaRDD of TF-IDF vectors + */ + def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = { + transform(dataset.rdd).toJavaRDD() + } + + /** Returns the IDF vector. */ + def idf(): Vector = { + if (!initialized) { + throw new IllegalStateException("Haven't learned IDF yet. Call fit first.") + } + Vectors.fromBreeze(brzIdf) + } + + private def initialized: Boolean = brzIdf != null +} + +private object IDF { + + /** Document frequency aggregator. */ + class DocumentFrequencyAggregator extends Serializable { + + /** number of documents */ + private var m = 0L + /** document frequency vector */ + private var df: BDV[Long] = _ + + /** Adds a new document. */ + def add(doc: Vector): this.type = { + if (isEmpty) { + df = BDV.zeros(doc.size) + } + doc match { + case sv: SparseVector => + val nnz = sv.indices.size + var k = 0 + while (k < nnz) { + if (sv.values(k) > 0) { + df(sv.indices(k)) += 1L + } + k += 1 + } + case dv: DenseVector => + val n = dv.size + var j = 0 + while (j < n) { + if (dv.values(j) > 0.0) { + df(j) += 1L + } + j += 1 + } + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } + m += 1L + this + } + + /** Merges another. */ + def merge(other: DocumentFrequencyAggregator): this.type = { + if (!other.isEmpty) { + m += other.m + if (df == null) { + df = other.df.copy + } else { + df += other.df + } + } + this + } + + private def isEmpty: Boolean = m == 0L + + /** Returns the current IDF vector. */ + def idf(): BDV[Double] = { + if (isEmpty) { + throw new IllegalStateException("Haven't seen any document yet.") + } + val n = df.length + val inv = BDV.zeros[Double](n) + var j = 0 + while (j < n) { + inv(j) = math.log((m + 1.0)/ (df(j) + 1.0)) + j += 1 + } + inv + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/dc0865bc/mllib/src/test/java/org/apache/spark/mllib/feature/JavaTfIdfSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/feature/JavaTfIdfSuite.java b/mllib/src/test/java/org/apache/spark/mllib/feature/JavaTfIdfSuite.java new file mode 100644 index 0000000..e8d99f4 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/feature/JavaTfIdfSuite.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import com.google.common.collect.Lists; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.linalg.Vector; + +public class JavaTfIdfSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaTfIdfSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + } + + @Test + public void tfIdf() { + // The tests are to check Java compatibility. + HashingTF tf = new HashingTF(); + JavaRDD<ArrayList<String>> documents = sc.parallelize(Lists.newArrayList( + Lists.newArrayList("this is a sentence".split(" ")), + Lists.newArrayList("this is another sentence".split(" ")), + Lists.newArrayList("this is still a sentence".split(" "))), 2); + JavaRDD<Vector> termFreqs = tf.transform(documents); + termFreqs.collect(); + IDF idf = new IDF(); + JavaRDD<Vector> tfIdfs = idf.fit(termFreqs).transform(termFreqs); + List<Vector> localTfIdfs = tfIdfs.collect(); + int indexOfThis = tf.indexOf("this"); + for (Vector v: localTfIdfs) { + Assert.assertEquals(0.0, v.apply(indexOfThis), 1e-15); + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/dc0865bc/mllib/src/test/scala/org/apache/spark/mllib/feature/HashingTFSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/HashingTFSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/HashingTFSuite.scala new file mode 100644 index 0000000..a599e0d --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/HashingTFSuite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.LocalSparkContext + +class HashingTFSuite extends FunSuite with LocalSparkContext { + + test("hashing tf on a single doc") { + val hashingTF = new HashingTF(1000) + val doc = "a a b b c d".split(" ") + val n = hashingTF.numFeatures + val termFreqs = Seq( + (hashingTF.indexOf("a"), 2.0), + (hashingTF.indexOf("b"), 2.0), + (hashingTF.indexOf("c"), 1.0), + (hashingTF.indexOf("d"), 1.0)) + assert(termFreqs.map(_._1).forall(i => i >= 0 && i < n), + "index must be in range [0, #features)") + assert(termFreqs.map(_._1).toSet.size === 4, "expecting perfect hashing") + val expected = Vectors.sparse(n, termFreqs) + assert(hashingTF.transform(doc) === expected) + } + + test("hashing tf on an RDD") { + val hashingTF = new HashingTF + val localDocs: Seq[Seq[String]] = Seq( + "a a b b b c d".split(" "), + "a b c d a b c".split(" "), + "c b a c b a a".split(" ")) + val docs = sc.parallelize(localDocs, 2) + assert(hashingTF.transform(docs).collect().toSet === localDocs.map(hashingTF.transform).toSet) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/dc0865bc/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala new file mode 100644 index 0000000..78a2804 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors} +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.util.TestingUtils._ + +class IDFSuite extends FunSuite with LocalSparkContext { + + test("idf") { + val n = 4 + val localTermFrequencies = Seq( + Vectors.sparse(n, Array(1, 3), Array(1.0, 2.0)), + Vectors.dense(0.0, 1.0, 2.0, 3.0), + Vectors.sparse(n, Array(1), Array(1.0)) + ) + val m = localTermFrequencies.size + val termFrequencies = sc.parallelize(localTermFrequencies, 2) + val idf = new IDF + intercept[IllegalStateException] { + idf.idf() + } + intercept[IllegalStateException] { + idf.transform(termFrequencies) + } + idf.fit(termFrequencies) + val expected = Vectors.dense(Array(0, 3, 1, 2).map { x => + math.log((m.toDouble + 1.0) / (x + 1.0)) + }) + assert(idf.idf() ~== expected absTol 1e-12) + val tfidf = idf.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap() + assert(tfidf.size === 3) + val tfidf0 = tfidf(0L).asInstanceOf[SparseVector] + assert(tfidf0.indices === Array(1, 3)) + assert(Vectors.dense(tfidf0.values) ~== + Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12) + val tfidf1 = tfidf(1L).asInstanceOf[DenseVector] + assert(Vectors.dense(tfidf1.values) ~== + Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12) + val tfidf2 = tfidf(2L).asInstanceOf[SparseVector] + assert(tfidf2.indices === Array(1)) + assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12) + } +}