Hi all, I am new to Spark, so this is probably a basic question. I want to explore the possibilities of this framework, specifically using it in conjunction with third-party libraries, like MongoDB, for example.
I have been following the instructions from http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ in order to connect Spark with MongoDB. That example was built against spark-core_2.10-0.9.0.jar, so the first thing I did was update pom.xml to the latest versions. This is my pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>com.aironman.spark</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>Akka repository</id>
            <url>http://repo.akka.io/releases</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-hadoop-core</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>

As you can see, a super simple pom.xml. And this is JavaWordCount.java:

import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;

import scala.Tuple2;

import com.mongodb.hadoop.MongoOutputFormat;

/***
 * This class is supposed to connect to a MongoDB cluster to run a word count task over every word stored in the database.
 * The problem is that this API is broken, I think. I am using the latest version of the framework, 1.0.0. This is not spark-streaming; ideally there would be
 * an example of spark-streaming connecting to a MongoDB database, or of using spark-streaming together with Spring Integration, that is, connecting Spark
 * to a web service that would periodically feed Spark...
 * @author aironman
 */
public class JavaWordCount {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "Java Word Count");

        Configuration config = new Configuration();
        config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/beowulf.input");
        config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/beowulf.output");

        JavaPairRDD<Object, BSONObject> mongoRDD = sc.newAPIHadoopRDD(config,
                com.mongodb.hadoop.MongoInputFormat.class, Object.class, BSONObject.class);

        // Input contains tuples of (ObjectId, BSONObject)
        JavaRDD<String> words = mongoRDD.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {
            @Override
            public Iterable<String> call(Tuple2<Object, BSONObject> arg) {
                Object o = arg._2.get("text");
                if (o instanceof String) {
                    String str = (String) o;
                    str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
                    return Arrays.asList(str.split(" "));
                } else {
                    return Collections.emptyList();
                }
            }
        });

        // Here is an error: The method map(Function<String,R>) in the type JavaRDD<String>
        // is not applicable for the arguments (new PairFunction<String,String,Integer>(){})
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Another error: The method map(Function<Tuple2<String,Integer>,R>) in the type JavaPairRDD<String,Integer>
        // is not applicable for the arguments (new PairFunction<Tuple2<String,Integer>,Object,BSONObject>(){})
        // Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
        JavaPairRDD<Object, BSONObject> save = counts.map(new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
            @Override
            public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) {
                BSONObject bson = new BasicBSONObject();
                bson.put("word", tuple._1);
                bson.put("count", tuple._2);
                return new Tuple2<>(null, bson);
            }
        });

        // Only MongoOutputFormat and config are relevant
        save.saveAsNewAPIHadoopFile("file:/bogus", Object.class, Object.class, MongoOutputFormat.class, config);
    }
}

It looks like dependency jar hell, doesn't it? Can anyone guide or help me?

Another thing: I don't like closures; is it possible to use this framework without them?

Another question: are these objects, JavaSparkContext sc and JavaPairRDD<Object, BSONObject> mongoRDD, thread-safe? Can I use them as singletons?

Thank you very much, and apologies if the questions are not a trending topic :)

Alonso Isidoro Roman.

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming must be the process of putting them in..." - Edsger Dijkstra
"If you pay peanuts you get monkeys"
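
P.S. Digging into the 1.0.0 Javadoc after writing this, it looks like the Java API changed between 0.9 and 1.0: the function classes became interfaces, and the RDD methods that take a PairFunction were renamed from map to mapToPair. If that is right, the two failing calls in the program above would become something like this (untested on my side, so please correct me if I am wrong):

    // JavaRDD.map(PairFunction) appears to be JavaRDD.mapToPair in Spark 1.0.0
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
        }
    });

    // likewise, JavaPairRDD.map(PairFunction) appears to be mapToPair
    JavaPairRDD<Object, BSONObject> save = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
        @Override
        public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) {
            BSONObject bson = new BasicBSONObject();
            bson.put("word", tuple._1);
            bson.put("count", tuple._2);
            return new Tuple2<>(null, bson);
        }
    });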
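
P.P.S. On the closures question: as far as I can tell, nothing in the Java API forces anonymous inner classes; the operators just take objects implementing the function interfaces, and those interfaces already extend Serializable. So a plain named class like the sketch below (the name WordToPair is my own, just for illustration) should be usable in place of the inline ones above:

    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    // Ordinary top-level class instead of an anonymous inner class
    public class WordToPair implements PairFunction<String, String, Integer> {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
        }
    }

    // usage: JavaPairRDD<String, Integer> ones = words.mapToPair(new WordToPair());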
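
P.P.P.S. On the singleton question, what I had in mind is a holder like the sketch below (the name SparkContextHolder is mine, just for illustration). From what I have read, only one SparkContext should be active per JVM, its job scheduler is thread-safe, and RDDs themselves are immutable, but I would appreciate confirmation from someone who knows the internals:

    import org.apache.spark.api.java.JavaSparkContext;

    // One shared JavaSparkContext for the whole JVM; all threads submit jobs through it
    public final class SparkContextHolder {
        private static final JavaSparkContext SC = new JavaSparkContext("local", "Java Word Count");

        private SparkContextHolder() {
        }

        public static JavaSparkContext get() {
            return SC;
        }
    }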