It seems to be an issue with the ES connector:
https://github.com/elastic/elasticsearch-hadoop/issues/482
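
In the meantime, one way to check that the data itself is fine (and that the failure is in the SQL row reader rather than in your index) is to read the same resource through the plain RDD API. A rough sketch, assuming your esOptions map holds the usual es.nodes/es.port settings and that the esRDD(resource, cfg) overload is available in your connector version:

import org.elasticsearch.spark._   // adds esRDD(...) to the SparkContext

// Read the index as an RDD of (documentId, Map[String, Any]) pairs.
// This path does not go through ScalaEsRowValueReader, so it should
// succeed even when DataFrame.show() hits the ArrayIndexOutOfBoundsException.
val raw = sc.esRDD("reddit_comment_public-201507-v3/default", esOptions)
raw.take(5).foreach(println)

If that reads cleanly, the data is fine and upgrading to a connector build that includes the fix for the issue above should take care of the DataFrame path.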

Thanks
Best Regards

On Tue, Jul 28, 2015 at 6:14 AM, An Tran <tra...@gmail.com> wrote:

> Hello all,
>
> I am currently hitting an error when accessing Elasticsearch from Spark SQL
> through the Elasticsearch Spark integration.  Below is the series of commands I
> issued along with the stack trace.  I am unclear on what the error could mean.
> I can print the schema correctly, but it errors out if I try to display a few
> results.  Can you guys point me in the right direction?
>
> scala> val reddit_comment_df =
> sqlContext.read.format("org.elasticsearch.spark.sql").options(esOptions).load("reddit_comment_public-201507-v3/default")
>
> scala> reddit_comment_df.registerTempTable("reddit_comment")
>
>
> scala> reddit_comment_df.printSchema
>
> root
>  |-- data: struct (nullable = true)
>  |    |-- archived: boolean (nullable = true)
>  |    |-- author: string (nullable = true)
>  |    |-- author_flair_css_class: string (nullable = true)
>  |    |-- author_flair_text: string (nullable = true)
>  |    |-- body: string (nullable = true)
>  |    |-- body_html: string (nullable = true)
>  |    |-- controversiality: long (nullable = true)
>  |    |-- created: long (nullable = true)
>  |    |-- created_utc: long (nullable = true)
>  |    |-- distinguished: string (nullable = true)
>  |    |-- downs: long (nullable = true)
>  |    |-- edited: long (nullable = true)
>  |    |-- gilded: long (nullable = true)
>  |    |-- id: string (nullable = true)
>  |    |-- link_author: string (nullable = true)
>  |    |-- link_id: string (nullable = true)
>  |    |-- link_title: string (nullable = true)
>  |    |-- link_url: string (nullable = true)
>  |    |-- name: string (nullable = true)
>  |    |-- parent_id: string (nullable = true)
>  |    |-- replies: string (nullable = true)
>  |    |-- saved: boolean (nullable = true)
>  |    |-- score: long (nullable = true)
>  |    |-- score_hidden: boolean (nullable = true)
>  |    |-- subreddit: string (nullable = true)
>  |    |-- subreddit_id: string (nullable = true)
>  |    |-- ups: long (nullable = true)
>
>
> scala> reddit_comment_df.show
>
> 15/07/27 20:38:31 INFO ScalaEsRowRDD: Reading from [reddit_comment_public-201507-v3/default]
>
> 15/07/27 20:38:31 INFO ScalaEsRowRDD: Discovered mapping {reddit_comment_public-201507-v3=[mappings=[default=[acquire_date=DATE, elasticsearch_date_partition_index=STRING, elasticsearch_language_partition_index=STRING, elasticsearch_type=STRING, source=[data=[archived=BOOLEAN, author=STRING, author_flair_css_class=STRING, author_flair_text=STRING, body=STRING, body_html=STRING, controversiality=LONG, created=LONG, created_utc=LONG, distinguished=STRING, downs=LONG, edited=LONG, gilded=LONG, id=STRING, link_author=STRING, link_id=STRING, link_title=STRING, link_url=STRING, name=STRING, parent_id=STRING, replies=STRING, saved=BOOLEAN, score=LONG, score_hidden=BOOLEAN, subreddit=STRING, subreddit_id=STRING, ups=LONG], kind=STRING], source_geo_location=GEO_POINT, source_id=STRING, source_language=STRING, source_time=DATE]]]} for [reddit_comment_public-201507-v3/default]
>
> 15/07/27 20:38:31 INFO SparkContext: Starting job: show at <console>:26
> 15/07/27 20:38:31 INFO DAGScheduler: Got job 13 (show at <console>:26) with 1 output partitions (allowLocal=false)
> 15/07/27 20:38:31 INFO DAGScheduler: Final stage: ResultStage 16(show at <console>:26)
> 15/07/27 20:38:31 INFO DAGScheduler: Parents of final stage: List()
> 15/07/27 20:38:31 INFO DAGScheduler: Missing parents: List()
> 15/07/27 20:38:31 INFO DAGScheduler: Submitting ResultStage 16 (MapPartitionsRDD[65] at show at <console>:26), which has no missing parents
> 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(7520) called with curMem=71364, maxMem=2778778828
> 15/07/27 20:38:31 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 7.3 KB, free 2.6 GB)
> 15/07/27 20:38:31 INFO MemoryStore: ensureFreeSpace(3804) called with curMem=78884, maxMem=2778778828
> 15/07/27 20:38:31 INFO MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 3.7 KB, free 2.6 GB)
> 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on 172.25.185.239:58296 (size: 3.7 KB, free: 2.6 GB)
> 15/07/27 20:38:31 INFO SparkContext: Created broadcast 13 from broadcast at DAGScheduler.scala:874
> 15/07/27 20:38:31 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 16 (MapPartitionsRDD[65] at show at <console>:26)
> 15/07/27 20:38:31 INFO TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
> 15/07/27 20:38:31 INFO FairSchedulableBuilder: Added task set TaskSet_16 tasks to pool default
> 15/07/27 20:38:31 INFO TaskSetManager: Starting task 0.0 in stage 16.0 (TID 172, 172.25.185.164, ANY, 5085 bytes)
> 15/07/27 20:38:31 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on 172.25.185.164:50275 (size: 3.7 KB, free: 3.6 GB)
>
> 15/07/27 20:38:31 WARN TaskSetManager: Lost task 0.0 in stage 16.0 (TID 172, 172.25.185.164): java.lang.ArrayIndexOutOfBoundsException: -1
>   at scala.collection.mutable.ResizableArray$class.update(ResizableArray.scala:49)
>   at scala.collection.mutable.ArrayBuffer.update(ArrayBuffer.scala:47)
>   at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:29)
>   at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaEsRowValueReader.scala:13)
>   at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaEsRowValueReader.scala:39)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
>   at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
>   at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
>   at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
>   at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
