RE: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread java8964
You can use HiveContext instead of SQLContext; it supports the full HiveQL
syntax, including LATERAL VIEW explode. SQLContext does not support that yet.
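Something like this should work on 1.2.x, assuming you register the metric
table through the HiveContext itself (rough, untested sketch):

import org.apache.spark.sql.hive.HiveContext

// Rough sketch: HiveContext parses HiveQL, including LATERAL VIEW explode.
// Assumes an existing SparkContext `sc`; the ES-backed table is registered
// through the same context that runs the query.
val hiveContext = new HiveContext(sc)

hiveContext.sql(
  "CREATE TEMPORARY TABLE metric " +
  "USING org.elasticsearch.spark.sql " +
  "OPTIONS (resource 'device/metric')")

hiveContext.sql(
  "SELECT path,`timestamp`, name, value, pe.value FROM metric " +
  "lateral view explode(pathElements) a AS pe")
  .collect.foreach(println(_))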
BTW, nice coding format in the email.
Yong


Re: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread Todd Nist
So, looking at this a bit more, I gather the root cause is that the nested
fields are represented as rows within rows, is that correct? Since I don't
know the size of the JSON array (it varies), using
x.getAs[Row](0).getString(0) is not really a workable solution.
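One workaround I am considering (rough, untested sketch) is to pull the field
out as a Seq[Row] and flatten it myself, so the array length no longer
matters; the column positions below are just illustrative:

import org.apache.spark.sql.Row

// Rough sketch: flatten the variable-length nested array in Scala rather
// than in SQL. Assumes the array column comes back as a Seq of Rows; column
// positions are illustrative, check printSchema() for the actual layout.
val projected = sqlContext.sql("SELECT name, pathElements FROM metric")
val flattened = projected.flatMap { row =>
  val name     = row.getString(0)
  val elements = row.getAs[Seq[Row]](1)   // one inner Row per array entry
  elements.map(pe => (name, pe.getString(0), pe.getString(1)))  // (name, node, value)
}
flattened.collect.foreach(println(_))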

Is the solution to apply a lateral view + explode to this?

I have attempted to change to a lateral view, but it looks like my syntax is
off:

sqlContext.sql(
  "SELECT path,`timestamp`, name, value, pe.value FROM metric " +
  "lateral view explode(pathElements) a AS pe")
  .collect.foreach(println(_))
Which results in:

15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0
Exception in thread "main" java.lang.RuntimeException: [1.68] failure:
``UNION'' expected but identifier view found

SELECT path,`timestamp`, name, value, pe.value FROM metric lateral
view explode(pathElements) a AS pe
   ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97)
at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Is this the right approach?  Is this syntax available in 1.2.1:

SELECT
  v1.name, v2.city, v2.state
FROM people
  LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1 as name, address
  LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2 as city, state;
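If it is not, I assume this would also have to go through a HiveContext,
since json_tuple is a Hive UDTF; a rough, untested sketch (the people table
and its jsonObject column are just the ones from the query above):

import org.apache.spark.sql.hive.HiveContext

// Rough sketch: chained LATERAL VIEW json_tuple via HiveContext.
// Assumes an existing SparkContext `sc` and a registered `people` table.
val hc = new HiveContext(sc)
hc.sql(
  "SELECT v1.name, v2.city, v2.state FROM people " +
  "LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1 AS name, address " +
  "LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2 AS city, state")
  .collect.foreach(println(_))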


-Todd

On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist  wrote:

> I am accessing ElasticSearch via the elasticsearch-hadoop and attempting
> to expose it via SparkSQL. I am using spark 1.2.1, latest supported by
> elasticsearch-hadoop, and "org.elasticsearch" % "elasticsearch-hadoop" %
> "2.1.0.BUILD-SNAPSHOT" of elasticsearch-hadoop. I’m
> encountering an issue when I attempt to query the following json after
> creating a temporary table from it. The json looks like this:
>
> PUT /_template/device
> {
>   "template": "dev*",
>

SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread Todd Nist
I am accessing ElasticSearch via elasticsearch-hadoop and attempting to
expose it via SparkSQL. I am using Spark 1.2.1 (the latest supported by
elasticsearch-hadoop) with "org.elasticsearch" % "elasticsearch-hadoop" %
"2.1.0.BUILD-SNAPSHOT". I'm encountering an issue when I attempt to query
the following JSON after creating a temporary table from it. The json looks
like this:

PUT /_template/device
{
  "template": "dev*",
  "settings": {
"number_of_shards": 1
  },
  "mappings": {
"metric": {
  "_timestamp" : {
"enabled" : true,
"stored" : true,
"path" : "timestamp",
"format" : "-MM-dd'T'HH:mm:ssZZ"
  },
  "properties": {
"pathId": {
  "type": "string"
},
"pathElements": {
  "properties": {
"node": {
  "type": "string"
},
"value": {
  "type": "string"
}
  }
},
"name": {
  "type": "string"
},
"value": {
  "type": "double"
},
"timestamp": {
  "type": "date",
  "store": true
}
  }
}
  }
}
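For reference, the build setup described at the top of this message is
roughly the following sketch; the snapshots resolver URL is a guess:

// build.sbt sketch; artifact coordinates as described above, resolver URL assumed.
scalaVersion := "2.10.4"

resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"           % "1.2.1" % "provided",
  "org.apache.spark"  %% "spark-sql"            % "1.2.1" % "provided",
  "org.elasticsearch" %  "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
)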

Querying all columns works fine except for pathElements, which is a JSON
array. If it is added to the SELECT, the query fails with a
java.util.NoSuchElementException: key not found: node.

*Details*.

The program is pretty basic, looks like this:

/**
 * A simple sample to read and write to ES using elasticsearch-hadoop.
 */

package com.opsdatastore.elasticsearch.spark

import java.io.File


// Scala imports
import scala.collection.JavaConversions._
// Spark imports
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{SchemaRDD,SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

// OpsDataStore
import com.opsdatastore.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
conf.set("es.nodes", ElasticSearch.Nodes)
conf.set("es.port", ElasticSearch.HttpPort.toString())
conf.set("es.index.auto.create", "true");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.executor.memory","1g")
conf.set("spark.kryoserializer.buffer.mb","256")

val sparkContext = new SparkContext(conf)
sparkContext.addJar(Spark.JarPath + jar)
sparkContext
  }


  def main(args: Array[String]) {

val sc = sparkInit

val sqlContext = new SQLContext(sc)
import sqlContext._

val start = System.currentTimeMillis()

// specific query, just read all for now
sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}", "?q=*:*")

/*
 * Read from ES and provide some insight with Spark & SparkSQL
 */
val esData = sc.esRDD("device/metric")

esData.collect.foreach(println(_))

val end = System.currentTimeMillis()
println(s"Total time: ${end-start} ms")

println("Create Metric Temporary Table for querying")
val schemaRDD = sqlContext.sql(
  "CREATE TEMPORARY TABLE metric " +
  "USING org.elasticsearch.spark.sql " +
  "OPTIONS (resource 'device/metric')  " )

System.out.println("")
System.out.println("#  Scheam Definition   #")
System.out.println("")
schemaRDD.printSchema()

System.out.println("")
System.out.println("#  Data from SparkSQL  #")
System.out.println("")

sqlContext.sql("SELECT path, pathElements, `timestamp`, name,
value FROM metric").collect.foreach(println(_))
  }
}

So this works fine:

sc.esRDD("device/metric")
esData.collect.foreach(println(_))

And results in this:

15/03/31 14:37:48 INFO DAGScheduler: Job 0 finished: collect at
ElasticSearchReadWrite.scala:67, took 4.948556 s
(AUxxDrs4cgadF5SlaMg0,Map(pathElements -> Buffer(Map(node -> State,
value -> PA), Map(node -> City, value -> Pittsburgh), Map(node ->
Street, value -> 12345 Westbrook Drive), Map(node -> level, value ->
main), Map(node -> device, value -> thermostat)), value ->
29.590943279257175, name -> Current Temperature, timestamp ->
2015-03-27T14:53:46+, path -> /PA/Pittsburgh/12345 Westbrook
Drive/main/theromostat-1))
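Since esRDD hands each document back as a Scala Map, I can also get at the
nested array directly without going through SQL; a rough sketch based on the
structure printed above (the cast is an assumption about the collection
types the connector returns):

// Rough sketch: walk the Map structure returned by esRDD directly.
// The cast mirrors the output above (pathElements -> Buffer(Map(...)));
// adjust if the connector returns a different collection type.
esData.collect.foreach { case (id, doc) =>
  val elements = doc("pathElements").asInstanceOf[Seq[Map[String, AnyRef]]]
  elements.foreach(pe => println(s"$id: ${pe("node")} = ${pe("value")}"))
}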

Yet this fails:

sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value
FROM metric").collect.foreach(println(_))

With this exception:

Create Metric Temporary Table for querying
#  Schema Definition   #
root
#  Data from SparkSQL