I am accessing Elasticsearch via elasticsearch-hadoop and attempting to
expose it through Spark SQL. I am using Spark 1.2.1, the latest version
supported by elasticsearch-hadoop, together with "org.elasticsearch" %
"elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"; the dependency is wired
into the build roughly as sketched below.
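
(A sketch only; the snapshot resolver URL and the exact Spark artifacts
below are my assumptions rather than a copy of the real build file.)

// build.sbt (sketch)
scalaVersion := "2.10.4"

// BUILD-SNAPSHOT artifacts are not on Maven Central, so a snapshot resolver is assumed
resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"           % "1.2.1" % "provided",
  "org.apache.spark"  %% "spark-sql"            % "1.2.1" % "provided",
  "org.elasticsearch" %  "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
)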
I'm encountering an issue when I attempt to query the index after creating a
temporary table from it. The mapping, defined via an index template, looks
like this:

PUT /_template/device
{
  "template": "dev*",
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "metric": {
      "_timestamp" : {
        "enabled" : true,
        "stored" : true,
        "path" : "timestamp",
        "format" : "yyyy-MM-dd'T'HH:mm:ssZZ"
      },
      "properties": {
        "pathId": {
          "type": "string"
        },
        "pathElements": {
          "properties": {
            "node": {
              "type": "string"
            },
            "value": {
              "type": "string"
            }
          }
        },
        "name": {
          "type": "string"
        },
        "value": {
          "type": "double"
        },
        "timestamp": {
          "type": "date",
          "store": true
        }
      }
    }
  }
}
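
For context, each document carries pathElements as an array of {node, value}
objects. A stripped-down sketch of indexing one such document from Spark
(the field values are made up, and writing through saveToEs here is purely
for illustration, not how the data is actually loaded):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._   // adds saveToEs to RDDs

val conf = new SparkConf().setAppName("index-sample").setMaster("local[*]")
  .set("es.nodes", "localhost").set("es.port", "9200")   // assumes a local ES node
val sc = new SparkContext(conf)

// One metric document with a nested pathElements array (illustrative values only)
val doc = Map(
  "path" -> "/PA/Pittsburgh/12345 Westbrook Drive/main/thermostat-1",
  "pathElements" -> Seq(
    Map("node" -> "State", "value" -> "PA"),
    Map("node" -> "City",  "value" -> "Pittsburgh")
  ),
  "name"      -> "Current Temperature",
  "value"     -> 29.59,
  "timestamp" -> "2015-03-27T14:53:46+0000"
)

sc.makeRDD(Seq(doc)).saveToEs("device/metric")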

Querying all of the columns works fine except for pathElements, which is a JSON
array. If it is added to the select, the query fails with a
java.util.NoSuchElementException: key not found: node.

*Details*.

The program is pretty basic; it looks like this:

/**
 * A simple sample to read and write to ES using elasticsearch-hadoop.
 */

package com.opsdatastore.elasticsearch.spark

import java.io.File


// Scala imports
import scala.collection.JavaConversions._
// Spark imports
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{SchemaRDD,SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

// OpsDataStore
import com.opsdatastore.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.executor.memory","1g")
    conf.set("spark.kryoserializer.buffer.mb","256")

    val sparkContext = new SparkContext(conf)
    sparkContext.addJar(Spark.JarPath + jar)   // ship the application jar to the executors
    sparkContext
  }


  def main(args: Array[String]) {

    val sc = sparkInit()

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    // specific query, just read all for now (the result is not used yet)
    sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}", "?q=*:*")

    /*
     * Read from ES and provide some insight with Spark & SparkSQL
     */
    val esData = sc.esRDD("device/metric")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end-start} ms")

    println("Create Metric Temporary Table for querying")
    val schemaRDD = sqlContext.sql(
          "CREATE TEMPORARY TABLE metric     " +
          "USING org.elasticsearch.spark.sql " +
          "OPTIONS (resource 'device/metric')  " )

    System.out.println("########################################")
    System.out.println("#      Schema Definition               #")
    System.out.println("########################################")
    schemaRDD.printSchema()

    System.out.println("########################################")
    System.out.println("#      Data from SparkSQL              #")
    System.out.println("########################################")

    sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM metric")
      .collect.foreach(println(_))
  }
}
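
(As an aside, I assume the same temporary table could also be set up without
the SQL DDL by going through the Spark SQL integration directly; a minimal
sketch of what I mean, based on my reading of the elasticsearch-hadoop 2.1
docs:)

import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._    // adds esRDD to SQLContext

val sqlContext = new SQLContext(sc)                  // same sc as in main()
val metrics = sqlContext.esRDD("device/metric")      // SchemaRDD with the schema inferred from the ES mapping
metrics.registerTempTable("metric")
metrics.printSchema()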

So this works fine:

val esData = sc.esRDD("device/metric")
esData.collect.foreach(println(_))

And results in this:

15/03/31 14:37:48 INFO DAGScheduler: Job 0 finished: collect at
ElasticSearchReadWrite.scala:67, took 4.948556 s
(AUxxDrs4cgadF5SlaMg0,Map(pathElements -> Buffer(Map(node -> State,
value -> PA), Map(node -> City, value -> Pittsburgh), Map(node ->
Street, value -> 12345 Westbrook Drive), Map(node -> level, value ->
main), Map(node -> device, value -> thermostat)), value ->
29.590943279257175, name -> Current Temperature, timestamp ->
2015-03-27T14:53:46+0000, path -> /PA/Pittsburgh/12345 Westbrook
Drive/main/theromostat-1))
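
So the nested pathElements structure comes back intact through esRDD, and it
can be pulled apart by hand along these lines (the cast reflects the printed
structure above and is my assumption about the runtime types):

// Walk the nested pathElements array returned by esRDD
// (each doc is a Map[String, AnyRef]; pathElements prints as a Buffer of Maps above)
esData.collect.foreach { case (id, doc) =>
  val elems = doc("pathElements").asInstanceOf[Seq[scala.collection.Map[String, AnyRef]]]
  elems.foreach(e => println(s"$id: ${e("node")} = ${e("value")}"))
}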

Yet selecting pathElements through the Spark SQL temporary table fails:

sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value
FROM metric").collect.foreach(println(_))

With this exception:

Create Metric Temporary Table for querying
########################################
#      Schema Definition               #
########################################
root
########################################
#      Data from SparkSQL              #
########################################
15/03/31 14:37:49 INFO BlockManager: Removing broadcast 0
15/03/31 14:37:49 INFO BlockManager: Removing block broadcast_0
15/03/31 14:37:49 INFO MemoryStore: Block broadcast_0 of size 1264 dropped from memory (free 278018576)
15/03/31 14:37:49 INFO BlockManager: Removing block broadcast_0_piece0
15/03/31 14:37:49 INFO MemoryStore: Block broadcast_0_piece0 of size 864 dropped from memory (free 278019440)
15/03/31 14:37:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:57820 in memory (size: 864.0 B, free: 265.1 MB)
15/03/31 14:37:49 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/31 14:37:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:57834 in memory (size: 864.0 B, free: 530.0 MB)
15/03/31 14:37:49 INFO ContextCleaner: Cleaned broadcast 0
15/03/31 14:37:49 INFO ScalaEsRowRDD: Reading from [device/metric]
15/03/31 14:37:49 INFO ScalaEsRowRDD: Discovered mapping {device=[mappings=[metric=[name=STRING, path=STRING, pathElements=[node=STRING, value=STRING], pathId=STRING, timestamp=DATE, value=DOUBLE]]]} for [device/metric]
15/03/31 14:37:49 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
15/03/31 14:37:49 INFO DAGScheduler: Got job 1 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
15/03/31 14:37:49 INFO DAGScheduler: Final stage: Stage 1(collect at SparkPlan.scala:84)
15/03/31 14:37:49 INFO DAGScheduler: Parents of final stage: List()
15/03/31 14:37:49 INFO DAGScheduler: Missing parents: List()
15/03/31 14:37:49 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents
15/03/31 14:37:49 INFO MemoryStore: ensureFreeSpace(4120) called with curMem=0, maxMem=278019440
15/03/31 14:37:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.0 KB, free 265.1 MB)
15/03/31 14:37:49 INFO MemoryStore: ensureFreeSpace(2403) called with curMem=4120, maxMem=278019440
15/03/31 14:37:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.1 MB)
15/03/31 14:37:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57820 (size: 2.3 KB, free: 265.1 MB)
15/03/31 14:37:49 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/31 14:37:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/03/31 14:37:49 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[6] at map at SparkPlan.scala:84)
15/03/31 14:37:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/03/31 14:37:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 192.168.1.5, NODE_LOCAL, 3731 bytes)
15/03/31 14:37:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57836 (size: 2.3 KB, free: 530.0 MB)
15/03/31 14:37:52 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 192.168.1.5): java.util.NoSuchElementException: key not found: node
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
    at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:560)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:522)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:339)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 192.168.1.5, NODE_LOCAL, 3731 bytes)
15/03/31 14:37:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57834 (size: 2.3 KB, free: 530.0 MB)
15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.1 in stage 1.0 (TID 2) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 1]
15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 3, 192.168.1.5, NODE_LOCAL, 3731 bytes)
15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.2 in stage 1.0 (TID 3) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 2]
15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 4, 192.168.1.5, NODE_LOCAL, 3731 bytes)
15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 4) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 3]
15/03/31 14:37:52 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
15/03/31 14:37:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/03/31 14:37:52 INFO TaskSchedulerImpl: Cancelling stage 1
15/03/31 14:37:52 INFO DAGScheduler: Job 1 failed: collect at SparkPlan.scala:84, took 3.028325 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.1.5): java.util.NoSuchElementException: key not found: node
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:58)
        at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
        at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
        at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
        at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
        at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:560)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:522)
        at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:339)
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
        at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
        at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
        at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any insights into where I'm going wrong? I'm sure it's probably something
small; I'm just not seeing it yet.

TIA for the assistance.

-Todd
