I am attempting to access ElasticSearch and expose it’s data through
SparkSQL using the elasticsearch-hadoop project.  I am encountering the
following exception when trying to create a Temporary table from a resource
in ElasticSearch.:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at
EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
at 
org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
at 
org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at 
org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
at 
org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
at 
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
at 
org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
at 
org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
at 
org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at 
io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
at 
io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I have loaded the “accounts.json” file from ElasticSearch into my
ElasticSearch cluster. The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET
'http://localhost:9200/bank/_mapping'
{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD,SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.executor.memory","1g")
    conf.set("spark.kryoserializer.buffer.mb","256")

    val sparkContext = new SparkContext(conf)

    sparkContext
  }

  def main(args: Array[String]) {

    val sc = sparkInit

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and query with with Spark & SparkSQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end-start} ms")

This works fine and and prints the content of esData out as one would
expect.

15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at
ElasticSearchReadWrite.scala:67, took 6.897443 s
(4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff
Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F,
lastname -> Flores, email -> rodriquezflo...@tourmania.com, firstname
-> Rodriquez, account_number -> 4))
(9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune
Avenue, state -> OH, balance -> 24776, age -> 39, gender -> M,
lastname -> Meadows, email -> opalmead...@cedward.com, firstname ->
Opal, account_number -> 9))...

As does creating a new index and type like this:

    println("read json in and store in ES")
    // read in JSON and store in ES
    val path = "document.json"
    val rdd : SchemaRDD = sqlContext.jsonFile(path)

    rdd.saveToEs("myIndex/myDoc")

However, when I attempt to access the the table via the sqlContext like
this I get the exception shown above:

    println("Create Temporary Table for querying")

    val schemaRDD: SchemaRDD = sqlContext.sql(
          "CREATE TEMPORARY TABLE account    " +
          "USING org.elasticsearch.spark.sql " +
          "OPTIONS (resource 'bank/account')  " )
  }
}

I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and the
elasticsearch-hadoop:

"org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"

Any insight on what I am doing wrong?

TIA for the assistance.

-Todd

Reply via email to