I am accessing Elasticsearch via the elasticsearch-hadoop connector and attempting to expose it through SparkSQL. I am using Spark 1.2.1 (the latest version supported by elasticsearch-hadoop) and "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT". For reference, the relevant part of my build looks roughly like this (a minimal sketch; only the coordinates and versions above are exact):
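    libraryDependencies ++= Seq(
      // Spark 1.2.1, as noted above
      "org.apache.spark" %% "spark-core" % "1.2.1",
      "org.apache.spark" %% "spark-sql" % "1.2.1",
      // elasticsearch-hadoop snapshot providing the Spark / SparkSQL integration
      "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
    )

I'm encountering an issue when I attempt to query the following mapping after creating a temporary table from it. The index template / mapping looks like this: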
    PUT /_template/device
    {
      "template": "dev*",
      "settings": {
        "number_of_shards": 1
      },
      "mappings": {
        "metric": {
          "_timestamp": {
            "enabled": true,
            "stored": true,
            "path": "timestamp",
            "format": "yyyy-MM-dd'T'HH:mm:ssZZ"
          },
          "properties": {
            "pathId": { "type": "string" },
            "pathElements": {
              "properties": {
                "node": { "type": "string" },
                "value": { "type": "string" }
              }
            },
            "name": { "type": "string" },
            "value": { "type": "double" },
            "timestamp": { "type": "date", "store": true }
          }
        }
      }
    }

Querying all of the columns works fine except for pathElements, which is a JSON array. If that column is added to the SELECT, the query fails with a java.util.NoSuchElementException: key not found: node.

*Details*

The program is pretty basic and looks like this:

    /**
     * A simple sample to read and write to ES using elasticsearch-hadoop.
     */
    package com.opsdatastore.elasticsearch.spark

    import java.io.File

    // Scala imports
    import scala.collection.JavaConversions._
    // Spark imports
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{SchemaRDD, SQLContext}
    // ES imports
    import org.elasticsearch.spark._
    import org.elasticsearch.spark.sql._
    // OpsDataStore
    import com.opsdatastore.spark.utils.{Settings, Spark, ElasticSearch}

    object ElasticSearchReadWrite {

      /**
       * Spark specific configuration
       */
      def sparkInit(): SparkContext = {
        val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
        conf.set("es.nodes", ElasticSearch.Nodes)
        conf.set("es.port", ElasticSearch.HttpPort.toString())
        conf.set("es.index.auto.create", "true")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        conf.set("spark.executor.memory", "1g")
        conf.set("spark.kryoserializer.buffer.mb", "256")

        val sparkContext = new SparkContext(conf)
        sparkContext.addJar(Spark.JarPath + jar)
        sparkContext
      }

      def main(args: Array[String]) {
        val sc = sparkInit

        val sqlContext = new SQLContext(sc)
        import sqlContext._

        val start = System.currentTimeMillis()

        // specific query, just read all for now
        sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}", "?q=*:*")

        /*
         * Read from ES and provide some insight with Spark & SparkSQL
         */
        val esData = sc.esRDD("device/metric")
        esData.collect.foreach(println(_))

        val end = System.currentTimeMillis()
        println(s"Total time: ${end - start} ms")

        println("Create Metric Temporary Table for querying")
        val schemaRDD = sqlContext.sql(
          "CREATE TEMPORARY TABLE metric " +
          "USING org.elasticsearch.spark.sql " +
          "OPTIONS (resource 'device/metric') ")

        System.out.println("########################################")
        System.out.println("# Schema Definition #")
        System.out.println("########################################")
        schemaRDD.printSchema()

        System.out.println("########################################")
        System.out.println("# Data from SparkSQL #")
        System.out.println("########################################")
        sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM metric").collect.foreach(println(_))
      }
    }

So this works fine:

    val esData = sc.esRDD("device/metric")
    esData.collect.foreach(println(_))

And results in this:

    15/03/31 14:37:48 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 4.948556 s
    (AUxxDrs4cgadF5SlaMg0,Map(pathElements -> Buffer(Map(node -> State, value -> PA), Map(node -> City, value -> Pittsburgh), Map(node -> Street, value -> 12345 Westbrook Drive), Map(node -> level, value -> main), Map(node -> device, value -> thermostat)), value -> 29.590943279257175, name -> Current Temperature, timestamp -> 2015-03-27T14:53:46+0000, path -> /PA/Pittsburgh/12345 Westbrook Drive/main/theromostat-1))
Yet this fails:

    sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM metric").collect.foreach(println(_))

With this exception:

    Create Metric Temporary Table for querying
    ########################################
    # Schema Definition #
    ########################################
    root
    ########################################
    # Data from SparkSQL #
    ########################################
    15/03/31 14:37:49 INFO BlockManager: Removing broadcast 0
    15/03/31 14:37:49 INFO BlockManager: Removing block broadcast_0
    15/03/31 14:37:49 INFO MemoryStore: Block broadcast_0 of size 1264 dropped from memory (free 278018576)
    15/03/31 14:37:49 INFO BlockManager: Removing block broadcast_0_piece0
    15/03/31 14:37:49 INFO MemoryStore: Block broadcast_0_piece0 of size 864 dropped from memory (free 278019440)
    15/03/31 14:37:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:57820 in memory (size: 864.0 B, free: 265.1 MB)
    15/03/31 14:37:49 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
    15/03/31 14:37:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:57834 in memory (size: 864.0 B, free: 530.0 MB)
    15/03/31 14:37:49 INFO ContextCleaner: Cleaned broadcast 0
    15/03/31 14:37:49 INFO ScalaEsRowRDD: Reading from [device/metric]
    15/03/31 14:37:49 INFO ScalaEsRowRDD: Discovered mapping {device=[mappings=[metric=[name=STRING, path=STRING, pathElements=[node=STRING, value=STRING], pathId=STRING, timestamp=DATE, value=DOUBLE]]]} for [device/metric]
    15/03/31 14:37:49 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
    15/03/31 14:37:49 INFO DAGScheduler: Got job 1 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
    15/03/31 14:37:49 INFO DAGScheduler: Final stage: Stage 1(collect at SparkPlan.scala:84)
    15/03/31 14:37:49 INFO DAGScheduler: Parents of final stage: List()
    15/03/31 14:37:49 INFO DAGScheduler: Missing parents: List()
    15/03/31 14:37:49 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents
    15/03/31 14:37:49 INFO MemoryStore: ensureFreeSpace(4120) called with curMem=0, maxMem=278019440
    15/03/31 14:37:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.0 KB, free 265.1 MB)
    15/03/31 14:37:49 INFO MemoryStore: ensureFreeSpace(2403) called with curMem=4120, maxMem=278019440
    15/03/31 14:37:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.1 MB)
    15/03/31 14:37:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57820 (size: 2.3 KB, free: 265.1 MB)
    15/03/31 14:37:49 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
    15/03/31 14:37:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
    15/03/31 14:37:49 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[6] at map at SparkPlan.scala:84)
    15/03/31 14:37:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
    15/03/31 14:37:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 192.168.1.5, NODE_LOCAL, 3731 bytes)
    15/03/31 14:37:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57836 (size: 2.3 KB, free: 530.0 MB)
    15/03/31 14:37:52 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 192.168.1.5): java.util.NoSuchElementException: key not found: node
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:58)
        at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
        at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
        at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
        at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
        at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:560)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:522)
        at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:339)
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
        at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
        at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
        at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 192.168.1.5, NODE_LOCAL, 3731 bytes)
    15/03/31 14:37:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57834 (size: 2.3 KB, free: 530.0 MB)
    15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.1 in stage 1.0 (TID 2) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 1]
    15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 3, 192.168.1.5, NODE_LOCAL, 3731 bytes)
    15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.2 in stage 1.0 (TID 3) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 2]
    15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 4, 192.168.1.5, NODE_LOCAL, 3731 bytes)
    15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 4) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 3]
    15/03/31 14:37:52 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
    15/03/31 14:37:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
    15/03/31 14:37:52 INFO TaskSchedulerImpl: Cancelling stage 1
    15/03/31 14:37:52 INFO DAGScheduler: Job 1 failed: collect at SparkPlan.scala:84, took 3.028325 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.1.5): java.util.NoSuchElementException: key not found: node
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:58)
        at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
        at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
        at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
        at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
        at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:560)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:522)
        at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:339)
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
        at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
        at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
        at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any insights into where I am going wrong? I'm sure it is probably something small; I'm just not seeing it yet. TIA for the assistance.

-Todd
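P.S. For reference, this is roughly how I can get at the nested pathElements through the plain esRDD API while the SparkSQL path fails. It is only an untested sketch based on the Map/Buffer structure printed above, so the exact runtime types are guesses:

    // Rough, untested sketch: flatten pathElements out of the plain esRDD result.
    // The Seq/Map runtime types are assumptions based on the output shown above.
    val esData = sc.esRDD("device/metric")

    val pathNodes = esData.flatMap { case (docId, fields) =>
      fields.get("pathElements") match {
        case Some(elements: scala.collection.Seq[_]) =>
          elements.collect { case m: scala.collection.Map[_, _] =>
            val elem = m.asInstanceOf[scala.collection.Map[String, AnyRef]]
            // one row per path element: (document id, node, value)
            (docId, elem.get("node").orNull, elem.get("value").orNull)
          }
        case _ => Seq.empty
      }
    }

    pathNodes.collect.foreach(println(_))

Obviously I'd rather query this through the temporary table, hence the question.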