I have two different Spark 1.5 clusters that have been running for more than
a day now. I do see jobs getting aborted because task retries max out
(default 4) due to ConnectException. It seems the executors die and get
restarted, and I have been unable to find the root cause (with the same app
code and conf on Spark 1.4.1, I don't see the ConnectException).

Another question related to this: what happens to the Kinesis records
received when a job gets aborted? In Spark 1.5 with kinesis-asl-1.5 (which I
am using), does the job get resubmitted with the same received records? Or
does the kinesis-asl library fetch those records again based on the sequence
numbers it tracks? It would be good for me to understand the story around
lossless processing of Kinesis records in Spark 1.5 + kinesis-asl-1.5 when
jobs are aborted. Any pointers or a quick explanation would be very helpful.
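
For reference, these are the two settings that I believe are relevant to the
questions above. This is only a sketch in spark-defaults.conf form:
spark.task.maxFailures is the retry limit I mentioned (default 4), and
spark.streaming.receiver.writeAheadLog.enable is the receiver write-ahead log
switch (off by default). The values shown are illustrative assumptions, not
the ones from my cluster, and I have not confirmed how kinesis-asl-1.5
behaves with either of them.

```properties
# Illustrative values only (assumptions, not my actual configuration).
# Number of failures of a single task tolerated before the job is aborted; default is 4.
spark.task.maxFailures                          8
# Persist received blocks to a write-ahead log in the checkpoint directory; default is false.
spark.streaming.receiver.writeAheadLog.enable   true
```

Raising the retry limit would only hide the executor restarts rather than
explain them, so I am still mainly interested in the root cause.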


On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das <t...@databricks.com> wrote:

> Is this happening too often? Is it slowing things down or blocking
> progress? Failures once in a while are part of the norm, and the system
> should take care of itself.
>
> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie <sparknewbie1...@gmail.com>
> wrote:
>
>> Hi Spark users,
>>
>> I'm seeing the exception below in my Spark Streaming application. It
>> happens in the first stage, where the Kinesis receivers receive records and
>> perform a flatMap operation on the unioned DStream. A coalesce step also
>> happens as part of that stage to optimize performance.
>>
>> This is happening on my Spark 1.5 instance using kinesis-asl-1.5. When I
>> look at the executor logs, I do not see any exceptions indicating the root
>> cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when that
>> service went down.
>>
>> Any help debugging this problem would be appreciated.
>>
>> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while
>> beginning fetch of 1 outstanding blocks
>> java.io.IOException: Failed to connect to
>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>         at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>         at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>         at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>         at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>         at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>         at
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>>         at
>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>>         at
>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
>>         at
>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
>>         at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at
>> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
>>         at
>> org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
>>         at
>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
>>         at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>         at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
>>         at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
>>         at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>         at scala.collection.immutable.List.foreach(List.scala:318)
>>         at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>         at
>> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>         at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>         at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>         at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>         at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>         at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.net.ConnectException: Connection refused:
>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>
>> Thanks,
>> Bharath
>>
>>
>
