Hi,
I need help fixing a timeout exception thrown from Elasticsearch Hadoop. The ES cluster is up the whole time.
I use Elasticsearch Hadoop to read data from ES into RDDs. I get a collection of these RDDs, which I traverse (with foreachRDD), creating more RDDs from each RDD in the collection. The resulting RDDs I put into a Queue, from which I create a DStream (sketched below).
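A minimal sketch of that shape (the index names, the derived transformation, and the plain foreach over the collection are illustrative placeholders, not my exact code):

    import scala.collection.mutable
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.elasticsearch.spark._

    object EsQueueStreamSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("es-queue-stream")
          .setMaster("local[2]")
          .set("es.nodes", "localhost:9200") // placeholder ES endpoint
        val sc  = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10))

        // Read each ES resource into an RDD (esRDD comes from elasticsearch-spark).
        val esRdds = Seq("index-a/docs", "index-b/docs").map(sc.esRDD(_))

        // Traverse the collection, derive a new RDD from each one, and queue the result.
        val queue = new mutable.Queue[RDD[(String, scala.collection.Map[String, AnyRef])]]()
        esRdds.foreach { rdd =>
          queue += rdd.filter { case (_, doc) => doc.contains("timestamp") }
        }

        // A DStream fed from the queue, one queued RDD per batch interval.
        val stream = ssc.queueStream(queue)
        stream.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }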
After about 10 minutes of running, the program's debug output hangs for a bit and then it throws:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out

This is the output:
.... [data from Elasticsearch, like the next line]
13:55:26.620 [Executor task launch worker-0] DEBUG httpclient.wire.content - << "toyota sprinter","toyota crown","toyota tundra","toyota prius","toyota aa","toyota stout","toyota camry","toyota vista","toyota","toyota classic","toyota sprinter","toyota crown","toyota tundra","toyota prius","toyota aa","toyota stout","toyota camry","toyota vista","toyota","toyota classic","toyota sprinter","toyota crown","toyota tundra","toyota prius","toyota aa","toyota stout","toyota camry","toyota vista","toyota","toyota classic","toyota sprinter","toyota crown","toyota tundra","toyota prius","toyota aa","toyota stout","toyota camry","toyota vista"],"timestamp":[1373976139000],"links.entity.rank":[0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9]}}]}}"
13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Resorting to protocol version default close connection policy
13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpMethodBase - Should NOT close connection, using HTTP/1.1
13:55:26.620 [Executor task launch worker-0] DEBUG o.a.c.httpclient.HttpConnection - Releasing connection back to connection manager.
13:55:26.631 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 4.0 (TID 10)
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out
        at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:245) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:203) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:277) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:200) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:156) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library.jar:na]
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) ~[spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) ~[spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55) ~[spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.scheduler.Task.run(Task.scala:54) ~[spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) ~[spark-core_2.10-1.1.0.jar:1.1.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_75]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
Caused by: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.7.0_75]
        at java.net.SocketInputStream.read(SocketInputStream.java:152) ~[na:1.7.0_75]
        at java.net.SocketInputStream.read(SocketInputStream.java:122) ~[na:1.7.0_75]
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) ~[na:1.7.0_75]
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334) ~[na:1.7.0_75]
        at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69) ~[commons-httpclient-3.1.jar:na]
        at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170) ~[commons-httpclient-3.1.jar:na]
        at java.io.FilterInputStream.read(FilterInputStream.java:133) ~[na:1.7.0_75]
        at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108) ~[commons-httpclient-3.1.jar:na]
        at org.elasticsearch.hadoop.rest.DelegatingInputStream.read(DelegatingInputStream.java:57) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        at org.codehaus.jackson.impl.Utf8StreamParser.loadMore(Utf8StreamParser.java:172) ~[jackson-core-asl-1.9.11.jar:1.9.11]
        at org.codehaus.jackson.impl.Utf8StreamParser._skipWSOrEnd(Utf8StreamParser.java:2309) ~[jackson-core-asl-1.9.11.jar:1.9.11]
        at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:444) ~[jackson-core-asl-1.9.11.jar:1.9.11]
        at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:84) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]
        ... 22 common frames omitted

13:55:26.639 [sparkDriver-akka.actor.default-dispatcher-5] DEBUG o.a.spark.scheduler.local.LocalActor - [actor] received message StatusUpdate(10,FAILED,java.nio.HeapByteBuffer[pos=0 lim=2763 cap=2763]) from Actor[akka://sparkDriver/deadLetters]
13:55:26.640 [sparkDriver-akka.actor.default-dispatcher-5] DEBUG o.a.s.scheduler.TaskSchedulerImpl - parentName: , name: TaskSet_4, runningTasks: 0
13:55:26.641 [sparkDriver-akka.actor.default-dispatcher-5] DEBUG o.a.spark.scheduler.local.LocalActor - [actor] handled message (1.77943 ms) StatusUpdate(10,FAILED,java.nio.HeapByteBuffer[pos=0 lim=2763 cap=2763]) from Actor[akka://sparkDriver/deadLetters]
13:55:26.644 [Result resolver thread-3] WARN  o.a.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 4.0 (TID 10, localhost): org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out
        [same stack trace as above]
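One thing I'm wondering: is this just the connector's HTTP read timeout being too short for my scroll requests? From the elasticsearch-hadoop configuration docs I understand the timeouts can be set on the SparkConf, something like the following (the values here are guesses on my part, not settings I've verified):

    val conf = new SparkConf()
      .set("es.nodes", "localhost:9200")  // placeholder endpoint
      .set("es.http.timeout", "5m")       // HTTP/REST timeout (default 1m)
      .set("es.scroll.keepalive", "15m")  // scroll context keep-alive (default 10m)

Is that the right knob, or is something else going on?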
