I am attempting to access Elasticsearch and expose its data through Spark SQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in Elasticsearch:
```
15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
	at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
	at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
	at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
	at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
	at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
	at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
	at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
	at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
	at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
	at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
	at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
```

I have loaded the "accounts.json" sample file from the Elasticsearch documentation into my Elasticsearch cluster.
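For reference, the data was loaded with the standard bulk import from the Elasticsearch tutorial, along these lines (assuming a local single-node cluster on the default port):

```
curl -XPOST 'http://localhost:9200/bank/account/_bulk?pretty' --data-binary @accounts.json
```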
The mapping looks as follows:

```
radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'
{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}
```

I can read the data just fine by doing the following:

```scala
import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark-specific configuration.
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString)
    conf.set("es.index.auto.create", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.executor.memory", "1g")
    conf.set("spark.kryoserializer.buffer.mb", "256")
    new SparkContext(conf)
  }

  def main(args: Array[String]) {
    val sc = sparkInit()
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and query with Spark & Spark SQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end - start} ms")
```

This works fine and prints the contents of esData as one would expect:

```
15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 6.897443 s
(4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F, lastname -> Flores, email -> rodriquezflo...@tourmania.com, firstname -> Rodriquez, account_number -> 4))
(9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune Avenue, state -> OH, balance -> 24776, age -> 39, gender -> M, lastname -> Meadows, email -> opalmead...@cedward.com, firstname -> Opal, account_number -> 9))
...
```

As does creating a new index and type like this:

```scala
    println("read json in and store in ES")

    // read in JSON and store in ES
    val path = "document.json"
    val rdd: SchemaRDD = sqlContext.jsonFile(path)
    rdd.saveToEs("myIndex/myDoc")
```

However, when I attempt to access the table via the sqlContext like this, I get the exception shown above:

```scala
    println("Create Temporary Table for querying")

    val schemaRDD: SchemaRDD = sqlContext.sql(
      "CREATE TEMPORARY TABLE account " +
      "USING org.elasticsearch.spark.sql " +
      "OPTIONS (resource 'bank/account') ")
  }
}
```

I'm using Elasticsearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and elasticsearch-hadoop:

```scala
"org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
```

Any insight on what I am doing wrong? TIA for the assistance.

-Todd
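P.S. In case the build setup matters, here is the relevant excerpt from my build.sbt. The snapshot resolver is my assumption about where the 2.1.0.BUILD-SNAPSHOT artifact gets pulled from, so treat this as a sketch of the configuration rather than a verified one:

```scala
// Excerpt from build.sbt; the resolver below is an assumption about
// where the elasticsearch-hadoop snapshot artifact is published.
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"           % "1.2.1" % "provided",
  "org.apache.spark"  %% "spark-sql"            % "1.2.1" % "provided",
  "org.elasticsearch" %  "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
)
```

For what it's worth, I'd expect the programmatic route below to hit the same failure, since the stack trace shows the schema being discovered through MappingUtils either way. This is only a sketch, assuming the esRDD extension that org.elasticsearch.spark.sql adds to SQLContext:

```scala
import org.apache.spark.sql.SchemaRDD
import org.elasticsearch.spark.sql._

// Sketch: read bank/account directly as a SchemaRDD instead of going through
// the CREATE TEMPORARY TABLE DDL; the schema should still be resolved via the
// same MappingUtils.discoverMapping path seen in the stack trace.
val accounts: SchemaRDD = sqlContext.esRDD("bank/account")
accounts.registerTempTable("account")
sqlContext.sql("SELECT firstname, lastname, balance FROM account").collect.foreach(println)
```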