http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/JoinTest.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/test/scala/org/apache/scrunch/JoinTest.scala b/scrunch/src/test/scala/org/apache/scrunch/JoinTest.scala new file mode 100644 index 0000000..5303c03 --- /dev/null +++ b/scrunch/src/test/scala/org/apache/scrunch/JoinTest.scala @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.scrunch + +import org.apache.crunch.io.{From => from, To => to} +import org.apache.crunch.test.FileHelper + +import org.scalatest.junit.JUnitSuite +import _root_.org.junit.Test + +class JoinTest extends JUnitSuite { + val pipeline = Pipeline.mapReduce[CogroupTest] + + def wordCount(fileName: String) = { + pipeline.read(from.textFile(fileName)) + .flatMap(_.toLowerCase.split("\\W+")).count + } + + @Test def join { + val shakespeare = FileHelper.createTempCopyOf("shakes.txt") + val maugham = FileHelper.createTempCopyOf("maugham.txt") + val output = FileHelper.createOutputPath() + output.deleteOnExit() + val filtered = wordCount(shakespeare).join(wordCount(maugham)) + .map((k, v) => (k, v._1 - v._2)) + .write(to.textFile(output.getAbsolutePath())) + .filter((k, d) => d > 0).materialize + assert(filtered.exists(_ == ("macbeth", 66))) + pipeline.done + } +}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/PageRankClassTest.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/test/scala/org/apache/scrunch/PageRankClassTest.scala b/scrunch/src/test/scala/org/apache/scrunch/PageRankClassTest.scala new file mode 100644 index 0000000..9ab7897 --- /dev/null +++ b/scrunch/src/test/scala/org/apache/scrunch/PageRankClassTest.scala @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.scrunch + +import Avros._ + +import org.apache.crunch.{DoFn, Emitter, Pair => P} +import org.apache.crunch.io.{From => from} +import org.apache.crunch.test.FileHelper + +import scala.collection.mutable.HashMap + +import org.scalatest.junit.JUnitSuite +import _root_.org.junit.Assert._ +import _root_.org.junit.Test + +case class PageRankData(pr: Float, oldpr: Float, urls: Array[String]) { + def this() = this(0f, 0f, null) + + def scaledPageRank = pr / urls.length + + def next(newPageRank: Float) = new PageRankData(newPageRank, pr, urls) + + def delta = math.abs(pr - oldpr) +} + +class CachingPageRankClassFn extends DoFn[P[String, PageRankData], P[String, Float]] { + val cache = new HashMap[String, Float] { + override def default(key: String) = 0f + } + + override def process(input: P[String, PageRankData], emitFn: Emitter[P[String, Float]]) { + val prd = input.second() + if (prd.urls.length > 0) { + val newpr = prd.pr / prd.urls.length + prd.urls.foreach(url => cache.put(url, cache(url) + newpr)) + if (cache.size > 5000) { + cleanup(emitFn) + } + } + } + + override def cleanup(emitFn: Emitter[P[String, Float]]) { + cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2))) + cache.clear + } +} + +class PageRankClassTest extends JUnitSuite { + val pipeline = Pipeline.mapReduce[PageRankTest] + + def initialInput(fileName: String) = { + pipeline.read(from.textFile(fileName)) + .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) }) + .groupByKey + .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray))) + } + + def update(prev: PTable[String, PageRankData], d: Float) = { + val outbound = prev.flatMap((url, prd) => { + prd.urls.map(link => (link, prd.scaledPageRank)) + }) + cg(prev, outbound, d) + } + + def cg(prev: PTable[String, PageRankData], + out: PTable[String, Float], d: Float) = { + prev.cogroup(out).map((url, v) => { + val (p, o) = v + val prd = p.head + (url, prd.next((1 - d) + d * o.sum)) + }) + } + 
+ def fastUpdate(prev: PTable[String, PageRankData], d: Float) = { + val outbound = prev.parallelDo(new CachingPageRankClassFn(), tableOf(strings, floats)) + cg(prev, outbound, d) + } + + @Test def testPageRank { + pipeline.getConfiguration.set("crunch.debug", "true") + var prev = initialInput(FileHelper.createTempCopyOf("urls.txt")) + var delta = 1.0f + while (delta > 0.01f) { + prev = update(prev, 0.5f) + delta = prev.values.map(_.delta).max.materialize.head + } + assertEquals(0.0048, delta, 0.001) + pipeline.done + } + + def testFastPageRank { + pipeline.getConfiguration.set("crunch.debug", "true") + var prev = initialInput(FileHelper.createTempCopyOf("urls.txt")) + var delta = 1.0f + while (delta > 0.01f) { + prev = fastUpdate(prev, 0.5f) + delta = prev.values.map(_.delta).max.materialize.head + } + assertEquals(0.0048, delta, 0.001) + pipeline.done + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/PageRankTest.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/test/scala/org/apache/scrunch/PageRankTest.scala b/scrunch/src/test/scala/org/apache/scrunch/PageRankTest.scala new file mode 100644 index 0000000..cbf7ebf --- /dev/null +++ b/scrunch/src/test/scala/org/apache/scrunch/PageRankTest.scala @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.scrunch

import Avros._

import org.apache.crunch.{DoFn, Emitter, Pair => P}
import org.apache.crunch.io.{From => from}
import org.apache.crunch.test.FileHelper

import scala.collection.mutable.HashMap

import org.scalatest.junit.JUnitSuite
import _root_.org.junit.Assert._
import _root_.org.junit.Test

/**
 * DoFn that aggregates outbound rank contributions for tuple-encoded
 * page-rank state (pr, oldpr, links) in an in-memory map, flushing to the
 * emitter whenever the cache exceeds 5000 entries.
 */
class CachingPageRankFn extends DoFn[P[String, (Float, Float, List[String])], P[String, Float]] {
  val cache = new HashMap[String, Float] {
    override def default(key: String) = 0f  // missing keys read as 0
  }

  override def process(input: P[String, (Float, Float, List[String])], emitFn: Emitter[P[String, Float]]) {
    val (pr, oldpr, urls) = input.second()
    // NOTE(review): guard added for consistency with CachingPageRankClassFn;
    // with an empty link list nothing was emitted anyway (the foreach was a
    // no-op), so behavior is unchanged, but this avoids the pointless
    // pr / 0 computation for dangling pages.
    if (!urls.isEmpty) {
      val newpr = pr / urls.size
      urls.foreach(url => cache.put(url, cache(url) + newpr))
      if (cache.size > 5000) {
        cleanup(emitFn)
      }
    }
  }

  /** Emits all cached (url, contribution) pairs and resets the cache. */
  override def cleanup(emitFn: Emitter[P[String, Float]]) {
    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
    cache.clear
  }
}

class PageRankTest extends JUnitSuite {
  val pipeline = Pipeline.mapReduce[PageRankTest]

  /** Parses "url<TAB>link" lines into (url, (rank=1, oldRank=0, links)). */
  def initialInput(fileName: String) = {
    pipeline.read(from.textFile(fileName))
      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
      .groupByKey
      .map((url, links) => (url, (1f, 0f, links.toList)))
  }

  /** One page-rank iteration with damping factor d, via a flatMap over outbound links. */
  def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
    val outbound = prev.flatMap((url, v) => {
      val (pr, oldpr, links) = v
      links.map(link => (link, pr / links.size))
    })
    cg(prev, outbound, d)
  }

  /** Cogroups previous state with inbound contributions and computes the damped rank. */
  def cg(prev: PTable[String, (Float, Float, List[String])],
      out: PTable[String, Float], d: Float) = {
    prev.cogroup(out).map((url, v) => {
      val (p, o) = v
      val (pr, oldpr, links) = p.head
      (url, ((1 - d) + d * o.sum, pr, links))
    })
  }

  /** Like update, but aggregates contributions via the caching DoFn. */
  def fastUpdate(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
    val outbound = prev.parallelDo(new CachingPageRankFn(), tableOf(strings, floats))
    cg(prev, outbound, d)
  }

  @Test def testPageRank {
    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
    var delta = 1.0f
    // Iterate until the largest per-url rank change drops below the threshold.
    while (delta > 0.01f) {
      prev = update(prev, 0.5f)
      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
    }
    assertEquals(0.0048, delta, 0.001)
    pipeline.done
  }

  @Test def testFastPageRank {
    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
    var delta = 1.0f
    while (delta > 0.01f) {
      prev = fastUpdate(prev, 0.5f)
      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
    }
    assertEquals(0.0048, delta, 0.001)
    pipeline.done
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/PipelineAppTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/PipelineAppTest.scala b/scrunch/src/test/scala/org/apache/scrunch/PipelineAppTest.scala
new file mode 100644
index 0000000..fe33aac
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/PipelineAppTest.scala
@@ -0,0 +1,45 @@
/**
 * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.
See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */

import org.apache.crunch.test.FileHelper
import org.apache.scrunch.PipelineApp

import org.scalatest.junit.JUnitSuite
import _root_.org.junit.Test

/**
 * Word-count application built on PipelineApp: counts the words in the two
 * input files given as args(0) and args(1), and writes the cogrouped counts
 * as text to the path given as args(2).
 */
object WordCount extends PipelineApp {

  /** Splits a line on non-word characters, dropping empty tokens. */
  def wordSplit(line: String) = line.split("\\W+").filter(!_.isEmpty())

  /** Builds a (word -> count) table for the given file. */
  def countWords(filename: String) = {
    val lines = read(from.textFile(filename))
    val words = lines.flatMap(wordSplit)
    words.count
  }

  val w1 = countWords(args(0))
  val w2 = countWords(args(1))
  cogroup(w1, w2).write(to.textFile(args(2)))
}

/** Smoke test: runs the WordCount app end to end on two sample texts. */
class PipelineAppTest extends JUnitSuite {
  @Test def run {
    val appArgs = Array(
      FileHelper.createTempCopyOf("shakes.txt"),
      FileHelper.createTempCopyOf("maugham.txt"),
      FileHelper.createOutputPath.getAbsolutePath)
    WordCount.main(appArgs)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/TopTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/TopTest.scala b/scrunch/src/test/scala/org/apache/scrunch/TopTest.scala
new file mode 100644
index 0000000..bac27bd
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/TopTest.scala
@@ -0,0 +1,42 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.scrunch

import org.apache.crunch.io.{From => from, To => to}
import org.apache.crunch.test.FileHelper

import org.scalatest.junit.JUnitSuite
import _root_.org.junit.Test

/** Tests the top() operation on both the in-memory and MapReduce pipelines. */
class TopTest extends JUnitSuite {

  /** top() over the in-memory collection implementation. */
  @Test def topInMem {
    val ptable = Mem.tableOf(("foo", 17), ("bar", 29), ("baz", 1729))
    assert(ptable.top(1, true).materialize.head == ("baz", 1729))
  }

  /** top() over a real MapReduce pipeline on a word-count table. */
  @Test def top2 {
    val pipeline = Pipeline.mapReduce[TopTest]
    val input = FileHelper.createTempCopyOf("shakes.txt")

    val wc = pipeline.read(from.textFile(input))
      .flatMap(_.toLowerCase.split("\\s+"))
      .filter(!_.isEmpty()).count
    assert(wc.top(10, true).materialize.exists(_ == ("is", 205)))
    // NOTE(review): pipeline.done was missing; every other MapReduce-backed
    // test shuts its pipeline down to clean up temporary working files.
    pipeline.done
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/UnionTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/UnionTest.scala b/scrunch/src/test/scala/org/apache/scrunch/UnionTest.scala
new file mode 100644
index 0000000..63fecdb
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/UnionTest.scala
@@ -0,0 +1,50 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.scrunch

import org.apache.crunch.io.{From => from}
import org.apache.crunch.test.FileHelper

import org.scalatest.junit.JUnitSuite
import _root_.org.junit.Test

/** Tests union of PCollections and of PTables across two text inputs. */
class UnionTest extends JUnitSuite {
  val pipeline = Pipeline.mapReduce[UnionTest]
  val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
  val maugham = FileHelper.createTempCopyOf("maugham.txt")

  /** Builds a (word -> count) table over a collection of lines. */
  def wordCount(col: PCollection[String]) =
    col.flatMap(_.toLowerCase.split("\\W+")).count

  @Test def testUnionCollection {
    // Union the raw line collections first, then count the combined words.
    val shakesLines = pipeline.read(from.textFile(shakespeare))
    val maughamLines = pipeline.read(from.textFile(maugham))
    val wc = wordCount(shakesLines.union(maughamLines)).materialize
    assert(wc.exists(_ == ("you", 3691)))
    pipeline.done
  }

  @Test def testUnionTable {
    // Count each text separately, union the tables, then sum counts per word.
    val wcs = wordCount(pipeline.read(from.textFile(shakespeare)))
    val wcm = wordCount(pipeline.read(from.textFile(maugham)))
    val summed = wcs.union(wcm).groupByKey.combine(v => v.sum).materialize
    assert(summed.exists(_ == ("you", 3691)))
    pipeline.done
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/WordCountTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/WordCountTest.scala b/scrunch/src/test/scala/org/apache/scrunch/WordCountTest.scala
new file mode 100644
index 0000000..e97a1fd
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/WordCountTest.scala
@@ -0,0 +1,45 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under
one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.scrunch

import org.apache.crunch.io.{From => from, To => to}
import org.apache.crunch.test.FileHelper

import java.io.File

import org.scalatest.junit.JUnitSuite
import _root_.org.junit.Test

/**
 * End-to-end word count: counts words in a sample text, persists the counts,
 * then re-keys by first letter and checks an aggregated total.
 */
class WordCountTest extends JUnitSuite {
  @Test def wordCount {
    val pipeline = Pipeline.mapReduce[WordCountTest]
    val input = FileHelper.createTempCopyOf("shakes.txt")
    val wordCountOut = FileHelper.createOutputPath

    // Count words, write the per-word counts out, then re-key by first
    // letter and sum to get per-initial totals.
    val firstCharCounts = pipeline.read(from.textFile(input))
      .flatMap(_.toLowerCase.split("\\s+"))
      .filter(!_.isEmpty()).count
      .write(to.textFile(wordCountOut.getAbsolutePath)) // Word counts
      .map((w, c) => (w.slice(0, 1), c))
      .groupByKey.combine(v => v.sum).materialize
    assert(firstCharCounts.exists(_ == ("w", 1404)))

    pipeline.done
    wordCountOut.delete()
  }
}
