Analyzing JSON data streams using Spark SQL in Spark Streaming returns java.lang.ClassNotFoundException

2016-03-08 Thread Nesrine BEN MUSTAPHA
Hello,

I tried to use Spark SQL to analyze JSON data streams within a standalone
application.

Here is the code snippet that receives the streaming data:

final JavaReceiverInputDStream<String> lines =
        streamCtx.socketTextStream("localhost", Integer.parseInt(args[0]),
                StorageLevel.MEMORY_AND_DISK_SER_2());

lines.foreachRDD((rdd) -> {

    final JavaRDD<String> jsonElements = rdd.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(final String line) throws Exception {
            return Arrays.asList(line.split("\n"));
        }
    }).filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(final String v1) throws Exception {
            return v1.length() > 0;
        }
    });

    // System.out.println("Data Received = " + jsonElements.collect().size());

    final SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());

    final DataFrame dfJsonElement = sqlContext.read().json(jsonElements);

    executeSQLOperations(sqlContext, dfJsonElement);
});

streamCtx.start();

streamCtx.awaitTermination();
}
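The JavaSQLContextSingleton used above is not shown in the message; the Spark Streaming programming guide recommends a lazily initialized singleton so every micro-batch reuses one SQLContext. Below is a minimal self-contained sketch of that pattern (Object stands in for the real SparkContext/SQLContext types so it compiles without Spark on the classpath; the actual class would hold a SQLContext):

```java
// Sketch of the lazily initialized singleton pattern the Spark Streaming
// programming guide recommends for sharing one SQLContext across batches.
// "Object" is a placeholder for SparkContext / SQLContext so the sketch
// stays self-contained; the real version would construct a SQLContext.
final class JavaSQLContextSingleton {
    private static transient Object instance = null;

    static synchronized Object getInstance(final Object sparkContext) {
        if (instance == null) {
            instance = new Object(); // real code: new SQLContext(sparkContext)
        }
        return instance;
    }
}
```

Every call after the first returns the same instance, so the per-batch foreachRDD body never re-creates the context.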


I got the following error when the sqlContext.read().json(jsonElements) line (the one in red in the original message) is executed:

java.lang.ClassNotFoundException: com.intrinsec.common.spark.SQLStreamingJsonAnalyzer$2
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
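A note on reading this trace: the $2 in the missing class name is how the JVM names the second anonymous inner class declared in SQLStreamingJsonAnalyzer, which in the snippet above is the filter Function; the executor cannot load it, typically because the application jar containing those compiled anonymous classes never reached the workers. A small stand-alone illustration of the naming scheme (class and method names here are hypothetical, and java.util.function.Function merely stands in for Spark's Function interfaces):

```java
import java.util.function.Function;

// Demonstrates how the JVM names anonymous inner classes: they are
// numbered $1, $2, ... in order of appearance inside the enclosing class.
// SQLStreamingJsonAnalyzer$2 is therefore the second anonymous class in
// that file -- the filter Function in the snippet above.
public class AnonNames {
    static String flatMapClassName() {
        final Function<String, Integer> f = new Function<String, Integer>() {
            @Override public Integer apply(final String s) { return s.length(); }
        };
        return f.getClass().getName(); // first anonymous class: ends with "$1"
    }

    static String filterClassName() {
        final Function<String, Boolean> f = new Function<String, Boolean>() {
            @Override public Boolean apply(final String s) { return !s.isEmpty(); }
        };
        return f.getClass().getName(); // second anonymous class: ends with "$2"
    }

    public static void main(String[] args) {
        System.out.println(flatMapClassName());
        System.out.println(filterClassName());
    }
}
```

Knowing which anonymous class the suffix points at narrows the search: it is that function's bytecode, not the driver program, that must be present in the jars shipped to the executors (via conf.setJars or spark-submit).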


local class incompatible: stream classdesc

2016-03-01 Thread Nesrine BEN MUSTAPHA
Hi,

I installed a standalone Spark cluster with two workers and developed a Java
application that uses the Spark Maven dependency (same version as the
cluster).

In my Spark jobs class I have only two methods, considered as two different
jobs.

The first one is the Spark word-count example, as follows:

public static List<Tuple2<String, Integer>> sparkWordCount(final String inputFilePath,
        final String outputFilePath, final String masterUrl) {

    final SparkConf conf = new SparkConf();
    conf.set("spark.ui.port", "7077");
    conf.setAppName("My First Agent Spark Application");
    final String[] jars = { "target/spark-1.0-SNAPSHOT-jar-with-dependencies.jar" };
    conf.setJars(jars);
    conf.set("spark.eventLog.dir", "/Users/nbs/spark-1.6.0/tmp/spark-events");
    conf.set("spark.eventLog.enabled", "true");
    conf.setSparkHome("/Users/nbs/spark-1.6.0/");
    conf.setMaster("spark://Nesrines-Mac-mini.local:7077");

    final JavaSparkContext sc = new JavaSparkContext(conf);
    try {
        final JavaRDD<String> rdd = sc.textFile(inputFilePath);

        final JavaPairRDD<String, Integer> counts = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(final String x) {
                return Arrays.asList(x.split(" "));
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(final String x) {
                return new Tuple2<>(x, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(final Integer x, final Integer y) {
                return x + y;
            }
        });

        // counts.saveAsTextFile(outputFilePath);

        final List<Tuple2<String, Integer>> results = counts.collect();
        return results;
    } finally {
        sc.close();
    }
}
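The flatMap / mapToPair / reduceByKey pipeline above can be checked without a cluster by sketching the same logic over plain Java collections (class and method names here are hypothetical):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local, cluster-free equivalent of the word-count pipeline above:
// split each line into words (flatMap), pair each word with 1
// (mapToPair), and sum the counts per key (reduceByKey).
public class LocalWordCount {
    static Map<String, Integer> wordCount(final List<String> lines) {
        final Map<String, Integer> counts = new HashMap<>();
        for (final String line : lines) {
            for (final String word : line.split(" ")) { // flatMap step
                counts.merge(word, 1, Integer::sum);    // mapToPair + reduceByKey
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("to be or not to be")));
    }
}
```

Running the logic locally first helps separate algorithmic bugs from cluster-deployment problems like the serialization error below.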


When I try to execute this job, an exception is thrown when the collect()
instruction is called.

PS: other jobs are executed without problems.

Thanks,




org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
0.0 (TID 5, 172.22.142.19): java.io.InvalidClassException:
com.intrinsec.ict.common.spark.SparkJobs$1; local class incompatible:
stream classdesc serialVersionUID = -1239009763274695582, local class
serialVersionUID = -88119554882139439
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
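A note on this "local class incompatible" error: it means the serialized bytes for SparkJobs$1 (the first anonymous function class) were written by a different compilation of that class than the one the worker loaded, which usually points to a stale or mismatched jar on the cluster. When no explicit serialVersionUID is declared, the JVM derives one from the class structure, so it can change on rebuild. A minimal stand-alone demonstration (class names here are hypothetical):

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Shows the two ways a Serializable class gets its serialVersionUID:
// derived from the class structure (fragile across rebuilds) or pinned
// explicitly (stable, so driver and worker jars stay compatible).
public class UidDemo {
    // No explicit serialVersionUID: the JVM computes one from the class shape,
    // so structural changes (or even a different compiler) can shift it.
    static class Derived implements Serializable { int x; }

    // Explicit serialVersionUID: identical in every build of this class.
    static class Pinned implements Serializable {
        private static final long serialVersionUID = 1L;
        int x;
    }

    public static void main(String[] args) {
        System.out.println(ObjectStreamClass.lookup(Derived.class).getSerialVersionUID());
        System.out.println(ObjectStreamClass.lookup(Pinned.class).getSerialVersionUID()); // prints 1
    }
}
```

The practical fix is to rebuild once and deploy the exact same jar to the driver and all workers; pinning serialVersionUID in the function classes additionally protects against accidental mismatches between builds.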