http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PTable.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala deleted file mode 100644 index 7f506e5..0000000 --- a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.scrunch - -import java.util.{Collection => JCollect} - -import scala.collection.JavaConversions._ - -import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn} -import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair} -import org.apache.crunch.lib.{Join, Cartesian, Aggregate, Cogroup, PTables} -import org.apache.scrunch.interpreter.InterpreterRunner - -class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] { - import PTable._ - - def filter(f: (K, V) => Boolean): PTable[K, V] = { - parallelDo(filterFn[K, V](f), native.getPTableType()) - } - - def map[T, To](f: (K, V) => T) - (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = { - b(this, mapFn(f), pt.get(getTypeFamily())) - } - - def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = { - val ptf = getTypeFamily() - val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf)) - parallelDo(mapValuesFn[K, V, T](f), ptype) - } - - def mapKeys[T](f: K => T)(implicit pt: PTypeH[T]) = { - val ptf = getTypeFamily() - val ptype = ptf.tableOf(pt.get(ptf), native.getValueType()) - parallelDo(mapKeysFn[K, V, T](f), ptype) - } - - def flatMap[T, To](f: (K, V) => Traversable[T]) - (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = { - b(this, flatMapFn(f), pt.get(getTypeFamily())) - } - - def union(others: PTable[K, V]*) = { - new PTable[K, V](native.union(others.map(_.native) : _*)) - } - - def keys() = new PCollection[K](PTables.keys(native)) - - def values() = new PCollection[V](PTables.values(native)) - - def cogroup[V2](other: PTable[K, V2]) = { - val jres = Cogroup.cogroup[K, V, V2](this.native, other.native) - val ptf = getTypeFamily() - val inter = new PTable[K, CPair[JCollect[V], JCollect[V2]]](jres) - inter.parallelDo(new SMapTableValuesFn[K, CPair[JCollect[V], JCollect[V2]], (Iterable[V], Iterable[V2])] { - def apply(x: CPair[JCollect[V], JCollect[V2]]) = { - (collectionAsScalaIterable[V](x.first()), collectionAsScalaIterable[V2](x.second())) - } - }, ptf.tableOf(keyType, ptf.tuple2(ptf.collections(valueType), ptf.collections(other.valueType)))) - } - - type JoinFn[V2] = (JTable[K, V], JTable[K, V2]) => JTable[K, CPair[V, V2]] - - protected def join[V2](joinFn: JoinFn[V2], 
other: PTable[K, V2]): PTable[K, (V, V2)] = { - val jres = joinFn(this.native, other.native) - val ptf = getTypeFamily() - val ptype = ptf.tableOf(keyType, ptf.tuple2(valueType, other.valueType)) - val inter = new PTable[K, CPair[V, V2]](jres) - inter.parallelDo(new SMapTableValuesFn[K, CPair[V, V2], (V, V2)] { - def apply(x: CPair[V, V2]) = (x.first(), x.second()) - }, ptype) - } - - def join[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - innerJoin(other) - } - - def innerJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - join[V2](Join.innerJoin[K, V, V2](_, _), other) - } - - def leftJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - join[V2](Join.leftJoin[K, V, V2](_, _), other) - } - - def rightJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - join[V2](Join.rightJoin[K, V, V2](_, _), other) - } - - def fullJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - join[V2](Join.fullJoin[K, V, V2](_, _), other) - } - - def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = { - val ptf = getTypeFamily() - val inter = new PTable(Cartesian.cross(this.native, other.native)) - val f = (k: CPair[K,K2], v: CPair[V,V2]) => CPair.of((k.first(), k.second()), (v.first(), v.second())) - inter.parallelDo(mapFn(f), ptf.tableOf(ptf.tuple2(keyType, other.keyType), ptf.tuple2(valueType, other.valueType))) - } - - def top(limit: Int, maximize: Boolean) = { - wrap(Aggregate.top(this.native, limit, maximize)) - } - - def groupByKey() = new PGroupedTable(native.groupByKey()) - - def groupByKey(partitions: Int) = new PGroupedTable(native.groupByKey(partitions)) - - def groupByKey(options: GroupingOptions) = new PGroupedTable(native.groupByKey(options)) - - def wrap(newNative: AnyRef) = { - new PTable[K, V](newNative.asInstanceOf[JTable[K, V]]) - } - - def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native - - def materialize(): Iterable[(K, V)] = { - InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration()) - native.materialize.view.map(x => (x.first, x.second)) - } - - def materializeToMap(): Map[K, V] = { - InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration()) - native.materializeToMap().view.toMap - } - - def keyType() = native.getPTableType().getKeyType() - - def valueType() = native.getPTableType().getValueType() -} - -trait SFilterTableFn[K, V] extends FilterFn[CPair[K, V]] with Function2[K, V, Boolean] { - override def accept(input: CPair[K, V]) = apply(input.first(), input.second()) -} - -trait SDoTableFn[K, V, T] extends DoFn[CPair[K, V], T] with Function2[K, V, Traversable[T]] { - override def process(input: CPair[K, V], emitter: Emitter[T]) { - for (v <- apply(input.first(), input.second())) { - emitter.emit(v) - } - } -} - -trait SMapTableFn[K, V, T] extends MapFn[CPair[K, V], T] with Function2[K, V, T] { - override def map(input: CPair[K, V]) = apply(input.first(), input.second()) -} - -trait SMapTableValuesFn[K, V, T] extends MapFn[CPair[K, V], CPair[K, T]] with Function1[V, T] { - override def map(input: CPair[K, V]) = CPair.of(input.first(), apply(input.second())) -} - -trait SMapTableKeysFn[K, V, T] extends MapFn[CPair[K, V], CPair[T, V]] with Function1[K, T] { - override def map(input: CPair[K, V]) = CPair.of(apply(input.first()), input.second()) -} - -object PTable { - def filterFn[K, V](fn: (K, V) => Boolean) = { - new SFilterTableFn[K, V] { def apply(k: K, v: V) = fn(k, v) } - } - - def mapValuesFn[K, V, T](fn: V => T) = { - new SMapTableValuesFn[K, V, T] { def apply(v: V) = fn(v) } - } - - def mapKeysFn[K, V, 
T](fn: K => T) = { - new SMapTableKeysFn[K, V, T] { def apply(k: K) = fn(k) } - } - - def mapFn[K, V, T](fn: (K, V) => T) = { - new SMapTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) } - } - - def flatMapFn[K, V, T](fn: (K, V) => Traversable[T]) = { - new SDoTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) } - } -}
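For context on the deletion above: the PTable wrapper composed like this in user code. This is a minimal sketch only — the `sales` and `regions` tables are hypothetical, and implicit PTypeH instances for the value types are assumed to be in scope; none of this appears in the commit itself.

    import org.apache.scrunch._

    // Sketch only: hypothetical tables keyed by store name.
    def joinAndSummarize(sales: PTable[String, Int],
                         regions: PTable[String, String]) = {
      sales
        .innerJoin(regions)                        // PTable[String, (Int, String)]
        .filter((store, v) => v._1 > 0)            // keep stores with any sales
        .mapValues { case (amount, _) => amount }  // assumes an implicit PTypeH[Int]
        .groupByKey()                              // PGroupedTable for aggregation
    }

Each call here maps onto a method of the deleted class: innerJoin delegates to Join.innerJoin, filter and mapValues wrap Crunch FilterFn/MapFn instances, and groupByKey returns a PGroupedTable.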
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala b/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala deleted file mode 100644 index 1bd3db6..0000000 --- a/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.scrunch - -import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, MapFn} -import org.apache.crunch.types.{PType, PTypeFamily => PTF} -import org.apache.crunch.types.writable.WritableTypeFamily -import org.apache.crunch.types.avro.{AvroTypeFamily, Avros => CAvros} -import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JFloat, Boolean => JBoolean} -import java.util.{Collection => JCollection} -import scala.collection.JavaConversions._ - -class TMapFn[S, T](f: S => T) extends MapFn[S, T] { - override def map(input: S) = f(input) -} - -trait PTypeFamily { - - def ptf: PTF - - val strings = ptf.strings() - - val bytes = ptf.bytes() - - def records[T: ClassManifest] = ptf.records(classManifest[T].erasure) - - def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = { - ptf.derived(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt) - } - - val longs = { - val in = (x: JLong) => x.longValue() - val out = (x: Long) => new JLong(x) - derived(classOf[Long], in, out, ptf.longs()) - } - - val ints = { - val in = (x: JInt) => x.intValue() - val out = (x: Int) => new JInt(x) - derived(classOf[Int], in, out, ptf.ints()) - } - - val floats = { - val in = (x: JFloat) => x.floatValue() - val out = (x: Float) => new JFloat(x) - derived(classOf[Float], in, out, ptf.floats()) - } - - val doubles = { - val in = (x: JDouble) => x.doubleValue() - val out = (x: Double) => new JDouble(x) - derived(classOf[Double], in, out, ptf.doubles()) - } - - val booleans = { - val in = (x: JBoolean) => x.booleanValue() - val out = (x: Boolean) => new JBoolean(x) - derived(classOf[Boolean], in, out, ptf.booleans()) - } - - def collections[T](ptype: PType[T]) = { - derived(classOf[Iterable[T]], collectionAsScalaIterable[T], asJavaCollection[T], ptf.collections(ptype)) - } - - def maps[T](ptype: PType[T]) = { - derived(classOf[scala.collection.Map[String, T]], mapAsScalaMap[String, T], mapAsJavaMap[String, T], ptf.maps(ptype)) - } - - def lists[T](ptype: PType[T]) = { - val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toList - val out = (x: List[T]) => asJavaCollection[T](x) - derived(classOf[List[T]], in, out, ptf.collections(ptype)) - } - - def 
sets[T](ptype: PType[T]) = { - val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toSet - val out = (x: Set[T]) => asJavaCollection[T](x) - derived(classOf[Set[T]], in, out, ptf.collections(ptype)) - } - - def tuple2[T1, T2](p1: PType[T1], p2: PType[T2]) = { - val in = (x: CPair[T1, T2]) => (x.first(), x.second()) - val out = (x: (T1, T2)) => CPair.of(x._1, x._2) - derived(classOf[(T1, T2)], in, out, ptf.pairs(p1, p2)) - } - - def tuple3[T1, T2, T3](p1: PType[T1], p2: PType[T2], p3: PType[T3]) = { - val in = (x: CTuple3[T1, T2, T3]) => (x.first(), x.second(), x.third()) - val out = (x: (T1, T2, T3)) => CTuple3.of(x._1, x._2, x._3) - derived(classOf[(T1, T2, T3)], in, out, ptf.triples(p1, p2, p3)) - } - - def tuple4[T1, T2, T3, T4](p1: PType[T1], p2: PType[T2], p3: PType[T3], p4: PType[T4]) = { - val in = (x: CTuple4[T1, T2, T3, T4]) => (x.first(), x.second(), x.third(), x.fourth()) - val out = (x: (T1, T2, T3, T4)) => CTuple4.of(x._1, x._2, x._3, x._4) - derived(classOf[(T1, T2, T3, T4)], in, out, ptf.quads(p1, p2, p3, p4)) - } - - def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType) -} - -object Writables extends PTypeFamily { - override def ptf = WritableTypeFamily.getInstance() -} - -object Avros extends PTypeFamily { - override def ptf = AvroTypeFamily.getInstance() - - CAvros.REFLECT_DATA_FACTORY = new ScalaReflectDataFactory() - - def reflects[T: ClassManifest]() = CAvros.reflects(classManifest[T].erasure) -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala b/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala deleted file mode 100644 index fa13d3a..0000000 --- a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.scrunch - -import java.io.File - -import org.apache.hadoop.conf.Configuration -import org.slf4j.LoggerFactory - -import org.apache.crunch.{Pipeline => JPipeline} -import org.apache.crunch.impl.mem.MemPipeline -import org.apache.crunch.impl.mr.MRPipeline -import org.apache.crunch.util.DistCache -import org.apache.scrunch.interpreter.InterpreterRunner - -/** - * Manages the state of a pipeline execution. - * - * ==Overview== - * There are two subtypes of [[org.apache.crunch.Pipeline]]: - * [[org.apache.crunch.Pipeline#MapReduce]] - for jobs run on a Hadoop cluster. - * [[org.apache.crunch.Pipeline#InMemory]] - for jobs run in memory. 
- * - * To create a Hadoop pipeline: - * {{{ - * import org.apache.scrunch.Pipeline - * - * Pipeline.mapreduce[MyClass] - * }}} - * - * To get an in memory pipeline: - * {{{ - * import org.apache.scrunch.Pipeline - * - * Pipeline.inMemory - * }}} - */ -class Pipeline(val jpipeline: JPipeline) extends PipelineLike { - /** - * A convenience method for reading a text file. - * - * @param pathName Path to desired text file. - * @return A PCollection containing the lines in the specified file. - */ - def readTextFile(pathName: String): PCollection[String] = { - new PCollection[String](jpipeline.readTextFile(pathName)) - } - - /** - * A convenience method for writing a text file. - * - * @param pcollect A PCollection to write to text. - * @param pathName Path to desired output text file. - */ - def writeTextFile[T](pcollect: PCollection[T], pathName: String) { - jpipeline.writeTextFile(pcollect.native, pathName) - } -} - -/** - * Companion object. Contains subclasses of Pipeline. - */ -object Pipeline { - val log = LoggerFactory.getLogger(classOf[Pipeline]) - - /** - * Pipeline for running jobs on a hadoop cluster. - * - * @param clazz Type of the class using the pipeline. - * @param configuration Hadoop configuration to use. - */ - class MapReducePipeline (clazz: Class[_], configuration: Configuration) extends Pipeline( - { - // Attempt to add all jars in the Scrunch distribution lib directory to the job that will - // be run. - val jarPath = DistCache.findContainingJar(classOf[org.apache.scrunch.Pipeline]) - if (jarPath != null) { - val scrunchJarFile = new File(jarPath) - DistCache.addJarDirToDistributedCache(configuration, scrunchJarFile.getParent()) - } else { - log.warn("Could not locate Scrunch jar file, so could not add Scrunch jars to the " + - "job(s) about to be run.") - } - if (InterpreterRunner.repl == null) { - new MRPipeline(clazz, configuration) - } else { - // We're running in the REPL, so we'll use the crunch jar as the job jar. - new MRPipeline(classOf[org.apache.scrunch.Pipeline], configuration) - } - }) - - /** - * Pipeline for running jobs in memory. - */ - object InMemoryPipeline extends Pipeline(MemPipeline.getInstance()) - - /** - * Creates a pipeline for running jobs on a hadoop cluster using the default configuration. - * - * @param clazz Type of the class using the pipeline. - */ - def mapReduce(clazz: Class[_]): MapReducePipeline = mapReduce(clazz, new Configuration()) - - /** - * Creates a pipeline for running jobs on a hadoop cluster. - * - * @param clazz Type of the class using the pipeline. - * @param configuration Hadoop configuration to use. - */ - def mapReduce(clazz: Class[_], configuration: Configuration): MapReducePipeline = { - new MapReducePipeline(clazz, configuration) - } - - /** - * Creates a pipeline for running jobs on a hadoop cluster using the default configuration. - * - * @tparam T Type of the class using the pipeline. - */ - def mapReduce[T : ClassManifest]: MapReducePipeline = mapReduce[T](new Configuration()) - - /** - * Creates a pipeline for running jobs on a hadoop cluster. - * - * @param configuration Hadoop configuration to use. - * @tparam T Type of the class using the pipeline. - */ - def mapReduce[T : ClassManifest](configuration: Configuration): MapReducePipeline = { - new MapReducePipeline(implicitly[ClassManifest[T]].erasure, configuration) - } - - /** - * Gets a pipeline for running jobs in memory. 
- */ - def inMemory: InMemoryPipeline.type = InMemoryPipeline - - /** - * Creates a new Pipeline according to the provided specifications. - * - * @param configuration Configuration for connecting to a Hadoop cluster. - * @param memory Option specifying whether or not the pipeline is an in memory or mapreduce pipeline. - * @param manifest ClassManifest for the class using the pipeline. - * @tparam T type of the class using the pipeline. - * @deprecated Use either {{{Pipeline.mapReduce(class, conf)}}} or {{{Pipeline.inMemory}}} - */ - def apply[T]( - configuration: Configuration = new Configuration(), - memory: Boolean = false)(implicit manifest: ClassManifest[T] - ): Pipeline = if (memory) inMemory else mapReduce(manifest.erasure, configuration) -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala deleted file mode 100644 index b427f20..0000000 --- a/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.scrunch - -import java.io.Serializable - -import scala.collection.mutable.ListBuffer - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.util.GenericOptionsParser - -import org.apache.crunch.{Source, TableSource, Target} - -trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedInit { - implicit def _string2path(str: String) = new Path(str) - - /** Contains factory methods used to create `Source`s. */ - val from = From - - /** Contains factory methods used to create `Target`s. */ - val to = To - - /** Contains factory methods used to create `SourceTarget`s. */ - val at = At - - private val initCode = new ListBuffer[() => Unit] - - private var _args: Array[String] = _ - - /** Command-line arguments passed to this application. */ - protected def args: Array[String] = _args - - def configuration: Configuration = pipeline.getConfiguration - - /** Gets the distributed filesystem associated with this application's configuration. 
*/ - def fs: FileSystem = FileSystem.get(configuration) - - override def delayedInit(body: => Unit) { - initCode += (() => body) - } - - def main(args: Array[String]) = { - val parser = new GenericOptionsParser(configuration, args) - _args = parser.getRemainingArgs() - for (proc <- initCode) proc() - done - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala deleted file mode 100644 index cdeb37b..0000000 --- a/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.scrunch - -/** - * This trait provides convenience methods for building pipelines. - */ -trait PipelineHelper { - /** - * Materializes the specified PCollection and displays its contents. - */ - def dump(data: PCollection[_]) { - data.materialize.foreach(println(_)) - } - - /** - * Materializes the specified PTable and displays its contents. - */ - def dump(data: PTable[_, _]) { - data.materialize.foreach(println(_)) - } - - /** - * Performs a cogroup on the two specified PTables. - */ - def cogroup[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2]) - : PTable[K, (Iterable[V1], Iterable[V2])] = { - t1.cogroup(t2) - } - - /** - * Performs an innerjoin on the two specified PTables. - */ - def join[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2]) - : PTable[K, (V1, V2)] = { - t1.join(t2) - } - - /** - * Unions the specified PCollections. - */ - def union[T](first: PCollection[T], others: PCollection[T]*) - : PCollection[T] = { - first.union(others: _*) - } - - /** - * Unions the specified PTables. - */ - def union[K, V](first: PTable[K, V], others: PTable[K, V]*) - : PTable[K, V] = { - first.union(others: _*) - } -} - -/** - * Companion object containing convenience methods for building pipelines. 
- */ -object PipelineHelper extends PipelineHelper http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala deleted file mode 100644 index 92dbaf3..0000000 --- a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.scrunch - -import org.apache.hadoop.conf.Configuration - -import org.apache.crunch.{Pipeline => JPipeline} -import org.apache.crunch.Source -import org.apache.crunch.TableSource -import org.apache.crunch.Target -import org.apache.scrunch.interpreter.InterpreterRunner - -trait PipelineLike { - def jpipeline: JPipeline - - /** - * Gets the configuration object associated with this pipeline. - */ - def getConfiguration(): Configuration = jpipeline.getConfiguration() - - /** - * Reads a source into a [[org.apache.scrunch.PCollection]] - * - * @param source The source to read from. - * @tparam T The type of the values being read. - * @return A PCollection containing data read from the specified source. - */ - def read[T](source: Source[T]): PCollection[T] = new PCollection(jpipeline.read(source)) - - /** - * Reads a source into a [[org.apache.scrunch.PTable]] - * - * @param source The source to read from. - * @tparam K The type of the keys being read. - * @tparam V The type of the values being read. - * @return A PCollection containing data read from the specified source. - */ - def read[K, V](source: TableSource[K, V]): PTable[K, V] = new PTable(jpipeline.read(source)) - - /** - * Writes a parallel collection to a target. - * - * @param collection The collection to write. - * @param target The destination target for this write. - */ - def write(collection: PCollection[_], target: Target): Unit = jpipeline.write(collection.native, target) - - /** - * Writes a parallel table to a target. - * - * @param table The table to write. - * @param target The destination target for this write. - */ - def write(table: PTable[_, _], target: Target): Unit = jpipeline.write(table.native, target) - - /** - * Constructs and executes a series of MapReduce jobs in order - * to write data to the output targets. - */ - def run(): Unit = { - InterpreterRunner.addReplJarsToJob(getConfiguration()) - jpipeline.run() - } - - /** - * Run any remaining jobs required to generate outputs and then - * clean up any intermediate data files that were created in - * this run or previous calls to `run`. 
- */ - def done(): Unit = jpipeline.done() - - /** - * Turn on debug logging for jobs that are run from this pipeline. - */ - def debug(): Unit = jpipeline.enableDebug() -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala b/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala deleted file mode 100644 index e37a0c7..0000000 --- a/scrunch/src/main/scala/org/apache/scrunch/interpreter/InterpreterRunner.scala +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.scrunch.interpreter - -import java.io.File -import java.io.FileOutputStream -import java.util.jar.JarEntry -import java.util.jar.JarOutputStream - -import scala.tools.nsc.GenericRunnerCommand -import scala.tools.nsc.Global -import scala.tools.nsc.MainGenericRunner -import scala.tools.nsc.ObjectRunner -import scala.tools.nsc.Properties -import scala.tools.nsc.ScriptRunner -import scala.tools.nsc.interpreter.ILoop -import scala.tools.nsc.io.Jar -import scala.tools.nsc.io.VirtualDirectory - -import com.google.common.io.Files -import org.apache.commons.io.IOUtils -import org.apache.hadoop.conf.Configuration - -import org.apache.crunch.util.DistCache - -/** - * An object used to run a Scala REPL with modifications to facilitate Scrunch jobs running - * within the REPL. - */ -object InterpreterRunner extends MainGenericRunner { - - // The actual Scala repl. - var repl: ILoop = null - - /** - * Checks whether or not the Scala repl has been started. - * - * @return <code>true</code> if the repl is running, <code>false</code> otherwise. - */ - def isReplRunning() = repl == null - - /** - * The main entry point for the REPL. This method is lifted from - * {@link scala.tools.nsc.MainGenericRunner} and modified to facilitate testing whether or not - * the REPL is actually running. - * - * @param args Arguments used on the command line to start the REPL. - * @return <code>true</code> if execution was successful, <code>false</code> otherwise. - */ - override def process(args: Array[String]): Boolean = { - val command = new GenericRunnerCommand(args.toList, (x: String) => errorFn(x)) - import command.{settings, howToRun, thingToRun} - // Defines a nested function to retrieve a sample compiler if necessary. 
- def sampleCompiler = new Global(settings) - - import Properties.{versionString, copyrightString} - if (!command.ok) { - return errorFn("\n" + command.shortUsageMsg) - } else if (settings.version.value) { - return errorFn("Scala code runner %s -- %s".format(versionString, copyrightString)) - } else if (command.shouldStopWithInfo) { - return errorFn(command getInfoMessage sampleCompiler) - } - - // Functions to retrieve settings values that were passed to REPL invocation. - // The -e argument provides a Scala statement to execute. - // The -i option requests that a file be preloaded into the interactive shell. - def isE = !settings.execute.isDefault - def dashe = settings.execute.value - def isI = !settings.loadfiles.isDefault - def dashi = settings.loadfiles.value - - // Function to retrieve code passed by -e and -i options to REPL. - def combinedCode = { - val files = if (isI) dashi map (file => scala.tools.nsc.io.File(file).slurp()) else Nil - val str = if (isE) List(dashe) else Nil - files ++ str mkString "\n\n" - } - - import GenericRunnerCommand._ - - // Function for running the target command. It can run an object with main, a script, or - // an interactive REPL. - def runTarget(): Either[Throwable, Boolean] = howToRun match { - case AsObject => - ObjectRunner.runAndCatch(settings.classpathURLs, thingToRun, command.arguments) - case AsScript => - ScriptRunner.runScriptAndCatch(settings, thingToRun, command.arguments) - case AsJar => - ObjectRunner.runAndCatch( - scala.tools.nsc.io.File(thingToRun).toURL +: settings.classpathURLs, - new Jar(thingToRun).mainClass getOrElse sys.error("Cannot find main class for jar: " + - thingToRun), - command.arguments - ) - case Error => - Right(false) - case _ => - // We start the shell when no arguments are given. - repl = new ILoop - Right(repl.process(settings)) - } - - /**If -e and -i were both given, we want to execute the -e code after the - * -i files have been included, so they are read into strings and prepended to - * the code given in -e. The -i option is documented to only make sense - * interactively so this is a pretty reasonable assumption. - * - * This all needs a rewrite though. - */ - if (isE) { - ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments) - } - else runTarget() match { - case Left(ex) => errorFn(ex) - case Right(b) => b - } - } - - def main(args: Array[String]) { - val retVal = process(args) - if (!retVal) - sys.exit(1) - } - - /** - * Creates a jar file containing the code thus far compiled by the REPL in a temporary directory. - * - * @return A file object representing the jar file created. - */ - def createReplCodeJar(): File = { - var jarStream: JarOutputStream = null - try { - val virtualDirectory = repl.virtualDirectory - val tempDir = Files.createTempDir() - val tempJar = new File(tempDir, "replJar.jar") - jarStream = new JarOutputStream(new FileOutputStream(tempJar)) - addVirtualDirectoryToJar(virtualDirectory, "", jarStream) - return tempJar - } finally { - IOUtils.closeQuietly(jarStream) - } - } - - /** - * Add the contents of the specified virtual directory to a jar. This method will recursively - * descend into subdirectories to add their contents. - * - * @param dir The virtual directory whose contents should be added. - * @param entryPath The entry path for classes found in the virtual directory. - * @param jarStream An output stream for writing the jar file. 
- */ - def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream: - JarOutputStream): Unit = { - dir.foreach { file => - if (file.isDirectory) { - // Recursively descend into subdirectories, adjusting the package name as we do. - val dirPath = entryPath + file.name + "/" - val entry: JarEntry = new JarEntry(dirPath) - jarStream.putNextEntry(entry) - jarStream.closeEntry() - addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory], - dirPath, jarStream) - } else if (file.hasExtension("class")) { - // Add class files as an entry in the jar file and write the class to the jar. - val entry: JarEntry = new JarEntry(entryPath + file.name) - jarStream.putNextEntry(entry) - jarStream.write(file.toByteArray) - jarStream.closeEntry() - } - } - } - - /** - * Generates a jar containing the code thus far compiled by the REPL, - * and adds that jar file to the distributed cache of jobs using the specified configuration. - * Also adds any jars added with the :cp command to the user's job. - * - * @param configuration The configuration of jobs that should use the REPL code jar. - */ - def addReplJarsToJob(configuration: Configuration): Unit = { - if (repl != null) { - // Generate a jar of REPL code and add to the distributed cache. - val replJarFile = createReplCodeJar() - DistCache.addJarToDistributedCache(configuration, replJarFile) - // Get the paths to jars added with the :cp command. - val addedJarPaths = repl.addedClasspath.split(':') - addedJarPaths.foreach { - path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/imports.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scripts/imports.scala b/scrunch/src/main/scripts/imports.scala deleted file mode 100644 index 64d7149..0000000 --- a/scrunch/src/main/scripts/imports.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import org.apache.scrunch._ - http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/scrunch ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scripts/scrunch b/scrunch/src/main/scripts/scrunch deleted file mode 100755 index 44cb6fb..0000000 --- a/scrunch/src/main/scripts/scrunch +++ /dev/null @@ -1,163 +0,0 @@ -#!/bin/bash --posix -# -############################################################################## -# Copyright 2002-2011, LAMP/EPFL -# -# This is free software; see the distribution for copying conditions. -# There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A -# PARTICULAR PURPOSE. 
-############################################################################## - -# Identify the bin dir in the distribution from which this script is running. -bin=`dirname $0` -bin=`cd ${bin} && pwd` - -# Set the directory where libraries for scrunch shell live. -SCRUNCH_LIB_DIR="${bin}/../lib" -# Set the conf directory for the scrunch distribution. -SCRUNCH_CONF_DIR="${bin}/../conf" -# Set the main class used to run scrunch shell. -MAIN_CLASS="org.apache.scrunch.interpreter.InterpreterRunner" - -# Not sure what the right default is here: trying nonzero. -scala_exit_status=127 -saved_stty="" - -# restore stty settings (echo in particular) -function restoreSttySettings() { - if [[ -n $SCALA_RUNNER_DEBUG ]]; then - echo "restoring stty: $saved_stty" - fi - - stty $saved_stty - saved_stty="" -} - -function onExit() { - if [[ "$saved_stty" != "" ]]; then - restoreSttySettings - exit $scala_exit_status - fi -} - -# to reenable echo if we are interrupted before completing. -trap onExit INT - -# save terminal settings -saved_stty=$(stty -g 2>/dev/null) -# clear on error so we don't later try to restore them -if [[ ! $? ]]; then - saved_stty="" -fi -if [[ -n $SCALA_RUNNER_DEBUG ]]; then - echo "saved stty: $saved_stty" -fi - -cygwin=false; -case "`uname`" in - CYGWIN*) cygwin=true ;; -esac - -# Constructing scrunch shell classpath. -SCRUNCH_SHELL_CLASSPATH="" -# Add files in conf dir. -for ext in "$SCRUNCH_CONF_DIR"/* ; do - if [ -z "$SCRUNCH_SHELL_CLASSPATH" ] ; then - SCRUNCH_SHELL_CLASSPATH="$ext" - else - SCRUNCH_SHELL_CLASSPATH="$SCRUNCH_SHELL_CLASSPATH:$ext" - fi -done -# Add files in lib dir. -for ext in "$SCRUNCH_LIB_DIR"/*.jar ; do - if [ -z "$SCRUNCH_SHELL_CLASSPATH" ] ; then - SCRUNCH_SHELL_CLASSPATH="$ext" - else - SCRUNCH_SHELL_CLASSPATH="$SCRUNCH_SHELL_CLASSPATH:$ext" - fi -done - -# Constructing Hadoop classpath. -if [ -z "$HADOOP_HOME" ]; then - echo "HADOOP_HOME must be set to run the Scrunch shell." - exit 1 -fi -HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath` - -CYGWIN_JLINE_TERMINAL= -if $cygwin; then - if [ "$OS" = "Windows_NT" ] && cygpath -m .>/dev/null 2>/dev/null ; then - format=mixed - else - format=windows - fi - SCRUNCH_SHELL_CLASSPATH=`cygpath --path --$format "$SCRUNCH_SHELL_CLASSPATH"` - case "$TERM" in - rxvt* | xterm*) - stty -icanon min 1 -echo - CYGWIN_JLINE_TERMINAL="-Djline.terminal=scala.tools.jline.UnixTerminal" - ;; - esac -fi - -[ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xmx256M -Xms32M" - -# break out -D and -J options and add them to JAVA_OPTS as well -# so they reach the underlying JVM in time to do some good. The -# -D options will be available as system properties. -declare -a java_args -declare -a scala_args - -# Don't use the bootstrap classloader. -CPSELECT="-classpath " - -while [ $# -gt 0 ]; do - case "$1" in - -D*) - # pass to scala as well: otherwise we lose it sometimes when we - # need it, e.g. communicating with a server compiler. - java_args=("${java_args[@]}" "$1") - scala_args=("${scala_args[@]}" "$1") - shift - ;; - -J*) - # as with -D, pass to scala even though it will almost - # never be used. 
- java_args=("${java_args[@]}" "${1:2}") - scala_args=("${scala_args[@]}" "$1") - shift - ;; - -toolcp) - TOOL_CLASSPATH="$TOOL_CLASSPATH:$2" - shift 2 - ;; - *) - scala_args=("${scala_args[@]}" "$1") - shift - ;; - esac -done - -# reset "$@" to the remaining args -set -- "${scala_args[@]}" - -if [ -z "$JAVACMD" -a -n "$JAVA_HOME" -a -x "$JAVA_HOME/bin/java" ]; then - JAVACMD="$JAVA_HOME/bin/java" -fi - -"${JAVACMD:=java}" \ - $JAVA_OPTS \ - "${java_args[@]}" \ - ${CPSELECT}${TOOL_CLASSPATH}":"${SCRUNCH_SHELL_CLASSPATH}":"${HADOOP_CLASSPATH} \ - -Dscala.usejavacp=true \ - -Denv.emacs="$EMACS" \ - $CYGWIN_JLINE_TERMINAL \ - $MAIN_CLASS "$@" \ - -i ${bin}/imports.scala \ - -Yrepl-sync -# The -Yrepl-sync option is a fix for the 2.9.1 REPL. This should probably not be necessary in the future. - -# record the exit status lest it be overwritten: -# then reenable echo and propagate the code. -scala_exit_status=$? -onExit http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/main/scripts/scrunch-job.py ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scripts/scrunch-job.py b/scrunch/src/main/scripts/scrunch-job.py deleted file mode 100755 index 2a61b3b..0000000 --- a/scrunch/src/main/scripts/scrunch-job.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/python -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import glob -import os -import re -import shutil -import subprocess -import sys - -# Configuration in script -############################################################## -if not "SCALA_HOME" in os.environ: - sys.stderr.write("Environment variable SCALA_HOME must be set\n") - sys.exit(1) -SCALA_LIB = os.path.join(os.environ["SCALA_HOME"], "lib") - -if not "HADOOP_HOME" in os.environ: - sys.stderr.write("Environment variable HADOOP_HOME must be set\n") - sys.exit(1) -HADOOP_HOME = os.environ["HADOOP_HOME"] -HADOOP_JARS = ":".join(glob.glob(os.path.join(HADOOP_HOME, "*.jar"))) - -#Get the absolute path of the original (non-symlink) file. -if os.path.islink(__file__): - ORIGINAL_FILE = os.readlink(__file__) -else: - ORIGINAL_FILE = __file__ - -DIST_ROOT = os.path.abspath(os.path.dirname(ORIGINAL_FILE)+"/../") -LIB_DIR = DIST_ROOT + "/lib" # Dir with all scrunch dependencies. 
-TMPDIR = "/tmp" -BUILDDIR = TMPDIR + "/script-build" -COMPILE_CMD = "java -cp %s/scala-library.jar:%s/scala-compiler.jar -Dscala.home=%s scala.tools.nsc.Main" % (SCALA_LIB, SCALA_LIB, SCALA_LIB) -############################################################## - -argv = sys.argv[1:] -if len(argv) < 1: - sys.stderr.write("ERROR: insufficient args.\n") - sys.exit(1) - -JOBFILE = argv.pop(0) - -def file_type(): - m = re.search(r'\.(scala|java)$', JOBFILE) - if m: - return m.group(1) - return None - -def is_file(): - return file_type() is not None - -PACK_RE = r'package ([^;]+)' -OBJECT_RE = r'object\s+([^\s(]+).*(extends|with)\s+PipelineApp.*' -EXTENSION_RE = r'(.*)\.(scala|java)$' - -#Get the name of the job from the file. -#the rule is: last class in the file, or the one that matches the filename -def get_job_name(file): - package = "" - job = None - default = None - match = re.search(EXTENSION_RE, file) - if match: - default = match.group(1) - for s in open(file, "r"): - mp = re.search(PACK_RE, s) - mo = re.search(OBJECT_RE, s) - if mp: - package = mp.group(1).trim() + "." - elif mo: - if not job or not default or not job.tolower() == default.tolower(): - #use either the last class, or the one with the same name as the file - job = mo.group(1) - if not job: - raise "Could not find job name" - return "%s%s" % (package, job) - else: - return file - -LIB_PATH = os.path.abspath(LIB_DIR) -if not os.path.exists(LIB_PATH): - sys.stderr.write("Scrunch distribution lib directory not found; run mvn package to construct a distribution to run examples from.\n") - sys.exit(1) -LIB_JARS = glob.glob(os.path.join(LIB_PATH, "*.jar")) -LIB_CP = ":".join(LIB_JARS) - -JOBPATH = os.path.abspath(JOBFILE) -JOB = get_job_name(JOBFILE) -JOBJAR = JOB + ".jar" -JOBJARPATH = os.path.join(TMPDIR, JOBJAR) - -def needs_rebuild(): - return not os.path.exists(JOBJARPATH) or os.stat(JOBJARPATH).st_mtime < os.stat(JOBPATH).st_mtime - -def build_job_jar(): - sys.stderr.write("compiling " + JOBFILE + "\n") - if os.path.exists(BUILDDIR): - shutil.rmtree(BUILDDIR) - os.makedirs(BUILDDIR) - cmd = "%s -classpath %s:%s -d %s %s" % (COMPILE_CMD, LIB_CP, HADOOP_JARS, BUILDDIR, JOBFILE) - print cmd - if subprocess.call(cmd, shell=True): - shutil.rmtree(BUILDDIR) - sys.exit(1) - - jar_cmd = "jar cf %s -C %s ." % (JOBJARPATH, BUILDDIR) - subprocess.call(jar_cmd, shell=True) - shutil.rmtree(BUILDDIR) - -def hadoop_command(): - return "HADOOP_CLASSPATH=%s ; %s/bin/hadoop jar %s %s %s" % (LIB_CP, HADOOP_HOME, JOBJARPATH, JOB, " ".join(argv)) - -if is_file() and needs_rebuild(): - build_job_jar() - -SHELL_COMMAND = hadoop_command() -print SHELL_COMMAND -os.system(SHELL_COMMAND) http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/scrunch/src/site/markdown/index.md b/scrunch/src/site/markdown/index.md deleted file mode 100644 index 32a9279..0000000 --- a/scrunch/src/site/markdown/index.md +++ /dev/null @@ -1,20 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -# Scrunch - A Scala Wrapper for Crunch ---- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/scrunch/src/site/site.xml ---------------------------------------------------------------------- diff --git a/scrunch/src/site/site.xml b/scrunch/src/site/site.xml deleted file mode 100644 index 73fbd17..0000000 --- a/scrunch/src/site/site.xml +++ /dev/null @@ -1,34 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project name="${project.name}" - xmlns="http://maven.apache.org/DECORATION/1.3.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/DECORATION/1.3.0 - http://maven.apache.org/xsd/decoration-1.3.0.xsd"> - - <body> - <!-- Note: Breadcrumbs for Doxia's Markdown parser are currently broken, - see https://jira.codehaus.org/browse/DOXIA-472 --> - <breadcrumbs> - <item name="Apache" href="http://www.apache.org/index.html" /> - <item name="Crunch" href="../index.html"/> - </breadcrumbs> - - </body> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dfd28922/src/site/markdown/scrunch.md ---------------------------------------------------------------------- diff --git a/src/site/markdown/scrunch.md b/src/site/markdown/scrunch.md index 7d0585a..324b88a 100644 --- a/src/site/markdown/scrunch.md +++ b/src/site/markdown/scrunch.md @@ -32,9 +32,9 @@ a mixture of functional and object-oriented programming styles and has powerful capabilities, allowing us to create complex pipelines using very few keystrokes. Here is the Scrunch analogue of the classic WordCount problem: - import com.cloudera.crunch.io.{From => from} - import com.cloudera.scrunch._ - import com.cloudera.scrunch.Conversions_ # For implicit type conversions + import org.apache.crunch.io.{From => from} + import org.apache.crunch.scrunch._ + import org.apache.crunch.scrunch.Conversions_ # For implicit type conversions class WordCountExample { val pipeline = new Pipeline[WordCountExample] @@ -80,7 +80,6 @@ the output of a Crunch pipeline into the client: Scrunch is alpha-quality code, written by someone who was learning Scala on the fly. There will be bugs, rough edges, and non-idiomatic Scala usage all over the place. This will improve with time, and we welcome contributions from Scala experts who are interested in helping us make Scrunch into a first-class project. 
-The Crunch developers mailing list is [here](https://groups.google.com/a/cloudera.org/group/crunch-dev/topics). Scrunch emerged out of conversations with [Dmitriy Ryaboy](http://twitter.com/#!/squarecog), [Oscar Boykin](http://twitter.com/#!/posco), and [Avi Bryant](http://twitter.com/#!/avibryant) from Twitter.
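Spelled out end to end, the WordCount example that the hunk above updates looks roughly like the sketch below. Only the import lines and the `class WordCountExample { val pipeline = new Pipeline[WordCountExample]` context appear in the diff; the method body is illustrative and assumes PCollection exposes flatMap and count with implicit PTypeH support, consistent with the wrapper sources deleted earlier in this commit.

    import org.apache.crunch.io.{From => from}
    import org.apache.crunch.scrunch._

    class WordCountExample {
      val pipeline = new Pipeline[WordCountExample]

      // Read a text file, split each line into lowercased words,
      // and count occurrences of each word. Illustrative body only.
      def wordCount(fileName: String) = {
        pipeline.read(from.textFile(fileName))
                .flatMap(_.toLowerCase.split("\\W+"))
                .count
      }
    }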