Repository: crunch Updated Branches: refs/heads/master ce9aaa3a5 -> 047d8fd36
CRUNCH-618: Run on Spark 2. Contributed by Gergő Pásztor. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/047d8fd3 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/047d8fd3 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/047d8fd3 Branch: refs/heads/master Commit: 047d8fd36773608a3d2cf6445881173e7d26377c Parents: ce9aaa3 Author: Tom White <[email protected]> Authored: Thu Apr 13 16:10:23 2017 +0100 Committer: Tom White <[email protected]> Committed: Thu Apr 13 16:10:23 2017 +0100 ---------------------------------------------------------------------- crunch-kafka/pom.xml | 2 +- crunch-scrunch/pom.xml | 2 +- .../interpreter/InterpreterJarTest.scala | 23 +++++++++------- .../org/apache/crunch/scrunch/PTypeFamily.scala | 4 +-- .../scrunch/interpreter/InterpreterRunner.scala | 27 ++++++++++--------- .../org/apache/crunch/fn/IterableIterator.java | 28 ++++++++++++++++++++ .../crunch/fn/SDoubleFlatMapFunction.java | 2 +- .../org/apache/crunch/fn/SFlatMapFunction.java | 2 +- .../org/apache/crunch/fn/SFlatMapFunction2.java | 2 +- .../java/org/apache/crunch/fn/SFunctions.java | 8 +++--- .../apache/crunch/fn/SPairFlatMapFunction.java | 2 +- .../impl/spark/fn/CombineMapsideFunction.java | 4 +-- .../crunch/impl/spark/fn/CrunchPairTuple2.java | 9 ++----- .../crunch/impl/spark/fn/FlatMapPairDoFn.java | 4 +-- .../crunch/impl/spark/fn/PairFlatMapDoFn.java | 4 +-- .../impl/spark/fn/ReduceGroupingFunction.java | 9 ++----- pom.xml | 10 +++---- 17 files changed, 83 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml index 961b106..32429f5 100644 --- a/crunch-kafka/pom.xml +++ b/crunch-kafka/pom.xml @@ -40,7 +40,7 @@ under the License. 
</dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> + <artifactId>kafka_2.11</artifactId> </dependency> <dependency> <groupId>org.scala-lang</groupId> http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml index 51925fb..9376059 100644 --- a/crunch-scrunch/pom.xml +++ b/crunch-scrunch/pom.xml @@ -38,7 +38,7 @@ under the License. <artifactId>scala-compiler</artifactId> </dependency> <dependency> - <groupId>org.scala-lang</groupId> + <groupId>jline</groupId> <artifactId>jline</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala index 5ebc303..bc9bd0f 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala @@ -22,8 +22,6 @@ import java.io.FileOutputStream import java.util.jar.JarFile import java.util.jar.JarOutputStream -import scala.tools.nsc.io.VirtualDirectory - import com.google.common.io.Files import org.junit.Assert.assertNotNull import org.junit.Test @@ -31,30 +29,35 @@ import org.apache.crunch.test.CrunchTestSupport import org.scalatest.junit.JUnitSuite import org.apache.crunch.scrunch.CrunchSuite +import scala.tools.nsc.interpreter.{ReplDir, ReplOutput} +import scala.tools.nsc.settings.MutableSettings + /** - * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}. 
+ * Tests creating jars from a {@link scala.tools.nsc.interpreter.ReplDir}. */ class InterpreterJarTest extends CrunchSuite { /** - * Tests transforming a virtual directory into a temporary jar file. + * Tests transforming an output directory into a temporary jar file. */ - @Test def virtualDirToJar: Unit = { - // Create a virtual directory and populate with some mock content. - val root = new VirtualDirectory("testDir", None) + @Test def outputDirToJar: Unit = { + // Create an output directory and populate with some mock content. + val settings = new MutableSettings(e => println("ERROR: "+e)) + val dirSetting = settings.StringSetting("-Yrepl-outdir", "path", "Test path", "") + val root: ReplDir = new ReplOutput(dirSetting).dir // Add some subdirectories to the root. (1 to 10).foreach { i => - val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory] + val subdir = root.subdirectoryNamed("subdir" + i) // Add some classfiles to each sub directory. (1 to 10).foreach { j => subdir.fileNamed("MyClass" + j + ".class") } } - // Now generate a jar file from the virtual directory. + // Now generate a jar file from the output directory. val tempJar = new File(tempDir.getRootFile(), "replJar.jar") val jarStream = new JarOutputStream(new FileOutputStream(tempJar)) - InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream) + InterpreterRunner.addOutputDirectoryToJar(root, "top/pack/name/", jarStream) jarStream.close() // Verify the contents of the jar. 
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala index 47cf637..a140acd 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala @@ -264,8 +264,8 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily { } private def products[T <: Product](tpe: Type, mirror: Mirror): PType[T] = { - val ctor = tpe.member(nme.CONSTRUCTOR).asMethod - val args = ctor.paramss.head.map(x => (x.name.toString, + val ctor = tpe.member(termNames.CONSTRUCTOR).asMethod + val args = ctor.paramLists.head.map(x => (x.name.toString, typeToPType(x.typeSignature, mirror))) val out = (x: Product) => TupleN.of(x.productIterator.toArray.asInstanceOf[Array[Object]] : _*) val rtc = mirror.runtimeClass(tpe) http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala index 0d84381..9416f1f 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala @@ -29,11 +29,9 @@ import scala.tools.nsc.ObjectRunner import scala.tools.nsc.Properties import scala.tools.nsc.ScriptRunner import scala.tools.nsc.interpreter.ILoop -import scala.tools.nsc.io.Jar -import scala.tools.nsc.io.VirtualDirectory 
+import scala.tools.nsc.io.{AbstractFile, Jar} import com.google.common.io.Files import org.apache.hadoop.conf.Configuration - import org.apache.crunch.util.DistCache import org.apache.commons.io.IOUtils @@ -126,7 +124,7 @@ object InterpreterRunner extends MainGenericRunner { ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments) } else runTarget() match { - case Left(ex) => errorFn(ex.toString()) + case Left(ex) => errorFn(ex.getMessage(), Some(ex)) case Right(b) => b } } @@ -145,11 +143,11 @@ object InterpreterRunner extends MainGenericRunner { def createReplCodeJar(): File = { var jarStream: JarOutputStream = null try { - val virtualDirectory = repl.intp.virtualDirectory.asInstanceOf[VirtualDirectory] + val outputDirectory = repl.replOutput.dir val tempDir = Files.createTempDir() val tempJar = new File(tempDir, "replJar.jar") jarStream = new JarOutputStream(new FileOutputStream(tempJar)) - addVirtualDirectoryToJar(virtualDirectory, "", jarStream) + addOutputDirectoryToJar(outputDirectory, "", jarStream) return tempJar } finally { IOUtils.closeQuietly(jarStream) @@ -157,14 +155,14 @@ object InterpreterRunner extends MainGenericRunner { } /** - * Add the contents of the specified virtual directory to a jar. This method will recursively + * Add the contents of the specified output directory to a jar. This method will recursively * descend into subdirectories to add their contents. * - * @param dir The virtual directory whose contents should be added. - * @param entryPath The entry path for classes found in the virtual directory. + * @param dir The output directory whose contents should be added. + * @param entryPath The entry path for classes found in the output directory. * @param jarStream An output stream for writing the jar file. 
*/ - def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream: + def addOutputDirectoryToJar(dir: AbstractFile, entryPath: String, jarStream: JarOutputStream): Unit = { dir.foreach { file => if (file.isDirectory) { @@ -173,8 +171,7 @@ object InterpreterRunner extends MainGenericRunner { val entry: JarEntry = new JarEntry(dirPath) jarStream.putNextEntry(entry) jarStream.closeEntry() - addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory], - dirPath, jarStream) + addOutputDirectoryToJar(file, dirPath, jarStream) } else if (file.hasExtension("class")) { // Add class files as an entry in the jar file and write the class to the jar. val entry: JarEntry = new JarEntry(entryPath + file.name) @@ -197,7 +194,11 @@ object InterpreterRunner extends MainGenericRunner { // Generate a jar of REPL code and add to the distributed cache. val replJarFile = createReplCodeJar() DistCache.addJarToDistributedCache(configuration, replJarFile) - // Get the paths to jars added with the :cp command. + /** + * Get the paths to jars added with the :cp command. + * The next line will cause a Deprecation Warning, because of the 'repl.addedClasspath', but + * we can safely ignore it as we are not using it to modify the classpath. 
+ */ val addedJarPaths = repl.addedClasspath.split(':') addedJarPaths.foreach { path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path) http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java b/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java new file mode 100644 index 0000000..3c06c13 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.fn; + +import java.util.Iterator; + +class IterableIterator<T> implements Iterable<T> { + private final Iterator<T> itr; + IterableIterator(Iterator<T> itr) { + this.itr = itr; + } + public Iterator<T> iterator() { return itr;} +} http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java index f3f67cc..01b0b4c 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java @@ -30,7 +30,7 @@ public abstract class SDoubleFlatMapFunction<T> extends SparkDoFn<T, Double> @Override public void process(T input, Emitter<Double> emitter) { try { - for (Double d : call(input)) { + for (Double d : new IterableIterator<Double>(call(input))) { emitter.emit(d); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java index 1fecb76..9ee4d9f 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java @@ -30,7 +30,7 @@ public abstract class SFlatMapFunction<T, R> extends SparkDoFn<T, R> @Override public void process(T input, Emitter<R> emitter) { try { - for (R r : call(input)) { + for (R r : new IterableIterator<R>(call(input))) { emitter.emit(r); } } catch (Exception e) { 
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java index 0798f63..d7c6514 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java @@ -32,7 +32,7 @@ public abstract class SFlatMapFunction2<K, V, R> extends DoFn<Pair<K, V>, R> @Override public void process(Pair<K, V> input, Emitter<R> emitter) { try { - for (R r : call(input.first(), input.second())) { + for (R r : new IterableIterator<R>(call(input.first(), input.second()))) { emitter.emit(r); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java index cc59746..0ba7a37 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java @@ -17,6 +17,8 @@ */ package org.apache.crunch.fn; +import java.util.Iterator; + import org.apache.spark.api.java.function.DoubleFlatMapFunction; import org.apache.spark.api.java.function.DoubleFunction; import org.apache.spark.api.java.function.FlatMapFunction; @@ -62,7 +64,7 @@ public final class SFunctions { public static <T, R> SFlatMapFunction<T, R> wrap(final FlatMapFunction<T, R> f) { return new SFlatMapFunction<T, R>() { @Override - public Iterable<R> call(T t) throws Exception { + public Iterator<R> call(T t) throws Exception { return f.call(t); } }; @@ -71,7 +73,7 @@ public final class 
SFunctions { public static <K, V, R> SFlatMapFunction2<K, V, R> wrap(final FlatMapFunction2<K, V, R> f) { return new SFlatMapFunction2<K, V, R>() { @Override - public Iterable<R> call(K k, V v) throws Exception { + public Iterator<R> call(K k, V v) throws Exception { return f.call(k, v); } }; @@ -89,7 +91,7 @@ public final class SFunctions { public static <T> SDoubleFlatMapFunction<T> wrap(final DoubleFlatMapFunction<T> f) { return new SDoubleFlatMapFunction<T>() { @Override - public Iterable<Double> call(T t) throws Exception { + public Iterator<Double> call(T t) throws Exception { return f.call(t); } }; http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java index 3b8e75a..2becd48 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java @@ -32,7 +32,7 @@ public abstract class SPairFlatMapFunction<T, K, V> extends SparkDoFn<T, Pair<K, @Override public void process(T input, Emitter<Pair<K, V>> emitter) { try { - for (Tuple2<K, V> kv : call(input)) { + for (Tuple2<K, V> kv : new IterableIterator<Tuple2<K, V>>(call(input))) { emitter.emit(Pair.of(kv._1(), kv._2())); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java index 1bea08d..231de77 100644 --- 
a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java @@ -45,7 +45,7 @@ public class CombineMapsideFunction<K, V> implements PairFlatMapFunction<Iterato } @Override - public Iterable<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception { + public Iterator<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception { ctxt.initialize(combineFn, null); Map<K, List<V>> cache = Maps.newHashMap(); int cnt = 0; @@ -63,7 +63,7 @@ public class CombineMapsideFunction<K, V> implements PairFlatMapFunction<Iterato } } - return new Flattener<K, V>(cache); + return new Flattener<K, V>(cache).iterator(); } private Map<K, List<V>> reduce(Map<K, List<V>> cache) { http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java index d6c544c..ca3011f 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java @@ -27,12 +27,7 @@ import java.util.Iterator; public class CrunchPairTuple2<K, V> implements PairFlatMapFunction<Iterator<Pair<K, V>>, K, V> { @Override - public Iterable<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception { - return new Iterable<Tuple2<K, V>>() { - @Override - public Iterator<Tuple2<K, V>> iterator() { - return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc()); - } - }; + public Iterator<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception { + return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc()); } } 
http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java index 8ec2834..aca59f3 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java @@ -37,9 +37,9 @@ public class FlatMapPairDoFn<K, V, T> implements FlatMapFunction<Iterator<Tuple2 } @Override - public Iterable<T> call(Iterator<Tuple2<K, V>> input) throws Exception { + public Iterator<T> call(Iterator<Tuple2<K, V>> input) throws Exception { ctxt.initialize(fn, null); return new CrunchIterable<Pair<K, V>, T>(fn, - Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())); + Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())).iterator(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java index 7f289cc..c012e96 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java @@ -37,10 +37,10 @@ public class PairFlatMapDoFn<T, K, V> implements PairFlatMapFunction<Iterator<T> } @Override - public Iterable<Tuple2<K, V>> call(Iterator<T> input) throws Exception { + public Iterator<Tuple2<K, V>> call(Iterator<T> input) throws Exception { ctxt.initialize(fn, null); return Iterables.transform( new CrunchIterable<T, 
Pair<K, V>>(fn, input), - GuavaUtils.<K, V>pair2tupleFunc()); + GuavaUtils.<K, V>pair2tupleFunc()).iterator(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java index d3dd69e..eb14dfe 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java @@ -50,14 +50,9 @@ public class ReduceGroupingFunction implements PairFlatMapFunction<Iterator<Tupl } @Override - public Iterable<Tuple2<ByteArray, List<byte[]>>> call( + public Iterator<Tuple2<ByteArray, List<byte[]>>> call( final Iterator<Tuple2<ByteArray, List<byte[]>>> iter) throws Exception { - return new Iterable<Tuple2<ByteArray, List<byte[]>>>() { - @Override - public Iterator<Tuple2<ByteArray, List<byte[]>>> iterator() { - return new GroupingIterator(iter, rawComparator()); - } - }; + return new GroupingIterator(iter, rawComparator()); } private RawComparator<?> rawComparator() { http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7a570c0..5c45568 100644 --- a/pom.xml +++ b/pom.xml @@ -105,11 +105,11 @@ under the License. 
<avro.classifier>hadoop2</avro.classifier> <kafka.version>0.10.0.1</kafka.version> - <scala.base.version>2.10</scala.base.version> - <scala.version>2.10.4</scala.version> + <scala.base.version>2.11</scala.base.version> + <scala.version>2.11.8</scala.version> <scalatest.version>2.2.4</scalatest.version> - <spark.version>1.3.1</spark.version> - <jline.version>2.10.4</jline.version> + <spark.version>2.0.0</spark.version> + <jline.version>2.12.1</jline.version> <jsr305.version>1.3.9</jsr305.version> </properties> @@ -456,7 +456,7 @@ under the License. </dependency> <dependency> - <groupId>org.scala-lang</groupId> + <groupId>jline</groupId> <artifactId>jline</artifactId> <version>${jline.version}</version> </dependency>
