Hi All,

I am using the DataFrame API in the new Apache Spark 1.4.0 to extract 
information from Twitter's Status JSON, mostly focused on the Entities object 
<https://dev.twitter.com/overview/api/entities>. The part relevant to this 
question is shown below:

{
  ...
  ...
  "entities": {
    "hashtags": [],
    "trends": [],
    "urls": [],
    "user_mentions": [
      {
        "screen_name": "linobocchini",
        "name": "Lino Bocchini",
        "id": 187356243,
        "id_str": "187356243",
        "indices": [ 3, 16 ]
      },
      {
        "screen_name": "jeanwyllys_real",
        "name": "Jean Wyllys",
        "id": 111123176,
        "id_str": "111123176",
        "indices": [ 79, 95 ]
      }
    ],
    "symbols": []
  },
  ...
  ...
}

There are several examples of how to extract information from primitive types 
such as string, integer, etc., but I couldn't find anything on how to process 
this kind of complex structure.

I tried the code below, but it doesn't work; it throws an exception:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

val tweets = sqlContext.read.json("tweets.json")

// this UDF just filters out empty entities.user_mentions[] nodes;
// some tweets don't contain any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)

import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long],
  name: String, screenName: String)

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
  case Row(arr: Array[Row]) => arr.map { elem =>
    UserMention(
      elem.getAs[Long]("id"),
      elem.getAs[String]("id_str"),
      elem.getAs[Array[Long]]("indices"),
      elem.getAs[String]("name"),
      elem.getAs[String]("screen_name"))
  }
}

mentions.first

This is the exception I get when I call mentions.first:

scala>     mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
    at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)

What is wrong here? I understand it is related to the types, but I couldn't 
figure it out yet.
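
A minimal sketch of the type mismatch the trace seems to point at, in plain Scala without Spark. It assumes the array column reaches the pattern match as a Seq (the MatchError prints it as List(...)) rather than as a JVM Array. The names column, matchAsArray and matchAsSeq are hypothetical and exist only for illustration:

```scala
// Hypothetical stand-in for the value handed to the explode function:
// the trace prints the array column as List(...), which is a Seq, not an Array.
val column: Any = List(List(3L, 16L), List(79L, 95L))

// Mirrors the pattern in the failing code: it accepts only a JVM array.
def matchAsArray(value: Any): String = value match {
  case _: Array[_] => "array"
  case _           => "no match"
}

// Matching on Seq accepts List, Vector, WrappedArray and other sequences.
def matchAsSeq(value: Any): String = value match {
  case s: Seq[_] => s"seq of ${s.length}"
  case _         => "no match"
}

println(matchAsArray(column)) // prints "no match"
println(matchAsSeq(column))   // prints "seq of 2"
```

If that assumption holds, matching on Seq[Row] instead of Array[Row] inside explode may be worth trying.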

As additional context, the structure mapped automatically is:

scala> mentions.printSchema
root
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)

NOTE 1: I know it is possible to solve this using HiveQL, but I would like to 
use DataFrames since there is so much momentum around them.

SELECT explode(entities.user_mentions) as mentions
FROM tweets

NOTE 2: the UDF val isEmpty = udf((value: List[Any]) => value.isEmpty) is an 
ugly hack and I'm missing something here, but it was the only way I came up 
with to avoid an NPE.
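
A minimal sketch of a null-safe emptiness check in plain Scala, without Spark. It assumes the NPE comes from rows where user_mentions is null rather than an empty list, which is an unconfirmed assumption. The name isNullOrEmpty is hypothetical:

```scala
// Option(null) is None, so forall returns true without dereferencing null.
def isNullOrEmpty(value: Seq[Any]): Boolean =
  Option(value).forall(_.isEmpty)

println(isNullOrEmpty(null))      // prints "true"
println(isNullOrEmpty(Seq.empty)) // prints "true"
println(isNullOrEmpty(Seq(1, 2))) // prints "false"
```

Wrapping the argument in Option keeps the check in one expression and treats a missing list the same as an empty one.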


I’ve posted the same question on SO: 
http://stackoverflow.com/questions/31016156/how-to-extract-complex-json-structures-using-apache-spark-1-4-0-data-frames

Thanks all!
- gustavo
