This is a bit strange, because you're trying to create an RDD (the jsonElements)
inside a foreach function. That code executes on the workers, and so will
actually produce a different instance in each JVM on each worker, not the one
single RDD, referenced by the driver, that I think you're trying to get.

Why don’t you try something like:

JavaDStream<String> jsonElements = lines.flatMap( … )

and just skip the lines.foreachRDD?
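To illustrate what that flatMap/filter pipeline computes per record, here is a minimal sketch using plain Java streams (so it runs outside Spark; the class and method names are mine, not from your code). The two lambda bodies are exactly the logic of your FlatMapFunction and Function:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SplitFilterDemo {
    // Mirrors: rdd.flatMap(split each record on "\n").filter(non-empty)
    static List<String> splitAndFilter(List<String> records) {
        return records.stream()
            .flatMap(line -> Arrays.stream(line.split("\n")))  // one record may hold several JSON lines
            .filter(s -> s.length() > 0)                       // drop empty fragments
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("{\"a\":1}\n{\"a\":2}", "", "{\"a\":3}");
        System.out.println(splitAndFilter(batch));
        // prints [{"a":1}, {"a":2}, {"a":3}]
    }
}
```

Expressed on the DStream, that is the one-liner above: split and filter once, and every downstream batch sees clean JSON lines.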

> On Mar 8, 2016, at 11:59 AM, Nesrine BEN MUSTAPHA 
> <nesrine.benmusta...@gmail.com> wrote:
> 
> Hello,
> 
> I tried to use Spark SQL to analyse JSON data streams within a standalone
> application.
> 
> Here is the code snippet that receives the streaming data:
> final JavaReceiverInputDStream<String> lines = streamCtx.socketTextStream(
>         "localhost", Integer.parseInt(args[0]),
>         StorageLevel.MEMORY_AND_DISK_SER_2());
> 
> lines.foreachRDD((rdd) -> {
> 
>     final JavaRDD<String> jsonElements = rdd
>         .flatMap(new FlatMapFunction<String, String>() {
>             @Override
>             public Iterable<String> call(final String line) throws Exception {
>                 return Arrays.asList(line.split("\n"));
>             }
>         })
>         .filter(new Function<String, Boolean>() {
>             @Override
>             public Boolean call(final String v1) throws Exception {
>                 return v1.length() > 0;
>             }
>         });
> 
>     // System.out.println("Data Received = " + jsonElements.collect().size());
> 
>     final SQLContext sqlContext =
>         JavaSQLContextSingleton.getInstance(rdd.context());
>     final DataFrame dfJsonElement = sqlContext.read().json(jsonElements);
>     executeSQLOperations(sqlContext, dfJsonElement);
> });
> 
> streamCtx.start();
> 
> streamCtx.awaitTermination();
> 
> }
> 
> I got the following error when the line in red (the sqlContext.read().json
> call) is executed:
> 
> java.lang.ClassNotFoundException: com.intrinsec.common.spark.SQLStreamingJsonAnalyzer$2
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>       at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
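For what it's worth, a ClassNotFoundException for an anonymous inner class of your driver program (the ...$2 here) while a task is being deserialized usually means the executors don't have your application jar on their classpath. One common fix is to package the application and submit the jar explicitly so Spark ships it to every executor; a sketch with hypothetical paths, master URL, and port argument (only the class name is taken from your stack trace):

```shell
# Hypothetical jar path, master URL, and port -- substitute your own.
# Submitting the assembled jar makes Spark distribute it (including
# anonymous inner classes) to each executor JVM.
spark-submit \
  --class com.intrinsec.common.spark.SQLStreamingJsonAnalyzer \
  --master spark://master-host:7077 \
  target/my-streaming-app.jar 9999
```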
> 
> 
> 
> 
> 
> 
