Hi all,

I am using the new Apache Spark 1.4.0 DataFrames API to extract information from Twitter's Status JSON, mostly focused on the Entities object <https://dev.twitter.com/overview/api/entities>; the part relevant to this question is shown below:
{
  ...
  "entities": {
    "hashtags": [],
    "trends": [],
    "urls": [],
    "user_mentions": [
      {
        "screen_name": "linobocchini",
        "name": "Lino Bocchini",
        "id": 187356243,
        "id_str": "187356243",
        "indices": [ 3, 16 ]
      },
      {
        "screen_name": "jeanwyllys_real",
        "name": "Jean Wyllys",
        "id": 111123176,
        "id_str": "111123176",
        "indices": [ 79, 95 ]
      }
    ],
    "symbols": []
  },
  ...
}

There are several examples of how to extract information from primitive types such as string and integer, but I couldn't find anything on how to process this kind of complex structure. I tried the code below, but it still doesn't work; it throws an exception:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val tweets = sqlContext.read.json("tweets.json")

// this function is just to filter out empty entities.user_mentions[] nodes
// some tweets don't contain any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)

import org.apache.spark.sql._
import sqlContext.implicits._

case class UserMention(
  id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
    case Row(arr: Array[Row]) => arr.map { elem =>
      UserMention(
        elem.getAs[Long]("id"),
        elem.getAs[String]("id_str"),
        elem.getAs[Array[Long]]("indices"),
        elem.getAs[String]("name"),
        elem.getAs[String]("screen_name"))
    }
  }

mentions.first

Exception when I try to call mentions.first:

scala> mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
    at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)

What is wrong here? I understand it is related to the types, but I couldn't figure it out yet.

As additional context, here is the structure mapped automatically:

scala> mentions.printSchema
root
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)

NOTE 1: I know it is possible to solve this using HiveQL, but I would like to use DataFrames since there is so much momentum around them. In HiveQL it would be:
SELECT explode(entities.user_mentions) AS mentions FROM tweets

NOTE 2: the UDF val isEmpty = udf((value: List[Any]) => value.isEmpty) is an ugly hack and I'm probably missing something here, but it was the only way I found to avoid an NPE.

I've posted the same question on SO: http://stackoverflow.com/questions/31016156/how-to-extract-complex-json-structures-using-apache-spark-1-4-0-data-frames

Thanks all!
- gustavo
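PS: a minimal plain-Scala sketch (no Spark involved) of what I suspect is happening. The MatchError message prints the column value as a List, which suggests Catalyst hands the array column to the generator as a Scala Seq (here a List), not as a native Array, so the case Row(arr: Array[Row]) pattern never matches. The object and sample values below are made up purely for illustration:

```scala
object MatchDemo {
  // Classify a value the same way the explode pattern does:
  // try the Array pattern first, then a Seq pattern.
  def describe(value: Any): String = value match {
    case _: Array[_] => "matched Array"
    case _: Seq[_]   => "matched Seq"
    case _           => "no match"
  }

  def main(args: Array[String]): Unit = {
    // Spark appears to pass the array column as a List (which is a Seq)...
    println(describe(List(187356243L, 111123176L)))  // matched Seq
    // ...which is not a native Array, so an Array pattern would fail.
    println(describe(Array(187356243L, 111123176L))) // matched Array
  }
}
```

If that is right, matching on Seq[Row] (e.g. case Row(arr: Seq[Row])) instead of Array[Row] might be the direction to look in, but I haven't verified that against 1.4.0.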