Seems the elasticsearch-hadoop project was built with an old version of Spark, 
and then you upgraded the Spark version in execution env, as I know the 
StructField changed the definition in Spark 1.2, can you confirm the version 
problem first?

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table



I am attempting to access ElasticSearch and expose it’s data through SparkSQL 
using the elasticsearch-hadoop project.  I am encountering the following 
exception when trying to create a Temporary table from a resource in 
ElasticSearch.:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at 
EsSparkSQL.scala:51, took 0.862184 s

Create Temporary Table for querying

Exception in thread "main" java.lang.NoSuchMethodError: 
org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V

at 
org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)

at 
org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)

at 
org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at 
org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)

at 
org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)

at 
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)

at 
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)

at 
org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)

at 
org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)

at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)

at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)

at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)

at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)

at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)

at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)

at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)

at 
io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)

at 
io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)



I have loaded the “accounts.json” file from ElasticSearch into my ElasticSearch 
cluster. The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 
'http://localhost:9200/bank/_mapping'

{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File



import scala.collection.JavaConversions._



import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.SparkContext._

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{SchemaRDD,SQLContext}



// ES imports

import org.elasticsearch.spark._

import org.elasticsearch.spark.sql._



import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}



object ElasticSearchReadWrite {



  /**

   * Spark specific configuration

   */

  def sparkInit(): SparkContext = {

    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)

    conf.set("es.nodes", ElasticSearch.Nodes)

    conf.set("es.port", ElasticSearch.HttpPort.toString())

    conf.set("es.index.auto.create", "true");

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

    conf.set("spark.executor.memory","1g")

    conf.set("spark.kryoserializer.buffer.mb","256")



    val sparkContext = new SparkContext(conf)



    sparkContext

  }



  def main(args: Array[String]) {



    val sc = sparkInit



    val sqlContext = new SQLContext(sc)

    import sqlContext._



    val start = System.currentTimeMillis()



    /*

     * Read from ES and query with with Spark & SparkSQL

     */

    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")



    esData.collect.foreach(println(_))



    val end = System.currentTimeMillis()

    println(s"Total time: ${end-start} ms")

This works fine and and prints the content of esData out as one would expect.

15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at 
ElasticSearchReadWrite.scala:67, took 6.897443 s

(4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff Avenue, 
state -> HI, balance -> 27658, age -> 31, gender -> F, lastname -> Flores, 
email -> rodriquezflo...@tourmania.com<mailto:rodriquezflo...@tourmania.com>, 
firstname -> Rodriquez, account_number -> 4))

(9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune Avenue, state 
-> OH, balance -> 24776, age -> 39, gender -> M, lastname -> Meadows, email -> 
opalmead...@cedward.com<mailto:opalmead...@cedward.com>, firstname -> Opal, 
account_number -> 9))

...

As does creating a new index and type like this:

    println("read json in and store in ES")

    // read in JSON and store in ES

    val path = "document.json"

    val rdd : SchemaRDD = sqlContext.jsonFile(path)



    rdd.saveToEs("myIndex/myDoc")

However, when I attempt to access the the table via the sqlContext like this I 
get the exception shown above:

    println("Create Temporary Table for querying")



    val schemaRDD: SchemaRDD = sqlContext.sql(

          "CREATE TEMPORARY TABLE account    " +

          "USING org.elasticsearch.spark.sql " +

          "OPTIONS (resource 'bank/account')  " )

  }

}

I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and the 
elasticsearch-hadoop:

"org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"

Any insight on what I am doing wrong?

TIA for the assistance.
[https://ssl.gstatic.com/ui/v1/icons/mail/images/cleardot.gif]

-Todd

Reply via email to