Hi Yanbo,

I am using 1.6.0. I am having a hard time trying to figure out what the exact equation is. I do not know Scala.
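Even without reading the Scala, the equation can be sanity-checked numerically. Below is a minimal plain-Java sketch (my own throwaway class, not Spark code) assuming the formula the docs describe, IDF(t,D) = ln((|D| + 1) / (DF(t,D) + 1)), applied to the 4-document "Chinese" training set from my test code further down. It reproduces the values in my output: "Chinese" appears in all 4 documents so its IDF is ln(5/5) = 0, and the terms that appear in exactly 1 document get ln(5/2) = 0.9162907318741551.

```java
// Hand-check of the IDF formula I believe Spark implements (an assumption
// based on the published docs, not confirmed from the Scala source):
//   IDF(t, D) = ln((|D| + 1) / (DF(t, D) + 1))
public class IdfHandCheck {
    static double idf(long numDocs, long docFreq) {
        return Math.log((double) (numDocs + 1) / (docFreq + 1));
    }

    public static void main(String[] args) {
        long numDocs = 4; // 4 training documents
        // "Chinese" appears in all 4 documents
        System.out.println("Chinese: " + idf(numDocs, 4)); // ln(5/5) = 0.0
        // "Beijing", "Shanghai", "Macao", "Tokyo", "Japan" each appear in 1 document
        System.out.println("Beijing: " + idf(numDocs, 1)); // ln(5/2) = 0.9162907318741551
    }
}
```

If this assumption is right, it also explains why the "Chinese" column is always 0.0 in the idf vectors shown below.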
I took a look at the source code URL you provided. I do not know Scala.

    override def transform(dataset: DataFrame): DataFrame = {
      transformSchema(dataset.schema, logging = true)
      val idf = udf { vec: Vector => idfModel.transform(vec) }
      dataset.withColumn($(outputCol), idf(col($(inputCol))))
    }

You mentioned the doc is out of date:
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf

Based on my understanding of the subject matter, the equations in the doc are correct:

    IDF(t,D) = log((|D| + 1) / (DF(t,D) + 1))
    TFIDF(t,d,D) = TF(t,d) * IDF(t,D)

However, I could not find anything like these equations in the source code.

I found the Spark unit test org.apache.spark.mllib.feature.JavaTfIdfSuite; its results do not match the equation. (In general, the unit test asserts seem incomplete.)

I have created several small test examples to try to figure out how to use NaiveBayes, HashingTF, and IDF. The values of TFIDF, theta, probabilities, ... produced by Spark do not match the published results at
http://nlp.stanford.edu/IR-book/html/htmledition/naive-bayes-text-classification-1.html

Kind regards

Andy

    private DataFrame createTrainingData() {
        // make sure we only use dictionarySize words
        JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
                // label 0 is Chinese
                // label 1 is notChinese
                RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing", "Chinese")),
                RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese", "Shanghai")),
                RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao")),
                RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan", "Chinese"))));
        return createData(rdd);
    }

    private DataFrame createData(JavaRDD<Row> rdd) {
        StructField id = new StructField("id", DataTypes.IntegerType, false, Metadata.empty());
        StructField label = new StructField("label", DataTypes.DoubleType, false, Metadata.empty());
        StructField words = new StructField("words",
                DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty());
        StructType
schema = new StructType(new StructField[] { id, label, words });
        DataFrame ret = sqlContext.createDataFrame(rdd, schema);
        return ret;
    }

    private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
        HashingTF hashingTF = new HashingTF()
                .setInputCol("words")
                .setOutputCol("tf")
                .setNumFeatures(dictionarySize);

        DataFrame termFrequenceDF = hashingTF.transform(rawDF);
        termFrequenceDF.cache(); // idf needs to make 2 passes over the data set

        //val idf = new IDF(minDocFreq = 2).fit(tf)
        IDFModel idf = new IDF()
                //.setMinDocFreq(1) // our vocabulary has 6 words we hash into 7
                .setInputCol(hashingTF.getOutputCol())
                .setOutputCol("idf")
                .fit(termFrequenceDF);

        DataFrame ret = idf.transform(termFrequenceDF);
        return ret;
    }

root
 |-- id: integer (nullable = false)
 |-- label: double (nullable = false)
 |-- words: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- idf: vector (nullable = true)

+---+-----+----------------------------+-------------------------+-------------------------------------------------------+
|id |label|words                       |tf                       |idf                                                    |
+---+-----+----------------------------+-------------------------+-------------------------------------------------------+
|0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                     |
|1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                     |
|2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                     |
|3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551])|
+---+-----+----------------------------+-------------------------+-------------------------------------------------------+

Here is the Spark test case:

    @Test
    public void tfIdf() {
        // The tests are to check Java compatibility.
        HashingTF tf = new HashingTF();
        @SuppressWarnings("unchecked")
        JavaRDD<List<String>> documents = sc.parallelize(Arrays.asList(
                Arrays.asList("this is a sentence".split(" ")),
                Arrays.asList("this is another sentence".split(" ")),
                Arrays.asList("this is still a sentence".split(" "))), 2);
        JavaRDD<Vector> termFreqs = tf.transform(documents);
        termFreqs.collect();
        IDF idf = new IDF();
        JavaRDD<Vector> tfIdfs = idf.fit(termFreqs).transform(termFreqs);
        List<Vector> localTfIdfs = tfIdfs.collect();
        int indexOfThis = tf.indexOf("this");
        System.err.println("AEDWIP: indexOfThis: " + indexOfThis);
        int indexOfSentence = tf.indexOf("sentence");
        System.err.println("AEDWIP: indexOfSentence: " + indexOfSentence);
        int indexOfAnother = tf.indexOf("another");
        System.err.println("AEDWIP: indexOfAnother: " + indexOfAnother);
        for (Vector v : localTfIdfs) {
            System.err.println("AEDWIP: V.toString() " + v.toString());
            Assert.assertEquals(0.0, v.apply(indexOfThis), 1e-15);
        }
    }

$ mvn test -DwildcardSuites=none -Dtest=org.apache.spark.mllib.feature.JavaTfIdfSuite

AEDWIP: indexOfThis: 413342
AEDWIP: indexOfSentence: 251491
AEDWIP: indexOfAnother: 263939
AEDWIP: V.toString() (1048576,[97,3370,251491,413342],[0.28768207245178085,0.0,0.0,0.0])
AEDWIP: V.toString() (1048576,[3370,251491,263939,413342],[0.0,0.0,0.6931471805599453,0.0])
AEDWIP: V.toString() (1048576,[97,3370,251491,413342,713128],[0.28768207245178085,0.0,0.0,0.0,0.6931471805599453])

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.908 sec - in org.apache.spark.mllib.feature.JavaTfIdfSuite

From: Yanbo Liang <yblia...@gmail.com>
Date: Sunday, January 17, 2016 at 12:34 AM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: has any one implemented TF_IDF using ML transformers?

> Hi Andy,
>
> Actually, the output of the ML IDF model is the TF-IDF vector of each instance
> rather than the IDF vector.
> So it's unnecessary to do member-wise multiplication to calculate the TF-IDF
> value. You can refer to the code here:
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala#L121
>
> I found the documentation of IDF is not very clear; we need to update it.
>
> Thanks
> Yanbo
>
> 2016-01-16 6:10 GMT+08:00 Andy Davidson <a...@santacruzintegration.com>:
>> I wonder if I am missing something? TF-IDF is very popular. Spark ML has a
>> lot of transformers, however TF-IDF is not supported directly.
>>
>> Spark provides HashingTF and IDF transformers. The doc at
>> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
>> mentions you can implement TF-IDF as follows:
>>
>> TFIDF(t,d,D) = TF(t,d) * IDF(t,D)
>>
>> The problem I am running into is that both HashingTF and IDF return a sparse
>> vector.
>>
>> Ideally the Spark code to implement TF-IDF would be one line:
>>
>> DataFrame ret = tmp.withColumn("features",
>>     tmp.col("tf").multiply(tmp.col("idf")));
>>
>> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
>> data type mismatch: '(tf * idf)' requires numeric type, not vector;
>>
>> I could implement my own UDF to do member-wise multiplication, however given
>> how common TF-IDF is, I wonder if this code already exists somewhere.
>>
>> I found org.apache.spark.util.Vector.Multiplier. There is no documentation,
>> however given the argument is a double, my guess is it just does scalar
>> multiplication.
>>
>> I guess I could do something like
>>
>> double[] v = mySparkVector.toArray();
>>
>> and then use JBlas to do member-wise multiplication.
>>
>> I assume sparse vectors are not distributed, so there would not be any
>> additional communication cost.
>>
>> If this code is truly missing,
>> I would be happy to write it and donate it.
>>
>> Andy
>>
>> From: Andrew Davidson <a...@santacruzintegration.com>
>> Date: Wednesday, January 13, 2016 at 2:52 PM
>> To: "user @spark" <user@spark.apache.org>
>> Subject: trouble calculating TF-IDF data type mismatch: '(tf * idf)'
>> requires numeric type, not vector;
>>
>>> Below is a little snippet of my Java test code. Any idea how I implement
>>> member-wise vector multiplication?
>>>
>>> Kind regards
>>>
>>> Andy
>>>
>>> transformed df printSchema()
>>>
>>> root
>>>  |-- id: integer (nullable = false)
>>>  |-- label: double (nullable = false)
>>>  |-- words: array (nullable = false)
>>>  |    |-- element: string (containsNull = true)
>>>  |-- tf: vector (nullable = true)
>>>  |-- idf: vector (nullable = true)
>>>
>>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>>> |id |label|words                       |tf                       |idf                                                    |
>>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>>> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                     |
>>> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                     |
>>> |2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                     |
>>> |3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551])|
>>> +---+-----+----------------------------+-------------------------+-------------------------------------------------------+
>>>
>>> @Test
>>> public void test() {
>>>     DataFrame rawTrainingDF = createTrainingData();
>>>     DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
>>>     . . .
>>> }
>>>
>>> private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
>>>     HashingTF hashingTF = new HashingTF()
>>>             .setInputCol("words")
>>>             .setOutputCol("tf")
>>>             .setNumFeatures(dictionarySize);
>>>
>>>     DataFrame termFrequenceDF = hashingTF.transform(rawDF);
>>>     termFrequenceDF.cache(); // idf needs to make 2 passes over the data set
>>>
>>>     IDFModel idf = new IDF()
>>>             //.setMinDocFreq(1) // our vocabulary has 6 words we hash into 7
>>>             .setInputCol(hashingTF.getOutputCol())
>>>             .setOutputCol("idf")
>>>             .fit(termFrequenceDF);
>>>
>>>     DataFrame tmp = idf.transform(termFrequenceDF);
>>>
>>>     DataFrame ret = tmp.withColumn("features",
>>>         tmp.col("tf").multiply(tmp.col("idf")));
>>>     logger.warn("\ntransformed df printSchema()");
>>>     ret.printSchema();
>>>     ret.show(false);
>>>
>>>     return ret;
>>> }
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
>>> data type mismatch: '(tf * idf)' requires numeric type, not vector;
>>>
>>> private DataFrame createTrainingData() {
>>>     // make sure we only use dictionarySize words
>>>     JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
>>>             // label 0 is Chinese
>>>             // label 1 is notChinese
>>>             RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing", "Chinese")),
>>>             RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese", "Shanghai")),
>>>             RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao")),
>>>             RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan", "Chinese"))));
>>>
>>>     return createData(rdd);
>>> }
>>>
>>> private DataFrame createTestData() {
>>>     JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(
>>>             // label 0 is Chinese
>>>             // label 1 is notChinese
>>>             // "bernoulli" requires label to be IntegerType
>>>             RowFactory.create(4, 1.0, Arrays.asList("Chinese", "Chinese",
"Chinese", "Tokyo", "Japan")))); >>> >>> return createData(rdd); >>> >>> } >