Re: Blacklisting in Spark Stateful Structured Streaming

2020-11-20 Thread Eric Beabes
Yes, I agree that the blacklisting structure can be put in the user-defined
state, but wouldn't the state still remain open for a long time? Am I
misunderstanding something?

I like the idea of keeping the blacklist in a "Broadcast" variable, but I
can't figure out how to use the "Broadcast" variable in the
'mapGroupsWithState' function. For example, I have this code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  updateAcrossEvents
)

and updateAcrossEvents is defined as:

def updateAcrossEvents(tuple5: (String, String, String, String, String),
    inputs: Iterator[MyObject], oldState: GroupState[MyState])


How do I pass a "Broadcast Variable" into the 'updateAcrossEvents'
method? Please advise. Thanks.
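For what it's worth, one common pattern is to curry the Broadcast into the
update function so that the partially applied function has the (key, values,
state) shape mapGroupsWithState expects, and the Broadcast is captured in its
closure. A minimal sketch follows; MyObject, MyState and blacklistBc are
placeholders, since the real types aren't shown here:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Placeholder shapes standing in for the real MyObject / MyState types.
case class MyObject(id: String, payload: String)
case class MyState(id: String, count: Long)

// The Broadcast goes in the first parameter list; the second parameter list
// matches the (K, Iterator[V], GroupState[S]) signature mapGroupsWithState expects.
def updateAcrossEvents(blacklist: Broadcast[Set[String]])(
    tuple5: (String, String, String, String, String),
    inputs: Iterator[MyObject],
    oldState: GroupState[MyState]): MyState = {
  val id = tuple5._1                            // assuming the ID is the first key field
  if (blacklist.value.contains(id)) {
    oldState.remove()                           // drop any existing state for a blacklisted ID
    MyState(id, 0L)
  } else {
    val previous = oldState.getOption.getOrElse(MyState(id, 0L))
    val updated  = previous.copy(count = previous.count + inputs.size)
    oldState.update(updated)
    oldState.setTimeoutDuration("30 minutes")   // ProcessingTimeTimeout needs this to expire state
    updated
  }
}

// Call site -- blacklistBc is assumed to be created earlier with
// spark.sparkContext.broadcast(someSet):
//
//   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
//     updateAcrossEvents(blacklistBc)
//   )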



On Mon, Nov 16, 2020 at 3:40 AM Yuanjian Li  wrote:

> If you use the `flatMap/mapGroupsWithState` API for a "stateful" SS job,
> the blacklisting structure can be put into the user-defined state.
> Using a 3rd-party cache would also be a good choice.
>
> On Wed, Nov 11, 2020 at 6:54 AM Eric Beabes  wrote:
>
>> We currently have a "stateful" Spark Structured Streaming job that computes
>> aggregates for each ID. I need to implement a new requirement: if the number
>> of incoming messages for a particular ID exceeds a certain value, add that ID
>> to a blacklist & remove its state. Going forward, for any blacklisted ID we
>> will not create a state; its messages will simply get filtered out.
>>
>> What’s the best way to implement this in Spark Structured Streaming?
>> Essentially what we need to do is create a Distributed HashSet that gets
>> updated intermittently & make this HashSet available to all Executors so
>> that they can filter out unwanted messages.
>>
>> Any pointers would be greatly appreciated. Is the only option to use a
>> 3rd-party Distributed Cache tool such as EhCache, Redis, etc.?
>>
>>
>>
>>


Re: Spark Exception

2020-11-20 Thread Russell Spitzer
The general exceptions here mean that components within the Spark cluster
can't communicate. The most common cause is failure of the processes that
are supposed to be communicating. I generally see this when one of the
processes goes into a GC storm or is shut down because of an exception or
something similar.
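For reference, the 120-second timeout in the first stack trace below is
governed by a standard Spark setting and can be raised while the root cause is
investigated (the 600-second BlockManagerHeartbeat timeout is a separate,
internal one). A minimal sketch, with purely illustrative values:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("streaming-job")                    // hypothetical app name
  .config("spark.rpc.askTimeout", "240s")      // governs the 120s "Futures timed out" error
  .config("spark.network.timeout", "600s")     // spark.rpc.askTimeout falls back to this value
  .getOrCreate()

// The same settings can be passed on the command line, e.g.
// spark-submit --conf spark.rpc.askTimeout=240s --conf spark.network.timeout=600s ...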

On Fri, Nov 20, 2020 at 10:52 AM Amit Sharma  wrote:

> Russell, I increased the RPC timeout to 240 seconds but I am still getting
> this issue once in a while. After it happens my Spark streaming job gets
> stuck and does not process any requests, so I need to restart it every
> time. Any suggestions please?
>
>
> Thanks
> Amit
>
> On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma  wrote:
>
>> Hi, we are running a Spark streaming job and sometimes it throws the two
>> exceptions below. I don't understand the difference between these two
>> exceptions: one timeout is 120 seconds and the other is 600 seconds. What
>> could be the reason for these?
>>
>>
>>  Error running job streaming job 1605709968000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures
>> timed out after [120 seconds]. This timeout is controlled by
>> spark.rpc.askTimeout
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
>> seconds]. This timeout is controlled by spark.rpc.askTimeout
>> at org.apache.spark.rpc.RpcTimeout.org
>> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
>> at
>> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
>> at
>> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
>> at
>> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
>> at org.apache.spark.storage.BlockManager.org
>> $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
>> at org.apache.spark.storage.BlockManager.org
>> $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
>> at
>> org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
>> at
>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)
>>
>>
>>
>>
>>
>> 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread
>> heartbeat-receiver-event-loop-thread
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600
>> seconds]. This timeout is controlled by BlockManagerHeartbeat
>> at org.apache.spark.rpc.RpcTimeout.org
>> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
>> at
>> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
>> at
>> org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
>> at
>> org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
>> at
>> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
>> at
>> org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
>> at
>> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>


Re: Spark Exception

2020-11-20 Thread Amit Sharma
Russell, I increased the RPC timeout to 240 seconds but I am still getting
this issue once in a while. After it happens my Spark streaming job gets
stuck and does not process any requests, so I need to restart it every time.
Any suggestions please?


Thanks
Amit

On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma  wrote:

> Hi, we are running a Spark streaming job and sometimes it throws the two
> exceptions below. I don't understand the difference between these two
> exceptions: one timeout is 120 seconds and the other is 600 seconds. What
> could be the reason for these?
>
>
>  Error running job streaming job 1605709968000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures
> timed out after [120 seconds]. This timeout is controlled by
> spark.rpc.askTimeout
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
> at
> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
> at
> org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
> at
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)
>
>
>
>
>
> 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread
> heartbeat-receiver-event-loop-thread
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600
> seconds]. This timeout is controlled by BlockManagerHeartbeat
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at
> org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
> at
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
> at
> org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
> at
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>


Re: Cannot perform operation after producer has been closed

2020-11-20 Thread Gabor Somogyi
Happy that saved some time for you :)
We've invested quite an effort into streaming in the latest releases and
hope there will be fewer and fewer headaches like this.
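For reference, the temporary workaround mentioned further down this thread is
just a Spark configuration value; a minimal sketch, with an illustrative value
(it can equally be passed as --conf on spark-submit):

import org.apache.spark.sql.SparkSession

// spark.kafka.producer.cache.timeout controls how long a cached Kafka producer
// is kept before it is closed; raising it from the default is the stop-gap
// discussed below until an upgrade to Spark 3 is possible.
val spark = SparkSession.builder()
  .config("spark.kafka.producer.cache.timeout", "30m")   // illustrative value
  .getOrCreate()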

On Thu, Nov 19, 2020 at 5:55 PM Eric Beabes 
wrote:

> THANK YOU SO MUCH! Will try it out & revert.
>
> On Thu, Nov 19, 2020 at 8:18 AM Gabor Somogyi 
> wrote:
>
>> "spark.kafka.producer.cache.timeout" has been available since 2.2.1 and can
>> be increased as a temporary workaround.
>> It's not super elegant, but it works and gives you enough time to migrate to
>> Spark 3.
>>
>>
>> On Wed, Nov 18, 2020 at 11:12 PM Eric Beabes 
>> wrote:
>>
>>> I must say.. *Spark has let me down in this case*. I am surprised an
>>> important issue like this hasn't been fixed in Spark 2.4.
>>>
>>> I am fighting a battle of 'Spark Structured Streaming' Vs 'Flink' at
>>> work & now because Spark 2.4 can't handle this *I've been asked to
>>> rewrite the code in Flink*.
>>>
>>> Moving to Spark 3.0 is not an easy option 'cause Cloudera 6.2 doesn't
>>> have a Spark 3.0 parcel, so we can't upgrade to 3.0.
>>>
>>> So sad. Let me ask one more time. *Is there no way to fix this in Spark
>>> 2.4?*
>>>
>>>
>>> On Tue, Nov 10, 2020 at 11:33 AM Eric Beabes 
>>> wrote:
>>>
 BTW, we are seeing this message as well:
 "org.apache.kafka.common.KafkaException: Producer closed while send in
 progress". I am assuming this happens because of the previous issue,
 "producer has been closed", right? Or are they unrelated? Please advise.
 Thanks.

 On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes 
 wrote:

> Thanks for the reply. We are on Spark 2.4. Is there no way to get this
> fixed in Spark 2.4?
>
> On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Which Spark version do you use? There's a known issue with the Kafka
>> producer pool in Spark 2.x which was fixed in Spark 3.0, so you may want to
>> check whether your case is bound to the known issue or not.
>>
>> https://issues.apache.org/jira/browse/SPARK-21869
>>
>>
>> On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes 
>> wrote:
>>
>>> I know this is related to Kafka but it happens during the Spark
>>> Structured Streaming job that's why I am asking on this mailing list.
>>>
>>> How would you debug this or get around this in Spark Structured
>>> Streaming? Any tips would be appreciated. Thanks.
>>>
>>>
>>> java.lang.IllegalStateException: Cannot perform operation after
>>> producer has been closed at
>>> org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
>>> at
>>> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
>>> at
>>> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
>>> at
>>> org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
>>> at
>>> org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
>>>
>>


Re: Out of memory issue

2020-11-20 Thread Russell Spitzer
Well if the system doesn't change, then the data must be different. The
exact exception probably won't be helpful since it only tells us the last
allocation that failed. My guess is that your ingestion changed and there
is either now slightly more data than previously or it's skewed
differently. One of the two things is probably happening and is overloading
one executor.

The solution is to increase executor heap.
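A rough sketch of what that typically looks like; the figures are illustrative,
not recommendations:

import org.apache.spark.SparkConf

// Executor heap has to be set before the executors are launched, typically via
// spark-submit --executor-memory 12g or spark-defaults.conf; programmatically:
val conf = new SparkConf()
  .set("spark.executor.memory", "12g")           // raises the 8 GB heap mentioned below
  .set("spark.executor.memoryOverhead", "2g")    // extra off-heap headroom (Spark 2.3+)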

On Fri, Nov 20, 2020 at 8:25 AM Amit Sharma  wrote:

> please help.
>
>
> Thanks
> Amit
>
> On Mon, Nov 9, 2020 at 4:18 PM Amit Sharma  wrote:
>
>> Please find below the exact exception
>>
>> Exception in thread "streaming-job-executor-3"
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3332)
>> at
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>> at
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>> at
>> scala.StringContext.standardInterpolator(StringContext.scala:126)
>> at scala.StringContext.s(StringContext.scala:95)
>> at sparkStreaming.TRReview.getTRReviews(TRReview.scala:307)
>> at
>> sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:154)
>> at
>> sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:138)
>> at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
>> at scala.util.Try$.apply(Try.scala:192)
>> at scala.util.Success.map(Try.scala:237)
>>
>> On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma  wrote:
>>
>>> Hi, I am using a 16-node Spark cluster with the config below:
>>> 1. Executor memory  8 GB
>>> 2. 5 cores per executor
>>> 3. Driver memory 12 GB.
>>>
>>>
>>> We have a streaming job. Usually we see no problem, but sometimes we get a
>>> heap memory exception on an executor. I don't understand why, when the data
>>> size is the same and the job has been receiving and processing requests, it
>>> suddenly starts giving out-of-memory errors. It throws the exception for one
>>> executor, then for other executors as well, and stops processing requests.
>>>
>>> Thanks
>>> Amit
>>>
>>


unsubscribe

2020-11-20 Thread youso b


Re: Out of memory issue

2020-11-20 Thread Amit Sharma
please help.


Thanks
Amit

On Mon, Nov 9, 2020 at 4:18 PM Amit Sharma  wrote:

> Please find below the exact exception
>
> Exception in thread "streaming-job-executor-3" java.lang.OutOfMemoryError:
> Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> at
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> at java.lang.StringBuilder.append(StringBuilder.java:136)
> at
> scala.StringContext.standardInterpolator(StringContext.scala:126)
> at scala.StringContext.s(StringContext.scala:95)
> at sparkStreaming.TRReview.getTRReviews(TRReview.scala:307)
> at
> sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:154)
> at
> sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:138)
> at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
> at scala.util.Try$.apply(Try.scala:192)
> at scala.util.Success.map(Try.scala:237)
>
> On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma  wrote:
>
>> Hi, I am using a 16-node Spark cluster with the config below:
>> 1. Executor memory  8 GB
>> 2. 5 cores per executor
>> 3. Driver memory 12 GB.
>>
>>
>> We have a streaming job. Usually we see no problem, but sometimes we get a
>> heap memory exception on an executor. I don't understand why, when the data
>> size is the same and the job has been receiving and processing requests, it
>> suddenly starts giving out-of-memory errors. It throws the exception for one
>> executor, then for other executors as well, and stops processing requests.
>>
>> Thanks
>> Amit
>>
>


Re: Spark Exception

2020-11-20 Thread Amit Sharma
Please help.


Thanks
Amit

On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma  wrote:

> Hi, we are running a Spark streaming job and sometimes it throws the two
> exceptions below. I don't understand the difference between these two
> exceptions: one timeout is 120 seconds and the other is 600 seconds. What
> could be the reason for these?
>
>
>  Error running job streaming job 1605709968000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures
> timed out after [120 seconds]. This timeout is controlled by
> spark.rpc.askTimeout
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
> at
> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
> at
> org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
> at
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)
>
>
>
>
>
> 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread
> heartbeat-receiver-event-loop-thread
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600
> seconds]. This timeout is controlled by BlockManagerHeartbeat
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at
> org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
> at
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
> at
> org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
> at
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>