Re: How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames

2015-07-18 Thread Naveen Madhire
I am facing the same issue; I tried this but got a compilation error for
the "$" in the explode function.

So, I had to modify it as below to make it work:

df.select(explode(new Column("entities.user_mentions")).as("mention"))
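
For reference, the $"..." syntax is provided by the SQLContext implicits, so
the compilation error usually just means that import is missing. A minimal
sketch, assuming a SQLContext named sqlContext and a DataFrame named df:

import org.apache.spark.sql.functions._
// brings the $"..." string-to-Column syntax into scope
import sqlContext.implicits._

df.select(explode($"entities.user_mentions").as("mention"))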




On Wed, Jun 24, 2015 at 2:48 PM, Michael Armbrust wrote:

> Starting in Spark 1.4 there is also an explode that you can use directly
> from the select clause (much like in HiveQL):
>
> import org.apache.spark.sql.functions._
> df.select(explode($"entities.user_mentions").as("mention"))
>
> Unlike standard HiveQL, you can also include other attributes in the
> select or even $"*".

Re: How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames

2015-06-24 Thread Michael Armbrust
Starting in Spark 1.4 there is also an explode that you can use directly
from the select clause (much like in HiveQL):

import org.apache.spark.sql.functions._
df.select(explode($"entities.user_mentions").as("mention"))

Unlike standard HiveQL, you can also include other attributes in the select
or even $"*".




Re: How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames

2015-06-24 Thread Yin Huai
The function accepted by explode is f: Row => TraversableOnce[A]. It seems
user_mentions is an array of structs, so can you change your pattern matching
to the following?

case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map(elem => ...)
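
Applied to the code in the original question below, a minimal sketch of the
corrected explode block (note that the indices array also comes back as a Seq
at runtime, so it is converted before building the case class):

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
    // arrays arrive as Seq[Row], not Array[Row], hence this pattern
    case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map { elem =>
      UserMention(
        elem.getAs[Long]("id"),
        elem.getAs[String]("id_str"),
        elem.getAs[Seq[Long]]("indices").toArray,
        elem.getAs[String]("name"),
        elem.getAs[String]("screen_name"))
    }
  }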



How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames

2015-06-24 Thread Gustavo Arjones
Hi All,

I am using the new Apache Spark version 1.4.0 Data-frames API to extract
information from Twitter's Status JSON, mostly focused on the Entities Object
- the relevant part to this question is shown below:

{
  ...
  ...
  "entities": {
"hashtags": [],
"trends": [],
"urls": [],
"user_mentions": [
  {
"screen_name": "linobocchini",
"name": "Lino Bocchini",
"id": 187356243,
"id_str": "187356243",
"indices": [ 3, 16 ]
  },
  {
"screen_name": "jeanwyllys_real",
"name": "Jean Wyllys",
"id": 23176,
"id_str": "23176",
"indices": [ 79, 95 ]
  }
],
"symbols": []
  },
  ...
  ...
}
There are several examples of how to extract information from primitive types
such as string, integer, etc. - but I couldn't find anything on how to process
this kind of complex structure.

I tried the code below, but it still doesn't work - it throws an exception:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

val tweets = sqlContext.read.json("tweets.json")

// this function is just to filter out empty entities.user_mentions[] nodes
// some tweets don't contain any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)

import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long],
  name: String, screenName: String)

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
    case Row(arr: Array[Row]) => arr.map { elem =>
      UserMention(
        elem.getAs[Long]("id"),
        elem.getAs[String]("id_str"),
        elem.getAs[Array[Long]]("indices"),
        elem.getAs[String]("name"),
        elem.getAs[String]("screen_name"))
    }
  }

mentions.first

Exception when I try to call mentions.first:

scala> mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [23176,23176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
What is wrong here? I understand it is related to the types, but I couldn't
figure it out yet.

As additional context, the structure mapped automatically is:

scala> mentions.printSchema
root
 |-- user_mentions: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- id: long (nullable = true)
 |||-- id_str: string (nullable = true)
 |||-- indices: array (nullable = true)
 ||||-- element: long (containsNull = true)
 |||-- name: string (nullable = true)
 |||-- screen_name: string (nullable = true)

NOTE 1: I know it is possible to solve this using HiveQL, but I would like to
use Data-frames since there is so much momentum around it.

SELECT explode(entities.user_mentions) as mentions
FROM tweets

NOTE 2: the UDF val isEmpty = udf((value: List[Any]) => value.isEmpty) is an
ugly hack and I'm missing something here, but it was the only way I came up
with to avoid an NPE.
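
A sketch of a slightly safer variant of the same hack, assuming the NPE comes
from rows where user_mentions is null rather than empty (Spark hands array
columns to Scala UDFs as Seq, so that is the more natural parameter type):

import org.apache.spark.sql.functions.udf

// treat null arrays as empty instead of throwing
val isEmpty = udf((value: Seq[Any]) => value == null || value.isEmpty)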


I’ve posted the same question on SO: 
http://stackoverflow.com/questions/31016156/how-to-extract-complex-json-structures-using-apache-spark-1-4-0-data-frames

Thanks all!
- gustavo