I have a streaming app that receives very complicated JSON (Twitter statuses). I would like to work with each status as a hash map; it would be very difficult to define a POJO for this JSON, and I cannot use twitter4j. Here is my attempt to map each JSON string to a Map<String, Object>:
    JavaRDD<Hashtable<String, String>> jsonMapRDD = powerTrackRDD.map(
        new Function<String, Hashtable<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Hashtable<String, String> call(String line) throws Exception {
                //HashMap<String, String> hm = JsonUtils.jsonToHashMap(line);
                //Hashtable<String, String> ret = new Hashtable<String, String>(hm);
                Hashtable<String, String> ret = null;
                return ret;
            }
        });

Using the sqlContext works; however, I assume this will be slow and error prone, given that many key/value pairs are likely to be missing:

    DataFrame df = sqlContext.read().json(getFilePath().toString());
    df.printSchema();

Any suggestions would be greatly appreciated.

Andy

Here is the stack trace:

    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.map(RDD.scala:323)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
        at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
        at com.pws.sparkStreaming.collector.SavePowerTrackActivityStream.test(SavePowerTrackActivityStream.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
    Caused by: java.io.NotSerializableException: com.pws.sparkStreaming.collector.SavePowerTrackActivityStream
    Serialization stack:
        - object not serializable (class: com.pws.sparkStreaming.collector.SavePowerTrackActivityStream, value: com.pws.sparkStreaming.collector.SavePowerTrackActivityStream@f438904)
        - field (class: com.pws.sparkStreaming.collector.SavePowerTrackActivityStream$1, name: this$0, type: class com.pws.sparkStreaming.collector.SavePowerTrackActivityStream)
        - object (class com.pws.sparkStreaming.collector.SavePowerTrackActivityStream$1, com.pws.sparkStreaming.collector.SavePowerTrackActivityStream$1@3fa7df1)
        - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
        - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 37 more
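Update: the "Serialization stack" shows the anonymous inner class dragging in its enclosing test class via this$0, which is not Serializable. For reference, here is the kind of mapper I think I need: a standalone (or static nested) class so Spark only has to serialize the function itself, with the JSON bound to a generic Map via Jackson. This is just a sketch; JsonToMapFunction is a made-up name, and it assumes Jackson is on the classpath (my JsonUtils helper above is my own code):

```java
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.api.java.function.Function;

// Top-level class (a static nested class works too): implements Spark's
// serializable Function without capturing an outer `this`, which is what
// triggered the NotSerializableException above.
public class JsonToMapFunction implements Function<String, Map<String, Object>> {
    private static final long serialVersionUID = 1L;

    // ObjectMapper is not Serializable, so hold it in a static field that is
    // created once per executor JVM instead of shipping it with the closure.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Map<String, Object> call(String line) throws Exception {
        // Jackson binds arbitrary JSON to Map<String, Object>: nested objects
        // become nested Maps, arrays become Lists, so no POJO is needed.
        return MAPPER.readValue(line, new TypeReference<Map<String, Object>>() {});
    }
}
```

The map call then becomes powerTrackRDD.map(new JsonToMapFunction()), with no anonymous inner class to pull in the enclosing test class.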