Is it slowing things down or blocking progress?
>> I didn't see any slowdown in processing, but I do see jobs aborted
consecutively for a period of 18 batches (5-minute batch intervals). So I
am worried about what happened to the records that these jobs were
processing.
One more thing to mention: the
StreamingListenerBatchCompleted.numRecords information shows all received
records as processed even if the batch/job failed. The processing time also
shows the same duration as for a successful batch.
It seems that numRecords is the number of records that were input to the
batch, regardless of whether they were successfully processed or not.
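
To make the concern concrete, here is a minimal bookkeeping sketch in plain
Python. It is not Spark code and does not use any Spark API: BatchReport,
total_reported and records_in_failed_batches are hypothetical names. It
assumes the batch outcome is tracked separately from the input count, since
the count alone appears to say nothing about success.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class BatchReport:
    """Hypothetical per-batch record; not a Spark class."""
    batch_time_ms: int  # batch timestamp in milliseconds
    num_records: int    # records received as input to the batch
    succeeded: bool     # outcome, tracked separately from num_records


def total_reported(reports: Iterable[BatchReport]) -> int:
    """Sum of input counts, which includes batches whose jobs failed."""
    return sum(r.num_records for r in reports)


def records_in_failed_batches(reports: Iterable[BatchReport]) -> int:
    """Records that were input to batches whose jobs did not succeed."""
    return sum(r.num_records for r in reports if not r.succeeded)
```

Comparing the two totals over a window of batches gives a rough upper bound
on how many records may need to be reprocessed or re-fetched.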

On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie <sparknewbie1...@gmail.com>
wrote:

> I have 2 different Spark 1.5 clusters that have been running for more than
> a day now. I do see jobs getting aborted because task retries max out
> (default 4) due to ConnectionException. It seems like the executors die and
> get restarted, and I was unable to find the root cause (with the same app
> code and conf on Spark 1.4.1 I don't see ConnectionException).
>
> Another related question: what happens to the Kinesis records that were
> received when a job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I
> am using), does the job get resubmitted with the same received records? Or
> does the kinesis-asl library get those records again based on the sequence
> numbers it tracks? It would be good for me to understand the story around
> lossless processing of Kinesis records in Spark-1.5 + kinesis-asl-1.5 when
> jobs are aborted. Any pointers or a quick explanation would be very helpful.
>
>
> On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Is this happening too often? Is it slowing things down or blocking
>> progress? Occasional failures are part of the norm, and the system
>> should take care of itself.
>>
>> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie <sparknewbie1...@gmail.com>
>> wrote:
>>
>>> Hi Spark users,
>>>
>>> I'm seeing the exception below in my Spark Streaming application. It
>>> happens in the first stage, where the Kinesis receivers receive records and
>>> perform a flatMap operation on the unioned DStream. A coalesce step also
>>> happens as part of that stage to optimize performance.
>>>
>>> This is happening on my Spark 1.5 instance using kinesis-asl-1.5. When I
>>> look at the executor logs, I do not see any exceptions indicating the root
>>> cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when
>>> that service went down.
>>>
>>> Any help debugging this problem would be appreciated.
>>>
>>> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
>>> java.io.IOException: Failed to connect to ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>>         at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>>         at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>>         at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>>         at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>>>         at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>>>         at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
>>>         at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>         at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
>>>         at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
>>>         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
>>>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
>>>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>         at scala.collection.immutable.List.foreach(List.scala:318)
>>>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>         at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.net.ConnectException: Connection refused: ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>>
>>> Thanks,
>>> Bharath
>>>
>>>
>>
>
