One thing I noticed at the point of your first error: you are calling words.map instead of words.mapToPair. map produces a JavaRDD&lt;R&gt;, whereas mapToPair produces a JavaPairRDD&lt;K, V&gt;. I don't use the Java APIs myself, but it looks to me like you need to check the types more carefully.
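Concretely: in the 1.0 Java API, the old map(PairFunction) overload was split out into a dedicated mapToPair method (and likewise flatMapToPair). If I'm reading the 1.0 javadocs right, your two failing calls would become something like the following sketch (a fragment against the `words` and `counts` RDDs from your code, not tested by me):

```java
// words.map(...) -> words.mapToPair(...): yields a JavaPairRDD<String, Integer>
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});

// counts.map(...) -> counts.mapToPair(...): yields a JavaPairRDD<Object, BSONObject>
JavaPairRDD<Object, BSONObject> save = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
    @Override
    public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) {
        BSONObject bson = new BasicBSONObject();
        bson.put("word", tuple._1);
        bson.put("count", tuple._2);
        // null key: the Mongo driver generates the ObjectId
        return new Tuple2<Object, BSONObject>(null, bson);
    }
});
```

The rest of your pipeline (reduceByKey, saveAsNewAPIHadoopFile) should compile unchanged, since reduceByKey already operates on a JavaPairRDD.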
On Mon, Jun 23, 2014 at 2:15 PM, Alonso Isidoro Roman <alons...@gmail.com> wrote:
> Hi all,
>
> I am new to Spark, so this is probably a basic question. I want to explore
> the possibilities of this framework, concretely using it in conjunction with
> third-party libs, like MongoDB, for example.
>
> I have been following the instructions from
> http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ in order to
> connect Spark with MongoDB. That example is built with
> spark-core_2.1.0-0.9.0.jar, so the first thing I did was update pom.xml
> with the latest versions.
>
> This is my pom.xml:
>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>   http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <groupId>com.aironman.spark</groupId>
>   <artifactId>simple-project</artifactId>
>   <modelVersion>4.0.0</modelVersion>
>   <name>Simple Project</name>
>   <packaging>jar</packaging>
>   <version>1.0</version>
>   <repositories>
>     <repository>
>       <id>Akka repository</id>
>       <url>http://repo.akka.io/releases</url>
>     </repository>
>   </repositories>
>   <dependencies>
>     <dependency> <!-- Spark dependency -->
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_2.10</artifactId>
>       <version>1.0.0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.mongodb</groupId>
>       <artifactId>mongo-hadoop-core</artifactId>
>       <version>1.0.0</version>
>     </dependency>
>   </dependencies>
> </project>
>
> As you can see, a super simple pom.xml.
>
> And this is the JavaWordCount.java:
>
> import java.util.Arrays;
> import java.util.Collections;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.bson.BSONObject;
> import org.bson.BasicBSONObject;
>
> import scala.Tuple2;
>
> import com.mongodb.hadoop.MongoOutputFormat;
>
> /***
>  * This class is supposed to connect to a MongoDB cluster to run a word
>  * count over each word stored in the database. The problem is that this
>  * API is broken, I think. I am using the latest version of the framework,
>  * 1.0.0. This is not Spark Streaming; ideally there would be an example
>  * of Spark Streaming connecting to a MongoDB database, or of using Spark
>  * Streaming together with Spring Integration, i.e. connecting Spark to a
>  * web service that would periodically feed Spark...
>  * @author aironman
>  */
> public class JavaWordCount {
>
>     public static void main(String[] args) {
>
>         JavaSparkContext sc = new JavaSparkContext("local", "Java Word Count");
>
>         Configuration config = new Configuration();
>         config.set("mongo.input.uri", "mongodb:127.0.0.1:27017/beowulf.input");
>         config.set("mongo.output.uri", "mongodb:127.0.0.1:27017/beowulf.output");
>
>         JavaPairRDD<Object, BSONObject> mongoRDD = sc.newAPIHadoopRDD(config,
>                 com.mongodb.hadoop.MongoInputFormat.class, Object.class, BSONObject.class);
>
>         // Input contains tuples of (ObjectId, BSONObject)
>         JavaRDD<String> words = mongoRDD.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {
>             @Override
>             public Iterable<String> call(Tuple2<Object, BSONObject> arg) {
>                 Object o = arg._2.get("text");
>                 if (o instanceof String) {
>                     String str = (String) o;
>                     str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
>                     return Arrays.asList(str.split(" "));
>                 } else {
>                     return Collections.emptyList();
>                 }
>             }
>         });
>         // here is an error: The method map(Function<String,R>) in the type
>         // JavaRDD<String> is not applicable for the arguments (new
>         // PairFunction<String,String,Integer>(){})
>         JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
>             public Tuple2<String, Integer> call(String s) {
>                 return new Tuple2<>(s, 1);
>             }
>         });
>         JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
>             public Integer call(Integer i1, Integer i2) {
>                 return i1 + i2;
>             }
>         });
>
>         // another error: The method map(Function<Tuple2<String,Integer>,R>)
>         // in the type JavaPairRDD<String,Integer> is not applicable for the arguments
>         // (new PairFunction<Tuple2<String,Integer>,Object,BSONObject>(){})
>
>         // Output contains tuples of (null, BSONObject) - ObjectId will be
>         // generated by Mongo driver if null
>         JavaPairRDD<Object, BSONObject> save = counts.map(new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
>             @Override
>             public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) {
>                 BSONObject bson = new BasicBSONObject();
>                 bson.put("word", tuple._1);
>                 bson.put("count", tuple._2);
>                 return new Tuple2<>(null, bson);
>             }
>         });
>
>         // Only MongoOutputFormat and config are relevant
>         save.saveAsNewAPIHadoopFile("file:/bogus", Object.class, Object.class, MongoOutputFormat.class, config);
>     }
> }
>
> It looks like jar dependency hell, doesn't it? Can anyone guide or help me?
>
> Another thing: I don't like closures. Is it possible to use this framework
> without using them?
> Another question: are these objects, JavaSparkContext sc and
> JavaPairRDD<Object, BSONObject> mongoRDD, thread-safe? Can I use them as
> singletons?
>
> Thank you very much, and apologies if the questions are not trending topic :)
>
> Alonso Isidoro Roman.
>
> Mis citas preferidas (de hoy):
> "Si depurar es el proceso de quitar los errores de software, entonces
> programar debe ser el proceso de introducirlos..."
> - Edsger Dijkstra
>
> My favorite quotes (today):
> "If debugging is the process of removing software bugs, then programming
> must be the process of putting them in..."
> - Edsger Dijkstra
>
> "If you pay peanuts you get monkeys"