Hi all,

I am new to Spark, so this is probably a basic question. I want to explore
the possibilities of this framework, specifically using it in conjunction with
third-party libraries such as MongoDB.

I have been following the instructions from
http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ in order to
connect Spark with MongoDB. That example is built against
spark-core_2.10-0.9.0.jar, so the first thing I did was update pom.xml to the
latest versions.

This is my pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>
  <groupId>com.aironman.spark</groupId>
  <artifactId>simple-project</artifactId>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>

  <repositories>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>mongo-hadoop-core</artifactId>
      <version>1.0.0</version>
    </dependency>
  </dependencies>
</project>

As you can see, it is a very simple pom.xml.

And this is JavaWordCount.java:

import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;

import scala.Tuple2;

import com.mongodb.hadoop.MongoOutputFormat;

/***
 * This class is supposed to connect to a MongoDB cluster and run a word count
 * task over every word stored in the database.
 * The problem is that this API is broken, I think. I am using the latest
 * version of the framework, 1.0.0. This is not spark-streaming; ideally there
 * would be an example of spark-streaming connecting to a MongoDB database, or
 * of using spark-streaming together with Spring Integration, i.e. connecting
 * Spark to a web service that would periodically feed Spark...
 * @author aironman
 *
 */

public class JavaWordCount {

    public static void main(String[] args) {

        JavaSparkContext sc = new JavaSparkContext("local", "Java Word Count");

        Configuration config = new Configuration();
        config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/beowulf.input");
        config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/beowulf.output");

        JavaPairRDD<Object, BSONObject> mongoRDD =
            sc.newAPIHadoopRDD(config, com.mongodb.hadoop.MongoInputFormat.class,
                Object.class, BSONObject.class);

        // Input contains tuples of (ObjectId, BSONObject)
        JavaRDD<String> words = mongoRDD.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {
            @Override
            public Iterable<String> call(Tuple2<Object, BSONObject> arg) {
                Object o = arg._2.get("text");
                if (o instanceof String) {
                    String str = (String) o;
                    str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
                    return Arrays.asList(str.split(" "));
                } else {
                    return Collections.emptyList();
                }
            }
        });

        // Here is an error: "The method map(Function<String,R>) in the type
        // JavaRDD<String> is not applicable for the arguments
        // (new PairFunction<String,String,Integer>(){})"
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });


        // Another error: "The method map(Function<Tuple2<String,Integer>,R>) in the type
        // JavaPairRDD<String,Integer> is not applicable for the arguments
        // (new PairFunction<Tuple2<String,Integer>,Object,BSONObject>(){})"

        // Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
        JavaPairRDD<Object, BSONObject> save = counts.map(new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
            @Override
            public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) {
                BSONObject bson = new BasicBSONObject();
                bson.put("word", tuple._1);
                bson.put("count", tuple._2);
                return new Tuple2<>(null, bson);
            }
        });

        // Only MongoOutputFormat and config are relevant
        save.saveAsNewAPIHadoopFile("file:/bogus", Object.class, Object.class, MongoOutputFormat.class, config);
    }
}
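
Reading the 1.0.0 Java API docs, my guess is that both compile errors come
from the Java API change between 0.9.x and 1.0.0: transformations that produce
a JavaPairRDD now seem to go through mapToPair(PairFunction) instead of
map(PairFunction). Is something like the following the right way to adapt the
example? (Just a sketch, I have not been able to test it against MongoDB yet.)

        // Guess: mapToPair seems to be the 1.0.0 way to build a JavaPairRDD from a JavaRDD
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        });

        // ... reduceByKey stays the same ...

        // Same idea for the save step: mapToPair instead of map
        JavaPairRDD<Object, BSONObject> save = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Object, BSONObject>() {
            @Override
            public Tuple2<Object, BSONObject> call(Tuple2<String, Integer> tuple) {
                BSONObject bson = new BasicBSONObject();
                bson.put("word", tuple._1);
                bson.put("count", tuple._2);
                return new Tuple2<>(null, bson);
            }
        });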

It looks like a jar-hell / dependency problem, doesn't it? Can anyone guide or help me?
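
To try to rule that out, I will also look at the output of mvn dependency:tree,
in case spark-core and mongo-hadoop-core pull in conflicting versions of the
same jars.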

Another thing: I don't like closures; is it possible to use this framework
without them? (A small sketch of what I mean is below.)
Another question: are these objects (JavaSparkContext sc, JavaPairRDD<Object,
BSONObject> mongoRDD) thread-safe? Can I use them as singletons?
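
By "without closures" I mean writing ordinary named classes instead of the
anonymous inner classes above; something like this hypothetical
WordPairFunction (assuming the function interfaces can be implemented this
way):

    // Hypothetical named class instead of an anonymous inner class
    public class WordPairFunction implements PairFunction<String, String, Integer> {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
        }
    }

    // which would then be used as: words.mapToPair(new WordPairFunction());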

Thank you very much, and apologies if the questions are not exactly a trending
topic :)

Alonso Isidoro Roman.

Mis citas preferidas (de hoy) :
"Si depurar es el proceso de quitar los errores de software, entonces
programar debe ser el proceso de introducirlos..."
 -  Edsger Dijkstra

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming
must be the process of putting ..."
  - Edsger Dijkstra

"If you pay peanuts you get monkeys"
