Hi,

Spark throws a RuntimeException ("Unsupported datatype NullType") when saving a
jsonRDD containing null primitives with .saveAsParquetFile().

Code: I am trying to store a jsonRDD into a Parquet file using
saveAsParquetFile with the code below.

JavaRDD<String> javaRDD = ssc.sparkContext().parallelize(jsonData);
JavaSchemaRDD schemaObject = sqlContext.jsonRDD(javaRDD);
schemaObject.saveAsParquetFile("tweets/tweet" + time.toString().replace(" ms", "") + ".parquet");

Input: The JSON input below contains null values (e.g. "summary", "href",
"twitterTimeZone", "utcOffset"), which Spark does not support here and fails
with the error "Unsupported datatype NullType".

{"id":"tag:search.twitter.com,2005:11111111111111","objectType":"activity","actor":{"objectType":"person","id":"id:twitter.com:1111111","link":"http://www.twitter.com/funtubevids","displayName":"مشاهد حول العالم","postedTime":"2014-05-01T06:14:51.000Z","image":"https://pbs.twimg.com/profile_images/11111111111/VORNn-Df_normal.png","summary":null,"links":[{"href":null,"rel":"me"}],"friendsCount":0,"followersCount":49,"listedCount":0,"statusesCount":61,"twitterTimeZone":null,"verified":false,"utcOffset":null,"preferredUsername":"funtubevids","languages":["en"],"favoritesCount":0},"verb":"post","postedTime":"2014-05-27T17:33:54.000Z","generator":{"displayName":"web","link":"http://twitter.com"},"provider":{"objectType":"service","displayName":"Twitter","link":"http://www.twitter.com"},"link":"http://twitter.com/funtubevids/statuses/1111111111111","body":"القيادة في مدرج الطيران #مهبط #مدرج #مطار #هبوط #قيادة #سيارة #طائرة #airport #plane #car https://t.co/gnn7LKE6pC","object":{"urls":[{"url":"https://t.co/gnn7LKE6pC","expanded_url":"https://www.youtube.com/watch?v=J-j6RSRMvRo","expanded_status":200}],"klout_score":10,"language":{"value":"ar"}}}


ERROR scheduler.JobScheduler: Error running job streaming job
1407741190000 ms.0
java.lang.RuntimeException: Unsupported datatype NullType
               at scala.sys.package$.error(package.scala:27)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:267)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$2.apply(ParquetTypes.scala:244)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$2.apply(ParquetTypes.scala:244)
               at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
               at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
               at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
               at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
               at
scala.collection.AbstractTraversable.map(Traversable.scala:105)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:243)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:235)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$2.apply(ParquetTypes.scala:244)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$2.apply(ParquetTypes.scala:244)
               at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
               at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
               at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
               at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
               at
scala.collection.AbstractTraversable.map(Traversable.scala:105)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:243)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$3.apply(ParquetTypes.scala:287)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$3.apply(ParquetTypes.scala:286)
               at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
               at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
               at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
               at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
               at
scala.collection.AbstractTraversable.map(Traversable.scala:105)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:285)
               at
org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:331)
               at
org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:133)
               at
org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:112)
               at
org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:156)
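One workaround I have been considering (a sketch only, not something I have confirmed against the Spark API): since the error comes from schema inference assigning NullType to fields that are always null, the null-valued fields could be stripped out of each JSON string before it is passed to jsonRDD, so inference never produces a NullType column. The helper class name NullFieldStripper and the regex below are illustrative; the pattern assumes the literal "null" only ever appears as a bare field value, never inside a quoted string.

```java
import java.util.regex.Pattern;

public class NullFieldStripper {
    // Matches a "key":null pair plus an optional trailing comma.
    // Simplified: does not handle keys containing escaped quotes.
    private static final Pattern NULL_FIELD =
        Pattern.compile("\"[^\"]+\"\\s*:\\s*null\\s*,?");

    public static String stripNullFields(String json) {
        String cleaned = NULL_FIELD.matcher(json).replaceAll("");
        // Remove any dangling comma left before a closing brace or bracket.
        return cleaned.replaceAll(",\\s*([}\\]])", "$1");
    }

    public static void main(String[] args) {
        String json = "{\"id\":1,\"summary\":null,\"name\":\"x\",\"utcOffset\":null}";
        System.out.println(stripNullFields(json));
        // prints {"id":1,"name":"x"}
    }
}
```

The cleaned strings could then be fed to sqlContext.jsonRDD via a map over the JavaRDD<String>, though I am not sure whether dropping the fields entirely is acceptable for downstream consumers of the Parquet files.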

Could you please suggest a solution or workaround for this issue?

Thanks in advance!

Regards,

Rafeeq S
("What you do is what matters, not what you think or say or plan.")
