This is a bit strange: you're trying to create an RDD (the jsonElements) inside a foreachRDD function. That closure executes on the workers, so it will actually produce a different instance in each JVM on each worker, not one single RDD referenced by the driver, which is what I think you're trying to get.
Why don't you try something like: JavaDStream<String> jsonElements = lines.flatMap( … ) and just skip the lines.foreachRDD?

> On Mar 8, 2016, at 11:59 AM, Nesrine BEN MUSTAPHA <nesrine.benmusta...@gmail.com> wrote:
>
> Hello,
>
> I tried to use sparkSQL to analyse JSON data streams within a standalone application.
>
> Here is the code snippet that receives the streaming data:
>
>     final JavaReceiverInputDStream<String> lines =
>             streamCtx.socketTextStream("localhost", Integer.parseInt(args[0]),
>                     StorageLevel.MEMORY_AND_DISK_SER_2());
>
>     lines.foreachRDD((rdd) -> {
>
>         final JavaRDD<String> jsonElements = rdd.flatMap(new FlatMapFunction<String, String>() {
>             @Override
>             public Iterable<String> call(final String line) throws Exception {
>                 return Arrays.asList(line.split("\n"));
>             }
>         }).filter(new Function<String, Boolean>() {
>             @Override
>             public Boolean call(final String v1) throws Exception {
>                 return v1.length() > 0;
>             }
>         });
>
>         //System.out.println("Data Received = " + jsonElements.collect().size());
>
>         final SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
>         final DataFrame dfJsonElement = sqlContext.read().json(jsonElements);
>         executeSQLOperations(sqlContext, dfJsonElement);
>
>     });
>
>     streamCtx.start();
>     streamCtx.awaitTermination();
> }
>
> I got the following error when the red line is executed:
>
> java.lang.ClassNotFoundException: com.intrinsec.common.spark.SQLStreamingJsonAnalyzer$2
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
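For what it's worth, the split-and-filter logic from the anonymous FlatMapFunction/Function classes above can be written as plain lambdas, which the Spark 1.x Java API also accepts for JavaDStream.flatMap(...) and .filter(...). Here is a minimal sketch of that logic outside Spark so it can be tried standalone (the class and method names are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JsonLineSplitter {

    // Mirrors the quoted pipeline: split each received chunk on newlines,
    // then drop empty strings. With lambdas there is no anonymous inner
    // class, which also avoids the extra $2 class that must be shipped
    // to the workers.
    public static List<String> splitNonEmpty(List<String> chunks) {
        return chunks.stream()
                .flatMap(chunk -> Arrays.stream(chunk.split("\n")))
                .filter(line -> line.length() > 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two chunks: one containing two JSON lines separated by a blank
        // line, and one empty chunk that should disappear entirely.
        List<String> out = splitNonEmpty(
                Arrays.asList("{\"a\":1}\n\n{\"b\":2}", ""));
        System.out.println(out); // [{"a":1}, {"b":2}]
    }
}
```

The same lambdas, passed to lines.flatMap(...).filter(...), would give you the single JavaDStream<String> suggested above instead of one RDD per worker JVM.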