[GitHub] spark issue #21341: Revert "[SPARK-22938][SQL][FOLLOWUP] Assert that SQLConf...

2018-05-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21341
  
sure this is fine, but we'll see the flakiness back in the builds until 
https://github.com/apache/spark/pull/21299 is merged, right?


---




[GitHub] spark pull request #21299: [SPARK-24250][SQL] support accessing SQLConf insi...

2018-05-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21299#discussion_r188650358
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala ---
@@ -90,13 +92,42 @@ object SQLExecution {
   * thread from the original one, this method can be used to connect the Spark jobs in this action
   * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`.
   */
-  def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T = {
+  def withExecutionId[T](sparkSession: SparkSession, executionId: String)(body: => T): T = {
+    val sc = sparkSession.sparkContext
     val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    withSQLConfPropagated(sparkSession) {
+      try {
+        sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
+        body
+      } finally {
+        sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
+      }
+    }
+  }
+
+  def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
+    // Set all the specified SQL configs to local properties, so that they can be available at
+    // the executor side.
--- End diff --

properties are serialized per task.  how unusual would it be for there to be a large list of properties?  if that would be reasonable, then it might make more sense to use a Broadcast.

(separately, task serialization should probably avoid re-serializing the properties every time, but this could make that existing issue much worse.)
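
For a concrete sense of what the local-property approach involves, here is a rough sketch (not the actual SQLExecution code; the helper name and signature are made up for illustration):

```scala
import org.apache.spark.SparkContext

// Illustrative sketch only: push a set of (key, value) config pairs into
// SparkContext local properties for the duration of `body`, then restore the
// previous values. Local properties are captured per task and shipped to the
// executors with it, which is why a very large config list would be costly.
def withConfsPropagated[T](sc: SparkContext, confs: Map[String, String])(body: => T): T = {
  val oldValues = confs.keys.map(k => k -> sc.getLocalProperty(k)).toMap
  confs.foreach { case (k, v) => sc.setLocalProperty(k, v) }
  try {
    body
  } finally {
    // setLocalProperty(key, null) removes a key, so previously absent
    // properties are cleaned up rather than left behind.
    oldValues.foreach { case (k, v) => sc.setLocalProperty(k, v) }
  }
}
```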


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-16 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/21346

[SPARK-6237][NETWORK] Network-layer changes to allow stream upload.

These changes allow an RPCHandler to receive an upload as a stream of
data, without having to buffer the entire message in the FrameDecoder.
The primary use case is for replicating large blocks.

Added unit tests for handling streaming data, including successfully 
sending data, and failures in reading the stream with concurrent requests.
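
As a rough illustration of the shape of the change (not the actual network-common API; the trait and method names below are made up):

```scala
import java.nio.ByteBuffer

// Sketch of a callback-style receiver: the handler consumes the uploaded
// payload chunk by chunk as frames arrive, instead of the frame decoder
// buffering the whole message before handing it over.
trait StreamReceiverSketch {
  /** Called repeatedly with the next chunk of the uploaded stream. */
  def onData(streamId: String, chunk: ByteBuffer): Unit
  /** Called once after the final chunk has been received. */
  def onComplete(streamId: String): Unit
  /** Called if the stream fails partway through. */
  def onFailure(streamId: String, cause: Throwable): Unit
}
```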

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark upload_stream

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21346.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21346


commit 49e0a80f89433368d3a3116eb9fcd7854ceecb62
Author: Imran Rashid 
Date:   2018-05-02T14:55:15Z

[SPARK-6237][NETWORK] Network-layer changes to allow stream upload.

These changes allow an RPCHandler to receive an upload as a stream of
data, without having to buffer the entire message in the FrameDecoder.
The primary use case is for replicating large blocks.

Added unit tests.




---




[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21346
  
All good questions and stuff I had wondered about too -- I should actually 
be sure to comment on these on the jira as well:

> I recall that the problem with large shuffle blocks was that the 
OneForOneBlockFetcher strategy basically read the entire block as a single 
chunk, which becomes a problem for large blocks. I understand that we have now 
removed this limitation for shuffles by using a streaming transfer strategy 
only for large blocks (above some threshold). Is this patch conceptually doing 
the same thing for push-based communication where the action is initiated by a 
sender (e.g. to push a block for replication)? 

yes

> Does it also affect pull-based remote cache block reads or will that be 
handled separately?

that was already handled by 
https://issues.apache.org/jira/browse/SPARK-22062 (despite the title saying it's 
something else entirely).  That said, I recently discovered that my tests doing 
this for large blocks were incorrect, so I need to reconfirm this (I need to 
rearrange my test a little, and I've got a different aspect of this in flight, 
so it will take a couple of days probably).

> Given that we already seem to have pull-based openStream() calls which 
can be initiated from the receive side, could we simplify things here by 
pushing a "this value is big, pull it" message and then have the remote end 
initiate a streaming read, similar to how DirectTaskResult and 
IndirectTaskResult work?

It's certainly possible to do this, and I started taking this approach, but 
I stopped because [replication is 
synchronous](https://github.com/apache/spark/blob/bfd75cdfb22a8c2fb005da597621e1ccd3990e82/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1344).
  So you'd have to add a callback for when the block is finally fetched, to go 
back to this initial call -- but also add timeout logic to avoid waiting 
forever if the destination went away.  It all seemed much more complicated than 
doing it the way I'm proposing here.
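
For a sense of the extra machinery that would be involved, a hedged sketch (the names are illustrative, not Spark APIs): the uploading task would have to park on a callback until the remote end reports the fetch finished, with a timeout so it doesn't wait forever if the destination goes away.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.FiniteDuration

// Illustrative only: block until a remote fetch completes or a timeout fires.
// `registerCallback` stands in for whatever RPC hook would report completion.
def waitForRemoteFetch(
    registerCallback: (Either[Throwable, Unit] => Unit) => Unit,
    timeout: FiniteDuration): Unit = {
  val done = Promise[Unit]()
  registerCallback {
    case Right(_)  => done.trySuccess(())
    case Left(err) => done.tryFailure(err)
  }
  // Throws a TimeoutException if the destination never responds.
  Await.result(done.future, timeout)
}
```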

> For remote reads of large cached blocks: is it true that this works today 
only if the block is on disk but fails if the block is in memory? If certain 
size limit problems only occur when things are cached in memory, can we 
simplify anything if we add a requirement that blocks above 2GB can only be 
cached on disk (regardless of storage level)?

Correct; I'm currently investigating what we can do to address this.  
(sorry, again I discovered my test was broken shortly after posting this.)  It 
would certainly simplify things if we only supported this for disk-cached 
blocks -- what exactly are you proposing?  Just failing when it's cached in 
memory, and telling the user to rerun with disk caching?  Changing the block 
manager to automatically cache on disk _also_ when the block is > 2gb?  Or when 
sending the block, just write it to a temp file, and then send from that?

The problem here is on the sending side, not the receiving side; netty uses 
an [`int` to manage the length of a `ByteBuf`-based msg](https://github.com/netty/netty/blob/netty-4.1.17.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L178-L180), 
but it uses a [`long` for a `FileRegion`-based msg](https://github.com/netty/netty/blob/netty-4.1.17.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L213-L224) 
(the code is a little different in the latest on branch 4.1, but the same problem is 
still there).  I'm investigating making a "FileRegion" that is actually backed 
by a `ChunkedByteBuffer`.

But that would go into another jira under SPARK-6235
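
To illustrate the direction (this is only a sketch of the idea, not the eventual implementation): stream a sequence of sub-2GB buffers to the channel while tracking the total with a `long`, so no single `ByteBuf` ever has to describe the whole block.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Illustrative sketch: write a block that is stored as multiple chunks
// (each well under 2 GB) to a channel, returning the total bytes written
// as a Long so the overall size is not limited to Int.MaxValue.
def transferChunks(chunks: Iterator[ByteBuffer], target: WritableByteChannel): Long = {
  var transferred = 0L
  chunks.foreach { chunk =>
    while (chunk.hasRemaining) {
      transferred += target.write(chunk)
    }
  }
  transferred
}
```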


---




[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-17 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21346
  
btw I may have made the pull-based approach sound more complex than I meant 
to; I'm happy to take that approach if you think it's better.  The fact that 
replication is synchronous doesn't really matter, I just meant it's not a 
fire-and-forget msg, we have to set up the callbacks to confirm the block has 
been fetched (or a failure).  It just seemed like extra indirection to me, and 
I thought it would be better to stay closer to the UploadBlock path.

Are there particular reasons you think that approach would be better?  I 
guess the receiver can throttle the requests, but on the other hand the task on 
the sender will block waiting for the replication to finish (whether it 
succeeds or fails), so we really don't want it to wait too long.


---




[GitHub] spark pull request #21356: [SPARK-24309][CORE] AsyncEventQueue should stop o...

2018-05-17 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/21356

[SPARK-24309][CORE] AsyncEventQueue should stop on interrupt.

EventListeners can interrupt the event queue thread.  In particular,
when the EventLoggingListener writes to hdfs, hdfs can interrupt the
thread.  When there is an interrupt, the queue should be removed and stop
accepting any more events.  Before this change, the queue would continue
to take more events (till it was full), and then would not stop when the
application was complete because the PoisonPill couldn't be added.

Added a unit test which failed before this change.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-24309

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21356.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21356


commit a689f527e6b9bf626cef0e318a37e7aa3c008bc3
Author: Imran Rashid 
Date:   2018-05-17T20:45:52Z

[SPARK-24309][CORE] AsyncEventQueue should stop on interrupt.

EventListeners can interrupt the event queue thread.  In particular,
when the EventLoggingListener writes to hdfs, hdfs can interrupt the
thread.  When there is an interrupt, the queue should be removed and stop
accepting any more events.  Before this change, the queue would continue
to take more events (till it was full), and then would not stop when the
application was complete because the PoisonPill couldn't be added.

Added a unit test which failed before this change.




---




[GitHub] spark pull request #21356: [SPARK-24309][CORE] AsyncEventQueue should stop o...

2018-05-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21356#discussion_r189152423
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala ---
@@ -97,6 +98,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
     } catch {
       case ie: InterruptedException =>
         logInfo(s"Stopping listener queue $name.", ie)
+        stopped.set(true)
--- End diff --

The old bus would stop the entire spark context (details below).

I dunno what the right behavior is either -- I figured this was the 
intention given the logInfo.  Alternatively we could (a) stop the entire spark 
context, (b) skip this particular event and keep going or (c) stop the one 
listener which happened to be active on the interrupt, but keep the queue 
active (if there were more listeners).

more details on the 2.2 behavior:


[`ListenerBus.postToAll`](https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/util/ListenerBus.scala#L55)
 wouldn't catch the event.

And the polling thread in 
[`LiveListenerBus`](https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L77)
 wraps everything in `Utils.tryOrStopSparkContext`.

I did a similar test on branch-2.2: 
https://github.com/squito/spark/commit/72951bd69fca0c58c8a8b202ca59e167ebb5d71b

```
18/05/17 21:38:23.446 SparkListenerBus ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1282)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
18/05/17 21:38:23.448 SparkListenerBus ERROR Utils: throw uncaught fatal error in thread SparkListenerBus
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1282)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
```


---




[GitHub] spark issue #21356: [SPARK-24309][CORE] AsyncEventQueue should stop on inter...

2018-05-17 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21356
  
> does this mean a problematic listener can kill the queue and crash other 
listeners in the same queue? Shall we do some isolation?

yeah I think marcelo was asking about this above 
https://github.com/apache/spark/pull/21356#discussion_r189100750

of course, we can't stop a listener from causing an OOM or launching 20 
threads and hogging the CPU, but we could try for a little more isolation ...

> Not introduced by this PR, but I'm wondering why we only catch 
InterruptedException? what about other exceptions?

NonFatal is already handled inside 
[`ListenerBus.postToAll`](https://github.com/apache/spark/blob/d4a0895c628ca854895c3c35c46ed990af36ec61/core/src/main/scala/org/apache/spark/util/ListenerBus.scala#L81-L84)


---




[GitHub] spark issue #21356: [SPARK-24309][CORE] AsyncEventQueue should stop on inter...

2018-05-18 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21356
  
I pushed an update which only removes the listener which was active at the 
interrupt.  Note that is not necessarily the same thing as the listener which 
*caused* the interrupt -- we have no idea who caused the interrupt really, but 
I guess it would be rather contrived for another thread to interrupt the event 
queue threads.

I'm not sure if I should leave some more guards against interrupts as well 
-- e.g. check the interrupted status before posting the event to a specific 
listener, and if it's set, throw InterruptedException, and leave the generic 
handling of InterruptedException in the AsyncEventQueue, to shut the queue down 
entirely if there is any interrupt *outside* of a posting to a listener.


---




[GitHub] spark issue #21356: [SPARK-24309][CORE] AsyncEventQueue should stop on inter...

2018-05-18 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21356
  
> As an alternative design option, PoisonPill could be handled differently, 
since some msgs should have higher priority and can be considered them as part 
of your "control plane".

I'm pretty sure the existing use of PoisonPill is intentionally put at the 
end of the queue, so that everything is processed before the queue is shut 
down.


---




[GitHub] spark pull request #21356: [SPARK-24309][CORE] AsyncEventQueue should stop o...

2018-05-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21356#discussion_r189384006
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -80,6 +89,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       }
       try {
         doPostEvent(listener, event)
+        if (Thread.interrupted()) {
--- End diff --

yeah agree, I will handle this too -- but I'll wait to update till there is 
agreement on the right overall approach


---




[GitHub] spark issue #21356: [SPARK-24309][CORE] AsyncEventQueue should stop on inter...

2018-05-18 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21356
  
the problem was not actually an interrupted exception from the listener, it 
was that the Thread's state was getting set to interrupted, and then there 
would be a failure later in `queue.take()`:

```
18/05/16 17:46:36 INFO scheduler.AsyncEventQueue: Stopping listener queue eventLog.
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:94)
...
```

so really the question is what to do if a listener sets the interrupt state 
*or* throws an InterruptedException.  But from all the discussion it's not that 
clear to me what we should do (other than of course preventing the queue from 
hanging and the application from stopping).
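
To make the failure mode concrete, here's a stripped-down sketch of the dispatch loop shape (illustrative only, not the real AsyncEventQueue): once the interrupt status is set, the next blocking `take()` throws, so the loop has to mark itself stopped instead of leaving the queue to fill up and block the poison pill at shutdown.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch of an async event queue's dispatch loop.
class DispatchSketch[E <: AnyRef](post: E => Unit, isPoisonPill: E => Boolean) {
  private val queue = new LinkedBlockingQueue[E]()
  private val stopped = new AtomicBoolean(false)

  def offer(event: E): Boolean = !stopped.get() && queue.offer(event)

  def dispatchLoop(): Unit = {
    try {
      var next = queue.take()
      while (!isPoisonPill(next)) {
        post(next)          // a listener can interrupt us in here
        next = queue.take() // ...and then this throws InterruptedException
      }
    } catch {
      case _: InterruptedException =>
        // Stop accepting events so shutdown doesn't hang waiting on the pill.
        stopped.set(true)
    }
  }
}
```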


---




[GitHub] spark pull request #21356: [SPARK-24309][CORE] AsyncEventQueue should stop o...

2018-05-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21356#discussion_r189483397
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -80,6 +89,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends 
Logging {
   }
   try {
 doPostEvent(listener, event)
+if (Thread.interrupted()) {
--- End diff --

> This is ok right now since Spark code never explicitly interrupts these 
threads. If we ever need to do that, though, this might become a problem... but 
in that case I don't know how you'd handle this issue without just giving up 
and stopping everything.

If spark were to explicitly interrupt, then I think we'd also set some 
other flag indicating a reason, eg. `val requestedQueueStop: AtomicBoolean` so 
it shouldn't be hard to distinguish.

I've pushed an update to handle `InterruptedException` as well.


---




[GitHub] spark pull request #21356: [SPARK-24309][CORE] AsyncEventQueue should stop o...

2018-05-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21356#discussion_r189604443
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -80,7 +89,16 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       }
       try {
         doPostEvent(listener, event)
+        if (Thread.interrupted()) {
+          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}.  " +
+            s"Removing that listener.")
+          removeListenerOnError(listener)
--- End diff --

`Thread.interrupted()` also clears the interrupted state.  So that alone 
isn't a problem -- we're basically declaring that we've handled the interrupt 
and nobody else gets to know about it anymore.
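
A tiny sketch of what that looks like in practice (illustrative, not the exact ListenerBus code):

```scala
// Thread.interrupted() both tests and clears the flag: after this check the
// dispatch thread's later blocking calls (e.g. queue.take()) won't see the
// interrupt, which is exactly the "we've handled it" declaration above.
def postOne[L, E](listener: L, event: E)(doPost: (L, E) => Unit)(remove: L => Unit): Unit = {
  doPost(listener, event)
  if (Thread.interrupted()) {
    remove(listener)
    throw new InterruptedException(s"listener $listener set the interrupt status")
  }
}
```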


---




[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...

2018-05-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21068
  
ping @tgravescs.  Honestly I still don't love the blacklist limit, 
especially since it makes reporting back to the driver pretty confusing, and I 
don't think it buys us much.  But I can live with it, and otherwise I think 
this is ready.

I've also looked at Attila's tests on a real cluster.


---




[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-05-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
that's fine with me, but as I'm neither a user of mesos nor am I in touch w/ 
many mesos users, I'd like to wait a bit for more opinions, given the 
ramifications of this change. (that shouldn't block work on the better version, 
if anybody wants to take that on ...) @IgorBerman @susanxhuynh

Blacklisting in yarn (at least, what spark does internally already) is 
really not much more sophisticated, at least before SPARK-16630; spark does 
tell yarn which hosts it has blacklisted so it wants to avoid them for future 
executors, but that's about it.  Yarn itself is doing a little more as well, as 
it has its own disk health checker etc., and it'll try to exclude resources 
from *all* applications if it thinks they are bad.  But that is independent of 
changes within spark itself.

Also, I'd like to see a jira for mesos to discuss the other improvements 
we've discussed here to be more like SPARK-16630, so we don't forget about it.  
@skonto can you file that jira and try to capture some of the points that have 
been raised in the discussion here? (or maybe that jira exists already?)


---




[GitHub] spark pull request #21356: [SPARK-24309][CORE] AsyncEventQueue should stop o...

2018-05-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21356#discussion_r189702478
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala ---
@@ -130,7 +129,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    dispatchThread.join()
+    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // in that case.
+    if (Thread.currentThread() != dispatchThread) {
--- End diff --

It does still happen, we need this.  We see the interrupt in postToAll, 
which is in the queue thread.  If it fails, we call `removeListenerOnError`.  
If that results in the queue being empty, we stop the queue.


```
"spark-listener-group-eventLog" #20 daemon prio=5 os_prio=31 
tid=0x7f831379e800 nid=0x6303 in Object.wait() [0x000129226000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00078047ae28> (a org.apache.spark.scheduler.AsyncEventQueue$$anon$1)
at java.lang.Thread.join(Thread.java:1245)
- locked <0x00078047ae28> (a org.apache.spark.scheduler.AsyncEventQueue$$anon$1)
at java.lang.Thread.join(Thread.java:1319)
at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:135)
at org.apache.spark.scheduler.LiveListenerBus$$anonfun$removeListener$2.apply(LiveListenerBus.scala:123)
at org.apache.spark.scheduler.LiveListenerBus$$anonfun$removeListener$2.apply(LiveListenerBus.scala:121)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.LiveListenerBus.removeListener(LiveListenerBus.scala:121)
- locked <0x000780475fe8> (a org.apache.spark.scheduler.LiveListenerBus)
at org.apache.spark.scheduler.AsyncEventQueue.removeListenerOnError(AsyncEventQueue.scala:196)
at org.apache.spark.scheduler.AsyncEventQueue.removeListenerOnError(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:101)
```
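
A minimal sketch of the guard this stack shows the need for (illustrative only): `stop()` can be reached from the dispatch thread itself via `removeListenerOnError`, and a thread joining on itself would wait forever.

```scala
// Illustrative: post the poison pill, but only join the dispatch thread if we
// are not already running on it.
def stopQueueSketch(dispatchThread: Thread, postPoisonPill: () => Unit): Unit = {
  postPoisonPill()
  if (Thread.currentThread() ne dispatchThread) {
    dispatchThread.join()
  }
}
```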


---




[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...

2018-05-23 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21068
  
I totally understand your motivation for wanting the limit.  But I'm trying 
to balance that against behavior which might not really achieve the desired 
effect and be even more confusing in some cases.

It won't achieve the desired effect if your cluster has more nodes, but 
they're all tied up in other applications.  It'll be confusing to users if they 
see notification about blacklisting in the logs and UI, but then still see 
spark trying to use those nodes anyway.  I wonder if putting this in will make 
it hard

All that said, I don't have a great alternative now, other than just 
removing the limit entirely for the moment and adding notification to the 
driver.  We could have a more general starvation detector, which wouldn't only 
look at node count, but also look at delays in acquiring containers and finding 
places to schedule tasks (related to SPARK-15815 & SPARK-22148), but I don't 
want to tackle all of that here.


---




[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...

2018-05-23 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21068
  
I mean when `YarnAllocatorBlacklistTracker` decides to blacklist because of 
allocation failures, it doesn't send any message back to the driver -- so the 
driver doesn't have a msg in the logs, nor in the event log nor a UI update.  
So in client mode, the user would need to get AM logs to know what was going on.

Attila wanted to do it this way because of 
`mostRelevantSubsetOfBlacklistedNodes` -- it seemed weird to send an update to 
the driver when the blacklisting wasn't necessarily even in effect.  Though now 
that I'm thinking about this, maybe it should just send the update anyway, even 
though that blacklist may effectively be ignored.

Re: starvation -- I agree, though "eventually" for resources can be so long 
in practice that to users it all looks the same.

Anyway, though you say you're OK with removing the limit, it seems like you 
feel more strongly about this than I do.  So I think we can keep it; I don't 
think it prevents us from doing something else down the road.

I do think we should add the notification to the driver, including a 
listener event, which just ignores `mostRelevantSubsetOfBlacklistedNodes`, 
unless anyone has a reason for not doing it.  I suggest @attilapiros does that 
in a followup.

If that plan sounds OK, then this is probably nearly ready to merge.  But 
it's been a little while since I've looked closely, so I'll do another pass 
(probably tomorrow).


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190346995
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +96,10 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // map of live stages, to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
+mutable.HashMap[String, PeakExecutorMetrics]]()
--- End diff --

you could just import mutable.HashMap (added bonus -- fits on one line)


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190353891
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -209,6 +210,16 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new 
DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
+  /** driver heartbeat for collecting metrics */
+  private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, 
"driver-heartbeater",
--- End diff --

let's not put this in the DAGScheduler please -- this class is fragile 
enough as it is :)

I think this should just go in SparkContext.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190346570
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -81,7 +84,7 @@ private[spark] class EventLoggingListener(
   private val compressionCodecName = compressionCodec.map { c =>
 CompressionCodec.getShortName(c.getClass.getName)
   }
-
+logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + 
shouldLogExecutorMetricsUpdates)
--- End diff --

doesn't really seem necessary at all, definitely not at INFO level (and 
indentation is wrong).


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190363619
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+    val conf = getLoggingConf(testDirPath, None)
+    val logName = "executorMetricsUpdated-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    // expected ExecutorMetricsUpdate, for the given stage id and executor id
+    val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] =
+      Map(
+        ((0, "1"),
+          createExecutorMetricsUpdateEvent(1,
+            new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
+        ((0, "2"),
+          createExecutorMetricsUpdateEvent(2,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
+        ((1, "1"),
+          createExecutorMetricsUpdateEvent(1,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
+        ((1, "2"),
+          createExecutorMetricsUpdateEvent(2,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L
+
+    // Events to post.
+    val events = Array(
+      SparkListenerApplicationStart("executionMetrics", None,
+        1L, "update", None),
+      createExecutorAddedEvent(1),
+      createExecutorAddedEvent(2),
+      createStageSubmittedEvent(0),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
+      createStageSubmittedEvent(1),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)),
+      createStageCompletedEvent(0),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)),
+      createExecutorRemovedEvent(1),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)),
+      createStageCompletedEvent(1),
+      SparkListenerApplicationEnd(1000L))
+
+    // play the events for the event logger
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    events.foreach(event => listenerBus.post(event))
+    listenerBus.stop()
+    eventLogger.stop()
+
+    // Verify the log file contains the expected events.
+    // Posted events should be logged, except for ExecutorMetricsUpdate events -- these
+    // are consolidated, 

[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190351033
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +183,35 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each executor
--- End diff --

I'd add a comment here that this will log metrics for all executors that 
were alive while the stage was running, whether or not they ran any tasks for 
that stage (I think that's what it will do here, right?)


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190345745
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -800,26 +812,50 @@ private[spark] class Executor(
       }
     }
   }
-
-  /**
-   * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
-   */
-  private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
-
-    // Wait a random interval so the heartbeats don't end up in sync
-    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
-
-    val heartbeatTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
-    }
-    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
-  }
 }
 
 private[spark] object Executor {
   // This is reserved for internal use by components that need to read task properties before a
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
+
+  val DIRECT_BUFFER_POOL_NAME = "direct"
+  val MAPPED_BUFFER_POOL_NAME = "mapped"
+
+  /** Get the BufferPoolMXBean for the specified buffer pool. */
+  def getBufferPool(pool: String): BufferPoolMXBean = {
+    val name = new ObjectName("java.nio:type=BufferPool,name=" + pool)
+    ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
+      name.toString, classOf[BufferPoolMXBean])
+  }
+
+  /**
+   * Get the current executor level memory metrics.
+   *
+   * @param memoryManager the memory manager
+   * @param direct the direct memory buffer pool
+   * @param mapped the mapped memory buffer pool
+   * @return the executor memory metrics
+   */
+  def getCurrentExecutorMetrics(
+      memoryManager: MemoryManager,
+      direct: BufferPoolMXBean,
+      mapped: BufferPoolMXBean) : ExecutorMetrics = {
--- End diff --

does it make more sense to move this inside `Heartbeater`?  Then you don't 
need to pass in any BufferPoolMXBeans.  also rename to "getCurrentMemoryMetrics"
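
For reference, a hedged sketch of how such a method could look the pools up itself instead of taking `BufferPoolMXBean` parameters (the helper name below is made up):

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

// Illustrative: the platform MBean server exposes the "direct" and "mapped"
// buffer pools, so the metrics code can fetch them on its own.
def bufferPoolUsage(): Map[String, Long] = {
  ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
    .map(pool => pool.getName -> pool.getMemoryUsed)
    .toMap
}
```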


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190359417
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
(same testExecutorMetricsUpdateEventLogging hunk as quoted in the comment above)

[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190363971
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
(same testExecutorMetricsUpdateEventLogging hunk as quoted in the comment above)

[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190364971
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
(same testExecutorMetricsUpdateEventLogging hunk as quoted in the comment above)

[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190405223
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
+  private var _jvmUsedHeapMemory = -1L;
+  private var _jvmUsedNonHeapMemory = 0L;
+  private var _onHeapExecutionMemory = 0L
+  private var _offHeapExecutionMemory = 0L
+  private var _onHeapStorageMemory = 0L
+  private var _offHeapStorageMemory = 0L
+  private var _onHeapUnifiedMemory = 0L
+  private var _offHeapUnifiedMemory = 0L
+  private var _directMemory = 0L
+  private var _mappedMemory = 0L
+
+  def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
+
+  def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
+
+  def onHeapExecutionMemory: Long = _onHeapExecutionMemory
+
+  def offHeapExecutionMemory: Long = _offHeapExecutionMemory
+
+  def onHeapStorageMemory: Long = _onHeapStorageMemory
+
+  def offHeapStorageMemory: Long = _offHeapStorageMemory
+
+  def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
+
+  def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
+
+  def directMemory: Long = _directMemory
+
+  def mappedMemory: Long = _mappedMemory
+
+  /**
+   * Compare the specified memory values with the saved peak executor memory
+   * values, and update if there is a new peak value.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
+  _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
+  updated = true
+}
+if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
+  _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
+  updated = true
+}
+if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
+  _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
+  _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
+  _onHeapStorageMemory = executorMetrics.onHeapStorageMemory
+  updated = true
+}
+if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
+  _offHeapStorageMemory = executorMetrics.offHeapStorageMemory
--- End diff --

I know spark has this kind of code all over the place already, but I really 
hate how error-prone it is -- way too easy for a copy-paste error to result in 
comparing the wrong two metrics, or updating the wrong value, or forgetting to 
update this when another metric is added, etc.

I just opened https://github.com/edwinalu/spark/pull/1 as another way to do 
this that would eliminate a ton of boilerplate IMO.
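
To sketch the kind of refactor I mean (illustrative only; not a verbatim copy of that PR): track the peaks in an array indexed by a fixed list of metric getters, so adding a metric means adding one entry rather than another copy-pasted if-block.

```scala
// Illustrative sketch of boilerplate-free peak tracking.
class PeakTracker[T](getters: IndexedSeq[T => Long]) {
  private val peaks = Array.fill(getters.length)(Long.MinValue)

  /** Returns true if any metric in `sample` set a new peak. */
  def compareAndUpdate(sample: T): Boolean = {
    var updated = false
    var i = 0
    while (i < getters.length) {
      val v = getters(i)(sample)
      if (v > peaks(i)) {
        peaks(i) = v
        updated = true
      }
      i += 1
    }
    updated
  }

  def peak(i: Int): Long = peaks(i)
}
```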


---




[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-25 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21346
  
Last failures are known flakies.

A few updates here since my last set of comments.  I've posted an overall 
design doc, and shared the tests I'm running on a cluster.  I think those tests 
cover all the cases we care about, but I would appreciate review of them too.  
I can change this to use the existing pull approach for large blocks, rather 
than updating the push one, if you want.  If you're OK with this, there will be 
one more PR on top of this to make use of the new uploadStream functionality.

There will also be another PR to cover reading large remote blocks in 
memory, for SPARK-24307.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191059478
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import com.google.common.io.Files;
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
--- End diff --

Oops, sorry, I got used to the style checker catching these in Scala.  
Fixed these now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-26 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/21440

[SPARK-24307][CORE] Support reading remote cached partitions > 2gb

(1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.

(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
mapping the entire file in chunks.

Added unit tests.  Ran the randomized test a couple of hundred times on my 
laptop.  Tests cover the equivalent of SPARK-24107 for the 
ChunkedByteBufferFileRegion.  Also tested on a cluster with remote cache reads 
>2gb (in memory and on disk).
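
For anyone skimming, here is a rough sketch of what (2) means: map a large file as a sequence of buffers, each under 2GB. This is just the idea, not the code in the PR; the helper name and chunk-size parameter are made up.

```scala
import java.io.File
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode
import java.nio.file.StandardOpenOption

import scala.collection.mutable.ArrayBuffer

object ChunkedMmapSketch {
  // Map a possibly-larger-than-2GB file as several MappedByteBuffers, each at
  // most maxChunkSize bytes, since a single ByteBuffer is limited to Int.MaxValue.
  def mapFileInChunks(file: File, maxChunkSize: Int): Array[MappedByteBuffer] = {
    val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
    try {
      val fileSize = channel.size()
      val chunks = new ArrayBuffer[MappedByteBuffer]
      var offset = 0L
      while (offset < fileSize) {
        val length = math.min(maxChunkSize.toLong, fileSize - offset)
        chunks += channel.map(MapMode.READ_ONLY, offset, length)
        offset += length
      }
      chunks.toArray
    } finally {
      channel.close()
    }
  }
}
```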

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark chunked_bb_file_region

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21440.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21440


commit 4373e27c2ec96b77a2311f5c5997ae5ca84bf6c5
Author: Imran Rashid 
Date:   2018-05-23T03:59:40Z

[SPARK-24307][CORE] Support reading remote cached partitions > 2gb

(1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.

(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
mapping the entire file in chunks.

Added unit tests.  Also tested on a cluster with remote cache reads >
2gb (in memory and on disk).




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191471289
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+  // Chunk size in bytes
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = chunkedByteBuffer.size
--- End diff --

No difference, `count()` is just there to satisfy an interface.  My mistake for 
making them look different; I'll make them the same.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191472540
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimi

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191472949
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimi

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191476566
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
+// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// Until then, replication can cause the process to use too much 
memory and get killed
+// by the OS / cluster manager (not a java OOM, since its a 
memory-mapped file) even though
+// we've read the data to disk.
--- End diff --

btw, this fix is such low-hanging fruit that I would try to do it 
immediately afterwards.  (I haven't filed a JIRA yet just because there are 
already so many defunct JIRAs related to this; I was going to wait till my 
changes got some traction.)

I think it's OK to get it in like this first, as it makes the behavior for 
2.01 gb basically the same as it was for 1.99 gb.
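
To make the TODO concrete, this is roughly the shape I have in mind (a sketch only; the trait and `getRemoteManagedBuffer` are hypothetical names, but `ManagedBuffer.createInputStream()` is the existing interface method that makes this possible):

```scala
import java.io.InputStream

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.BlockId

// Hypothetical shape of the follow-up: hand callers the ManagedBuffer itself,
// so getRemoteValues-style callers can stream from the fetched temp file
// instead of memory-mapping the whole thing.
trait RemoteBlockReaderSketch {
  def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer]

  def getRemoteValueStream(blockId: BlockId): Option[InputStream] =
    // createInputStream() reads from whatever backs the buffer (for
    // fetch-to-disk, a file segment), without mapping it into memory.
    getRemoteManagedBuffer(blockId).map(_.createInputStream())
}
```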


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21440: [SPARK-24307][CORE] Support reading remote cached partit...

2018-05-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21440
  
Thanks for the reviews @markhamstra @Ngone51, I've updated the PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r191489062
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
+  private var _jvmUsedHeapMemory = -1L;
+  private var _jvmUsedNonHeapMemory = 0L;
+  private var _onHeapExecutionMemory = 0L
+  private var _offHeapExecutionMemory = 0L
+  private var _onHeapStorageMemory = 0L
+  private var _offHeapStorageMemory = 0L
+  private var _onHeapUnifiedMemory = 0L
+  private var _offHeapUnifiedMemory = 0L
+  private var _directMemory = 0L
+  private var _mappedMemory = 0L
+
+  def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
+
+  def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
+
+  def onHeapExecutionMemory: Long = _onHeapExecutionMemory
+
+  def offHeapExecutionMemory: Long = _offHeapExecutionMemory
+
+  def onHeapStorageMemory: Long = _onHeapStorageMemory
+
+  def offHeapStorageMemory: Long = _offHeapStorageMemory
+
+  def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
+
+  def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
+
+  def directMemory: Long = _directMemory
+
+  def mappedMemory: Long = _mappedMemory
+
+  /**
+   * Compare the specified memory values with the saved peak executor 
memory
+   * values, and update if there is a new peak value.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
+  _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
+  updated = true
+}
+if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
+  _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
+  updated = true
+}
+if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
+  _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
+  _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
+  _onHeapStorageMemory = executorMetrics.onHeapStorageMemory
+  updated = true
+}
+if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
+  _offHeapStorageMemory = executorMetrics.offHeapStorageMemory
--- End diff --

The more you can take it over from here, the better :) But let me know if 
anything is confusing, or if the TODOs that I've left don't actually seem 
possible, etc., and I can take a closer look.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-29 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/21451

[SPARK-24296][CORE][WIP] Replicate large blocks as a stream.

When replicating large cached RDD blocks, it can be helpful to replicate
them as a stream, to avoid using large amounts of memory during the
transfer.  This also allows blocks larger than 2GB to be replicated.

Added unit tests in DistributedSuite.  Also ran tests on a cluster for
blocks > 2gb.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark clean_replication

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21451.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21451


commit 05967808f5440835919f02d6c5d0d3563482d304
Author: Imran Rashid 
Date:   2018-05-02T14:55:15Z

[SPARK-6237][NETWORK] Network-layer changes to allow stream upload.

These changes allow an RPCHandler to receive an upload as a stream of
data, without having to buffer the entire message in the FrameDecoder.
The primary use case is for replicating large blocks.

Added unit tests.

commit 43658df6d6b7dffacd528a2573e8846ab6469e81
Author: Imran Rashid 
Date:   2018-05-23T03:59:40Z

[SPARK-24307][CORE] Support reading remote cached partitions > 2gb

(1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.

(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
mapping the entire file in chunks.

Added unit tests.  Also tested on a cluster with remote cache reads >
2gb (in memory and on disk).

commit 7e517e4ea0ff66dc57121b54fdd71f8391edd8f2
Author: Imran Rashid 
Date:   2018-05-15T16:48:51Z

[SPARK-24296][CORE] Replicate large blocks as a stream.

When replicating large cached RDD blocks, it can be helpful to replicate
them as a stream, to avoid using large amounts of memory during the
transfer.  This also allows blocks larger than 2GB to be replicated.

Added unit tests in DistributedSuite.  Also ran tests on a cluster for
blocks > 2gb.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

2018-05-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21451
  
This is the change for SPARK-24296, on top of 
https://github.com/apache/spark/pull/21346 and 
https://github.com/apache/spark/pull/21440

Posting here for testing.  Reviews are welcome on this commit, which has just 
the relevant changes: 
https://github.com/apache/spark/pull/21451/commits/7e517e4ea0ff66dc57121b54fdd71f8391edd8f2


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21456: [SPARK-24356] [CORE] Duplicate strings in File.path mana...

2018-05-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21456
  
Ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-30 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21346
  
yeah I see what you're saying about better error handling, but I'd really 
rather not take that on here.  I think some prior attempts at solving the 2gb 
limit have tried to take on too much, and I'd like to keep this as simple as 
possible, and leave more for future improvements.  I guess it means that when 
(if) we do make the changes you're proposing, we'd have to go back to changing 
the network layer again, possibly introducing new message types etc.  But we're 
not really painting ourselves into a corner at all; we can do that if it becomes 
necessary.

fwiw, there are other things that are higher on my list to fix once the 
basic functionality goes in:
1) when you do a remote read of cached data, even if you fetch to disk, 
you memory-map the entire file, rather than just using a FileInputStream
2) if you replicate a disk-cached block, it'll get written to a temp file on 
disk, then read back from that file into memory, and then written to the 
new location
3) when you do a remote read of cached data, you shouldn't actually have to 
wait till you fetch all the data; you should just be able to treat it as an 
InputStream


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r175164254
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+  execId: String,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"Replicating first cached block on $execId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+  // we assume blocks from the latest rdd are most relevant
--- End diff --

right, I'm suggesting that you expand the *comment* to be what I had 
written above, so it's a little easier to follow the logic.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2018-03-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19041
  
Thanks @brad-kaiser -- I want to reiterate my comment from Feb 2nd; I think 
that is really the most important part to address before getting into the 
details of the current implementation:

> Thought some more about the race between RemoveBlock getting sent back 
from the executor vs when the CacheRecoveryManager tries to replicate the next 
block -- actually, why is there the back-and-forth with the driver for every 
block? Why isn't there just one message from the CacheRecoveryManager to the 
executor, saying "Drain all RDD blocks", and then one message from the executor 
back to the driver when it's done?
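
Something like this is what I'm picturing (just a sketch; the message and method names are made up, and the RPC plumbing is abstracted behind a function):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical one-round-trip protocol: the driver sends a single drain
// request per executor, and gets a single reply when all blocks are handled.
case class DrainAllRDDBlocks(excludeExecutors: Seq[String])
case class DrainFinished(execId: String, replicated: Int, failed: Int)

class DrainCoordinatorSketch(
    ask: (String, DrainAllRDDBlocks) => Future[DrainFinished])(
    implicit ec: ExecutionContext) {

  // One message out per executor, one message back, no per-block round trips.
  def drainAll(execIds: Seq[String], exclude: Seq[String]): Future[Seq[DrainFinished]] =
    Future.sequence(execIds.map(id => ask(id, DrainAllRDDBlocks(exclude))))
}
```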


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...

2018-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20742#discussion_r175170426
  
--- Diff: docs/security.md ---
@@ -3,47 +3,291 @@ layout: global
 displayTitle: Spark Security
 title: Security
 ---
+* This will become a table of contents (this text will be scraped).
+{:toc}
 
-Spark currently supports authentication via a shared secret. 
Authentication can be configured to be on via the `spark.authenticate` 
configuration parameter. This parameter controls whether the Spark 
communication protocols do authentication using the shared secret. This 
authentication is a basic handshake to make sure both sides have the same 
shared secret and are allowed to communicate. If the shared secret is not 
identical they will not be allowed to communicate. The shared secret is created 
as follows:
+# Spark RPC
 
-* For Spark on [YARN](running-on-yarn.html) and local deployments, 
configuring `spark.authenticate` to `true` will automatically handle generating 
and distributing the shared secret. Each application will use a unique shared 
secret.
-* For other types of Spark deployments, the Spark parameter 
`spark.authenticate.secret` should be configured on each of the nodes. This 
secret will be used by all the Master/Workers and applications.
+## Authentication
 
-## Web UI
+Spark currently supports authentication for RPC channels using a shared 
secret. Authentication can
+be turned on by setting the `spark.authenticate` configuration parameter.
 
-The Spark UI can be secured by using [javax servlet 
filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the 
`spark.ui.filters` setting
-and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via [SSL 
settings](security.html#ssl-configuration).
+The exact mechanism used to generate and distribute the shared secret is 
deployment-specific.
 
-### Authentication
+For Spark on [YARN](running-on-yarn.html) and local deployments, Spark 
will automatically handle
+generating and distributing the shared secret. Each application will use a 
unique shared secret. In
+the case of YARN, this feature relies on YARN RPC encryption being enabled 
for the distribution of
+secrets to be secure.
 
-A user may want to secure the UI if it has data that other users should 
not be allowed to see. The javax servlet filter specified by the user can 
authenticate the user and then once the user is logged in, Spark can compare 
that user versus the view ACLs to make sure they are authorized to view the UI. 
The configs `spark.acls.enable`, `spark.ui.view.acls` and 
`spark.ui.view.acls.groups` control the behavior of the ACLs. Note that the 
user who started the application always has view access to the UI.  On YARN, 
the Spark UI uses the standard YARN web application proxy mechanism and will 
authenticate via any installed Hadoop filters.
+For other resource managers, `spark.authenticate.secret` must be 
configured on each of the nodes.
+This secret will be shared by all the daemons and applications, so this 
deployment configuration is
+not as secure as the above, especially when considering multi-tenant 
clusters.
 
-Spark also supports modify ACLs to control who has access to modify a 
running Spark application. This includes things like killing the application or 
a task. This is controlled by the configs `spark.acls.enable`, 
`spark.modify.acls` and `spark.modify.acls.groups`. Note that if you are 
authenticating the web UI, in order to use the kill button on the web UI it 
might be necessary to add the users in the modify acls to the view acls also. 
On YARN, the modify acls are passed in and control who has modify access via 
YARN interfaces.
-Spark allows for a set of administrators to be specified in the acls who 
always have view and modify permissions to all the applications. is controlled 
by the configs `spark.admin.acls` and `spark.admin.acls.groups`. This is useful 
on a shared cluster where you might have administrators or support staff who 
help users debug applications.
+
+Property NameDefaultMeaning
+
+  spark.authenticate
+  false
+  Whether Spark authenticates its internal connections.
+
+
+  spark.authenticate.secret
+  None
+  
+The secret key used authentication. See above for when this 
configuration should be set.
+  
+
+
 
-## Event Logging
+## Encryption
 
-If your applications are using event logging, the directory where the 
event logs go (`spark.eventLog.dir`) should be manually created and have the 
proper permissions set on it. If you want those log files secured, the 
permissions should be set to `drwxrwxrwxt` for that directory. The owner of the 
directory should be the super user who is running the history server and the 
group permissions should be

[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...

2018-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20742#discussion_r175173523
  
--- Diff: docs/security.md ---
@@ -182,54 +582,70 @@ configure those ports.
   
 
 
-### HTTP Security Headers
 
-Apache Spark can be configured to include HTTP Headers which aids in 
preventing Cross 
-Site Scripting (XSS), Cross-Frame Scripting (XFS), MIME-Sniffing and also 
enforces HTTP 
-Strict Transport Security.
+# Kerberos
+
+Spark supports submitting applications in environments that use Kerberos 
for authentication.
+In most cases, Spark relies on the credentials of the current logged in 
user when authenticating
+to Kerberos-aware services. Such credentials can be obtained by logging in 
to the configured KDC
+with tools like `kinit`.
+
+When talking to Hadoop-based services, Spark needs to obtain delegation 
tokens so that non-local
+processes can authenticate. Spark ships with support for HDFS and other 
Hadoop file systems, Hive
+and HBase.
+
+When using a Hadoop filesystem (such HDFS or WebHDFS), Spark will acquire 
the relevant tokens
+for the service hosting the user's home directory.
+
+An HBase token will be obtained if HBase is in the application's 
classpath, and the HBase
+configuration has Kerberos authentication turned 
(`hbase.security.authentication=kerberos`).
+
+Similarly, a Hive token will be obtained if Hive is in the classpath, and 
the configuration includes
+a URIs for remote metastore services (`hive.metastore.uris` is not empty).
--- End diff --

nit: either "a URI" or "URIs"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...

2018-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20742#discussion_r175171592
  
--- Diff: docs/security.md ---
@@ -3,47 +3,291 @@ layout: global
 displayTitle: Spark Security
 title: Security
 ---
+* This will become a table of contents (this text will be scraped).
+{:toc}
 
-Spark currently supports authentication via a shared secret. 
Authentication can be configured to be on via the `spark.authenticate` 
configuration parameter. This parameter controls whether the Spark 
communication protocols do authentication using the shared secret. This 
authentication is a basic handshake to make sure both sides have the same 
shared secret and are allowed to communicate. If the shared secret is not 
identical they will not be allowed to communicate. The shared secret is created 
as follows:
+# Spark RPC
 
-* For Spark on [YARN](running-on-yarn.html) and local deployments, 
configuring `spark.authenticate` to `true` will automatically handle generating 
and distributing the shared secret. Each application will use a unique shared 
secret.
-* For other types of Spark deployments, the Spark parameter 
`spark.authenticate.secret` should be configured on each of the nodes. This 
secret will be used by all the Master/Workers and applications.
+## Authentication
 
-## Web UI
+Spark currently supports authentication for RPC channels using a shared 
secret. Authentication can
+be turned on by setting the `spark.authenticate` configuration parameter.
 
-The Spark UI can be secured by using [javax servlet 
filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the 
`spark.ui.filters` setting
-and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via [SSL 
settings](security.html#ssl-configuration).
+The exact mechanism used to generate and distribute the shared secret is 
deployment-specific.
 
-### Authentication
+For Spark on [YARN](running-on-yarn.html) and local deployments, Spark 
will automatically handle
+generating and distributing the shared secret. Each application will use a 
unique shared secret. In
+the case of YARN, this feature relies on YARN RPC encryption being enabled 
for the distribution of
+secrets to be secure.
 
-A user may want to secure the UI if it has data that other users should 
not be allowed to see. The javax servlet filter specified by the user can 
authenticate the user and then once the user is logged in, Spark can compare 
that user versus the view ACLs to make sure they are authorized to view the UI. 
The configs `spark.acls.enable`, `spark.ui.view.acls` and 
`spark.ui.view.acls.groups` control the behavior of the ACLs. Note that the 
user who started the application always has view access to the UI.  On YARN, 
the Spark UI uses the standard YARN web application proxy mechanism and will 
authenticate via any installed Hadoop filters.
+For other resource managers, `spark.authenticate.secret` must be 
configured on each of the nodes.
+This secret will be shared by all the daemons and applications, so this 
deployment configuration is
+not as secure as the above, especially when considering multi-tenant 
clusters.
 
-Spark also supports modify ACLs to control who has access to modify a 
running Spark application. This includes things like killing the application or 
a task. This is controlled by the configs `spark.acls.enable`, 
`spark.modify.acls` and `spark.modify.acls.groups`. Note that if you are 
authenticating the web UI, in order to use the kill button on the web UI it 
might be necessary to add the users in the modify acls to the view acls also. 
On YARN, the modify acls are passed in and control who has modify access via 
YARN interfaces.
-Spark allows for a set of administrators to be specified in the acls who 
always have view and modify permissions to all the applications. is controlled 
by the configs `spark.admin.acls` and `spark.admin.acls.groups`. This is useful 
on a shared cluster where you might have administrators or support staff who 
help users debug applications.
+
+Property NameDefaultMeaning
+
+  spark.authenticate
+  false
+  Whether Spark authenticates its internal connections.
+
+
+  spark.authenticate.secret
+  None
+  
+The secret key used authentication. See above for when this 
configuration should be set.
+  
+
+
 
-## Event Logging
+## Encryption
 
-If your applications are using event logging, the directory where the 
event logs go (`spark.eventLog.dir`) should be manually created and have the 
proper permissions set on it. If you want those log files secured, the 
permissions should be set to `drwxrwxrwxt` for that directory. The owner of the 
directory should be the super user who is running the history server and the 
group permissions should be

[GitHub] spark issue #20742: [SPARK-23572][docs] Bring "security.md" up to date.

2018-03-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20742
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20770: [SPARK-23626][CORE] DAGScheduler blocked due to JobSubmi...

2018-03-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20770
  
Took a quick look; I agree with shivaram's observations, you've got to handle 
`shuffleIdToMapStage`, which will not be so easy.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20853: [SPARK-23729][SS] Glob resolution is done without the fr...

2018-03-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20853
  
Jenkins, ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

2018-03-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20657
  
@jerryshao I know you said you wanted to take a deeper look, but it's been a 
while.  Otherwise I'll merge in the next day or two.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-03-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20604
  
@vanzin @sitalkedia @jiangxb1987 I was looking at this code again, and I'd 
appreciate your thoughts on how this relates to 
[SPARK-21834](https://issues.apache.org/jira/browse/SPARK-21834)  
https://github.com/apache/spark/pull/19081

I actually think that SPARK-21834 probably solves the bug I was describing 
initially.  I hit the bug on 2.2.0, and didn't properly understand the change 
from SPARK-21834 when proposing this change.  Nonetheless, I still think this fix 
is a good one -- it improves code clarity in general and fixes a couple of other 
minor cases.  I'd also link the issues in JIRA etc. so the relationship is more 
clear.

I'd go even further and suggest that with this fix in, we can actually 
remove SPARK-21834, as it's no longer necessary.  It's not harmful, but it's just 
confusing.

thoughts?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...

2018-03-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20888
  
I think you're right about killing the wrong stage, but I don't think it's 
exactly what you've outlined.  The original code doesn't try to kill a stage 
with ID == 0 -- instead it's just waiting until that volatile is set to 
something > 0, and then proceeds.  That seems to work fine; we do see that the 
stage does get canceled OK once.

However, I think the problem is that the test [runs twice, with and 
without 
codegen](https://github.com/apache/spark/blob/4d37008c78d7d6b8f8a649b375ecc090700eca4f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala#L165).
  The first time, it'll always wait till the stage id is set, because of 
that `eventually { ... stageToKill > 0}`.

However, on the second iteration, that `stageToKill` may still be > 0 based 
on the first iteration, not because it's been set by the second iteration.  So I 
think you just need to reset the value to -1 between iterations.
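
i.e. something along these lines (illustrative only, not the actual DataFrameRangeSuite code):

```scala
object StageKillSketch {
  // A stale value from the first pass would satisfy `stageToKill > 0` before
  // the second pass has submitted anything, so we'd cancel the wrong stage.
  @volatile private var stageToKill: Int = -1

  def runOnce(codegenEnabled: Boolean): Unit = {
    stageToKill = -1  // reset, so this pass waits for its *own* stage id
    // submit the job; a SparkListener sets stageToKill on stage submission;
    // eventually { assert(stageToKill > 0) }; then cancel that stage
  }

  def main(args: Array[String]): Unit = Seq(true, false).foreach(runOnce)
}
```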


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...

2018-03-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20888
  
hmm, you're right, I was looking at a different branch in my editor and 
didn't notice that it was reset in the code I linked to on master, oops.

I still don't understand your proposed solution, though -- how is checking 
`stageToKill != -1` better than checking `stageToKill > 0` in this case?




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...

2018-03-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20888
  
> if I execute the test on my machine alone it never pass.

You mean it never fails on your machine, right?  It's only flaky when you 
run everything on Jenkins?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...

2018-03-23 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20888
  
ah ok, yes, when run in isolation the stage will be 0, so your change makes 
sense.  But that is not what is making it flaky in a full test run.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178959007
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+  execId: String,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"Replicating first cached block on $execId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
--- End diff --

sorry, my comment was worded very poorly.  Is there any reason you wouldn't 
want to transfer the on-disk blocks?  I assume you'd want to replicate all of 
them, and just need to change the comments elsewhere.  Or is it an intentional tradeoff, 
since users are more likely to limit the amount of in-memory caching to only the 
most important stuff?

Also, just to be really precise -- the check below isn't whether the block 
is currently in memory, it's whether it has been requested to be cached in memory.  It 
may have been cached as MEMORY_AND_DISK, but currently only resides on disk.  
Depending on why you want to limit to in-memory only, this may not be 
applicable.  Maybe you actually want `!useDisk`.
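
In code, the distinction I'm drawing is roughly the following (a sketch; `currentlyInMemory` stands in for an actual MemoryStore lookup, while `useMemory`/`useDisk` are the StorageLevel flags):

```scala
import org.apache.spark.storage.StorageLevel

object CachedBlockCheckSketch {
  // What the user *asked* for (StorageLevel) vs where the block lives right now.
  def describe(level: StorageLevel, currentlyInMemory: Boolean): String = {
    if (!level.useDisk) {
      "memory-only level (what a `!useDisk` check selects)"
    } else if (level.useMemory && !currentlyInMemory) {
      "requested in memory (e.g. MEMORY_AND_DISK) but currently only on disk"
    } else {
      "disk-backed (possibly also in memory)"
    }
  }
}
```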


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178968393
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
--- End diff --

I find `recoveringExecutors` pretty confusing; it sounds like executors that 
are recovering from some problem but are going to be OK -- not executors that 
are about to die and that we are recovering data from.  How about 
`drainingExecutors`?  (Though I have a feeling this name may have been 
discussed in earlier rounds of comments and this is what we settled on ... if 
so, that's fine.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178964472
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -250,6 +255,44 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+  execId: String,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"Replicating first cached block on $execId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+  // As a heuristic, prioritize replicating the latest rdd. If this 
succeeds,
+  // CacheRecoveryManager will try to replicate the remaining rdds.
--- End diff --

Rather than the latest RDD, it would actually make more sense to take 
advantage of the LRU ordering already in the MemoryStore: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L91

but maybe that is not easy to expose.

I think that also means the *receiving* end will put the 
replicated block at the back of that LinkedHashMap, even though it really 
hasn't been accessed at all.
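
For concreteness, the LRU ordering in question comes from an access-ordered LinkedHashMap. A toy example (the real map holds MemoryEntry values keyed by BlockId, not longs keyed by strings):

```scala
import scala.collection.JavaConverters._

object LruOrderSketch {
  def main(args: Array[String]): Unit = {
    // accessOrder = true makes iteration order least-recently-used first,
    // which is the ordering the MemoryStore's internal map relies on.
    val entries = new java.util.LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_1_0", 100L)
    entries.put("rdd_2_0", 200L)
    entries.get("rdd_1_0")  // touching rdd_1_0 moves it to the most-recent end

    // rdd_2_0 now iterates first, i.e. it is the least recently used block
    println(entries.keySet().asScala.toList)  // List(rdd_2_0, rdd_1_0)
  }
}
```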


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178967087
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
+.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
+.build[String, String]()
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   * @return a sequence of futures of Unit that will complete once the 
executor has been killed.
+   */
+  def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+val canBeRecovered: Future[Seq[String]] = checkMem(execIds)
+
+canBeRecovered.flatMap { execIds =>
+  execIds.foreach { execId => recoveringExecutors.put(execId, execId) }
+  Future.sequence(execIds.map { replicateUntilTimeoutThenKill })
+}
+  }
+
+  def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = {
+val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
+val replicationFuture = replicateUntilDone(execId)
+
+Future.firstCompletedOf(List(timeoutFuture, 
replicationFuture)).andThen {
+  case scala.util.Success(DoneRecovering) =>
+logTrace(s"Done recovering blocks on $execId, killing now")
+  case scala.util.Success(Timeout) =>
+logWarning(s"Couldn't recover cache on $execId before 
$forceKillAfterS second timeout")
+  case Failure(ex) =>
+logWarning(s"Error recovering cache on $execId", ex)
+}.andThen {
+  case _ =>
+kill(execId)
+}
+  }
+
+  /**
+   * Given a list of executors that will be shut down, check if there is 
enough free memory on the
+   * rest of the cluster to hold their data. Return a list of just the 
executors for which there
+   * will be enough space. Executors are included smallest first.
+   *
+   * This is a best guess implementation and it is not guaranteed that all 
returned executors
+   * will succeed. For example a block might be too big to fit on any one 
specific executor.
+   *
+   * @param execIds executors which will be shut down
 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178967925
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
+.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
+.build[String, String]()
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   * @return a sequence of futures of Unit that will complete once the 
executor has been killed.
+   */
+  def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+val canBeRecovered: Future[Seq[String]] = checkMem(execIds)
+
+canBeRecovered.flatMap { execIds =>
+  execIds.foreach { execId => recoveringExecutors.put(execId, execId) }
+  Future.sequence(execIds.map { replicateUntilTimeoutThenKill })
+}
+  }
+
+  def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = {
+val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
+val replicationFuture = replicateUntilDone(execId)
+
+Future.firstCompletedOf(List(timeoutFuture, 
replicationFuture)).andThen {
+  case scala.util.Success(DoneRecovering) =>
+logTrace(s"Done recovering blocks on $execId, killing now")
+  case scala.util.Success(Timeout) =>
+logWarning(s"Couldn't recover cache on $execId before 
$forceKillAfterS second timeout")
+  case Failure(ex) =>
+logWarning(s"Error recovering cache on $execId", ex)
+}.andThen {
+  case _ =>
+kill(execId)
+}
+  }
+
+  /**
+   * Given a list of executors that will be shut down, check if there is 
enough free memory on the
+   * rest of the cluster to hold their data. Return a list of just the 
executors for which there
+   * will be enough space. Executors are included smallest first.
+   *
+   * This is a best guess implementation and it is not guaranteed that all 
returned executors
+   * will succeed. For example a block might be too big to fit on any one 
specific executor.
+   *
+   * @param execIds executors which will be shut down
 

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178966943
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
+.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
+.build[String, String]()
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   * @return a sequence of futures of Unit that will complete once the 
executor has been killed.
+   */
+  def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+val canBeRecovered: Future[Seq[String]] = checkMem(execIds)
--- End diff --

Shouldn't you immediately kill those executors which don't pass `checkMem`? 
Are they getting killed somewhere else?
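
Just to sketch what I mean (untested, and assuming the `kill` helper already 
in this class is the right thing to call for the ones that don't fit):

```
canBeRecovered.flatMap { recoverable =>
  // anything that didn't pass checkMem can't be drained, so kill it now
  execIds.diff(recoverable).foreach(kill)
  recoverable.foreach { execId => recoveringExecutors.put(execId, execId) }
  Future.sequence(recoverable.map(replicateUntilTimeoutThenKill))
}
```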


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20640#discussion_r179012891
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -571,7 +568,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   cpus + totalCoresAcquired <= maxCores &&
   mem <= offerMem &&
   numExecutors < executorLimit &&
-  slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < 
MAX_SLAVE_FAILURES &&
+  !scheduler.nodeBlacklist().contains(offerHostname) &&
--- End diff --

I just want to make really sure everybody understands the big change in 
behavior here -- `nodeBlacklist()` currently *only* gets updated based on 
failures in *Spark* tasks.  If a Mesos task fails to even start -- that is, if 
a Spark executor fails to launch on a node -- `nodeBlacklist` does not get 
updated.  So you could have a node that is misconfigured somehow, and after 
this change you might end up repeatedly trying to launch executors on it, with 
the executor failing to start each time.  That holds even if you have 
blacklisting on.

This is SPARK-16630 for the non-Mesos case.  That is being actively worked 
on now -- however the work there will probably have to be YARN-specific, so 
there will still be follow-up work to get the same thing for Mesos after that 
is in.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20640#discussion_r179013270
  
--- Diff: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
 verifyTaskLaunched(driver, "o2")
   }
 
+  test("mesos declines offers from blacklisted slave") {
+setBackend()
+
+// launches a task on a valid offer on slave s1
+val minMem = backend.executorMemory(sc) + 1024
+val minCpu = 4
+val offer1 = Resources(minMem, minCpu)
+offerResources(List(offer1))
+verifyTaskLaunched(driver, "o1")
+
+// for any reason executor(aka mesos task) failed on s1
+val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+backend.statusUpdate(driver, status)
+when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))
--- End diff --

Just to reiterate my point above -- in many cases, having an executor fail 
will *not* lead to `taskScheduler.nodeBlacklist()` changing the way you're 
stubbing it here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20640#discussion_r179012299
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -648,14 +645,8 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   totalGpusAcquired -= gpus
   gpusByTaskId -= taskId
 }
-// If it was a failure, mark the slave as failed for blacklisting 
purposes
 if (TaskState.isFailed(state)) {
-  slave.taskFailures += 1
-
-  if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
-logInfo(s"Blacklisting Mesos slave $slaveId due to too many 
failures; " +
-"is Spark installed on it?")
-  }
+  logError(s"Task $taskId failed on Mesos slave $slaveId.")
--- End diff --

Minor: I think it would be nice to say "Mesos task $taskId...".  Maybe it's 
obvious to those spending more time with Mesos, but I find I'm easily confused 
by the difference between a Mesos task and a Spark task.
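
i.e. just changing the message to something like:

```
logError(s"Mesos task $taskId failed on Mesos slave $slaveId.")
```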


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179179616
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Try
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage._
+
+/**
+ * This is an integration test for the cache recovery feature using a 
local spark cluster. It
+ * extends the unit tests in CacheRecoveryManagerSuite which mocks a lot 
of cluster infrastructure.
+ */
+class CacheRecoveryIntegrationSuite extends SparkFunSuite
+with Matchers
+with BeforeAndAfterEach
+with BeforeAndAfterAll
+with Eventually {
+
+  private var conf: SparkConf = makeBaseConf()
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, 
"shuffle", numUsableCores = 4)
+  private val rpcHandler = new ExternalShuffleBlockHandler(transportConf, 
null)
+  private val transportContext = new TransportContext(transportConf, 
rpcHandler)
+  private val shuffleService = transportContext.createServer()
+  private var sc: SparkContext = _
+
+  private def makeBaseConf() = new SparkConf()
+.setAppName("test")
+.setMaster("local-cluster[4, 1, 512]")
+.set("spark.dynamicAllocation.enabled", "true")
+.set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always
+.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s")
+.set(EXECUTOR_MEMORY.key, "512m")
+.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s")
+.set(EXECUTOR_INSTANCES.key, "1")
+.set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4")
+.set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3")
+
+  override def beforeEach(): Unit = {
+conf = makeBaseConf()
+conf.set("spark.shuffle.service.port", shuffleService.getPort.toString)
+  }
+
+  override def afterEach(): Unit = {
+sc.stop()
+conf = null
+  }
+
+  override def afterAll(): Unit = {
+shuffleService.close()
+  }
+
+  private def getLocations(
+  sc: SparkContext,
+  rdd: RDD[_]): Map[BlockId, Map[BlockManagerId, BlockStatus]] = {
+import scala.collection.breakOut
+val blockIds: Array[BlockId] = rdd.partitions.map(p => 
RDDBlockId(rdd.id, p.index))
+blockIds.map { id =>
+  id -> 
Try(sc.env.blockManager.master.getBlockStatus(id)).getOrElse(Map.empty)
+}(breakOut)
+  }
+
+  test("cached data is replicated before dynamic de-allocation") {
+sc = new SparkContext(conf)
+TestUtils.waitUntilExecutorsUp(sc, 4, 6)
+
+val rdd = sc.parallelize(1 to 1000, 4).map(_ * 4).cache()
+rdd.reduce(_ + _) shouldBe 2002000
+sc.getExecutorIds().size shouldBe 4
+getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty } 
shouldBe true
+
+eventually(timeout(Span(5, Seconds)), interval(Span(1, Seconds))) {
+  sc.getExecutorIds().size shouldBe 3
+  getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179182498
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -420,63 +432,53 @@ private[spark] class ExecutorAllocationManager(
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*/
-  private def removeExecutors(executors: Seq[String]): Seq[String] = 
synchronized {
-val executorIdsToBeRemoved = new ArrayBuffer[String]
-
+  private def removeExecutors(executors: Seq[String]): Unit = synchronized 
{
--- End diff --

Since you're not actually removing executors here immediately with the new 
cache recovery path, you should update the doc to describe that too.

Also, we only mention the return type if we've got something interesting to 
say about it, so you can just skip mentioning it entirely.
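
Something along these lines for the doc (my wording, adjust as you see fit):

```
/**
 * Request the cluster manager to remove the given executors.
 * When cache recovery is enabled, the executors are only marked as pending
 * removal here; they are actually killed once their cached blocks have been
 * replicated elsewhere, or the recovery timeout expires.
 */
private def removeExecutors(executors: Seq[String]): Unit = synchronized {
```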


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179183962
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -449,18 +457,26 @@ class ExecutorAllocationManagerSuite
 assert(executorIds(manager).size === 12)
 assert(numExecutorsTarget(manager) === 8)
 
-assert(removeExecutor(manager, "1"))
-assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", 
"4"))
-assert(!removeExecutor(manager, "5")) // lower limit reached
-assert(!removeExecutor(manager, "6"))
+removeExecutor(manager, "1")
+assert(executorsPendingToRemove(manager).contains("1"))
+removeExecutors(manager, Seq("2", "3", "4"))
+assert(executorsPendingToRemove(manager).contains("2"))
+assert(executorsPendingToRemove(manager).contains("3"))
+assert(executorsPendingToRemove(manager).contains("4"))
+removeExecutor(manager, "5") // lower limit reached
+assert(!executorsPendingToRemove(manager).contains("5"))
+removeExecutor(manager, "6")
+assert(!executorsPendingToRemove(manager).contains("6"))
+removeExecutor(manager, "5") // lower limit reached
--- End diff --

I don't think this is supposed to be here twice (also on L466)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179174442
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Try
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage._
+
+/**
+ * This is an integration test for the cache recovery feature using a 
local spark cluster. It
+ * extends the unit tests in CacheRecoveryManagerSuite which mocks a lot 
of cluster infrastructure.
+ */
+class CacheRecoveryIntegrationSuite extends SparkFunSuite
+with Matchers
+with BeforeAndAfterEach
+with BeforeAndAfterAll
+with Eventually {
+
+  private var conf: SparkConf = makeBaseConf()
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, 
"shuffle", numUsableCores = 4)
+  private val rpcHandler = new ExternalShuffleBlockHandler(transportConf, 
null)
+  private val transportContext = new TransportContext(transportConf, 
rpcHandler)
+  private val shuffleService = transportContext.createServer()
+  private var sc: SparkContext = _
+
+  private def makeBaseConf() = new SparkConf()
+.setAppName("test")
+.setMaster("local-cluster[4, 1, 512]")
+.set("spark.dynamicAllocation.enabled", "true")
+.set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always
+.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s")
+.set(EXECUTOR_MEMORY.key, "512m")
+.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s")
+.set(EXECUTOR_INSTANCES.key, "1")
+.set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4")
+.set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3")
+
+  override def beforeEach(): Unit = {
+conf = makeBaseConf()
+conf.set("spark.shuffle.service.port", shuffleService.getPort.toString)
+  }
+
+  override def afterEach(): Unit = {
+sc.stop()
+conf = null
+  }
+
+  override def afterAll(): Unit = {
+shuffleService.close()
+  }
+
+  private def getLocations(
+  sc: SparkContext,
+  rdd: RDD[_]): Map[BlockId, Map[BlockManagerId, BlockStatus]] = {
+import scala.collection.breakOut
+val blockIds: Array[BlockId] = rdd.partitions.map(p => 
RDDBlockId(rdd.id, p.index))
+blockIds.map { id =>
+  id -> 
Try(sc.env.blockManager.master.getBlockStatus(id)).getOrElse(Map.empty)
+}(breakOut)
+  }
+
+  test("cached data is replicated before dynamic de-allocation") {
+sc = new SparkContext(conf)
+TestUtils.waitUntilExecutorsUp(sc, 4, 6)
+
+val rdd = sc.parallelize(1 to 1000, 4).map(_ * 4).cache()
+rdd.reduce(_ + _) shouldBe 2002000
+sc.getExecutorIds().size shouldBe 4
+getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty } 
shouldBe true
+
+eventually(timeout(Span(5, Seconds)), interval(Span(1, Seconds))) {
+  sc.getExecutorIds().size shouldBe 3
+  getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179191524
  
--- Diff: 
core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.CacheRecoveryManager._
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rpc._
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+class CacheRecoveryManagerSuite
+  extends SparkFunSuite with MockitoSugar with Matchers {
+
+  val oneGB: Long = ByteUnit.GiB.toBytes(1).toLong
+
+  val plentyOfMem = Map(
+BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB)))
+
+  test("replicate blocks until empty and then kill executor") {
+val conf = new SparkConf()
+val eam = mock[ExecutorAllocationManager]
+val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1"))
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe DoneRecovering
--- End diff --

Rather than just checking `head`, you can check the whole result just as 
easily.  And also here, prefer assert over shouldBe:

```
assert(results === Seq(DoneRecovering))
```

throughout this file


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179171981
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Try
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage._
+
+/**
+ * This is an integration test for the cache recovery feature using a 
local spark cluster. It
+ * extends the unit tests in CacheRecoveryManagerSuite which mocks a lot 
of cluster infrastructure.
+ */
+class CacheRecoveryIntegrationSuite extends SparkFunSuite
+with Matchers
+with BeforeAndAfterEach
+with BeforeAndAfterAll
+with Eventually {
+
+  private var conf: SparkConf = makeBaseConf()
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, 
"shuffle", numUsableCores = 4)
+  private val rpcHandler = new ExternalShuffleBlockHandler(transportConf, 
null)
+  private val transportContext = new TransportContext(transportConf, 
rpcHandler)
+  private val shuffleService = transportContext.createServer()
+  private var sc: SparkContext = _
+
+  private def makeBaseConf() = new SparkConf()
+.setAppName("test")
+.setMaster("local-cluster[4, 1, 512]")
+.set("spark.dynamicAllocation.enabled", "true")
+.set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always
+.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s")
+.set(EXECUTOR_MEMORY.key, "512m")
+.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s")
+.set(EXECUTOR_INSTANCES.key, "1")
+.set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4")
+.set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3")
+
+  override def beforeEach(): Unit = {
+conf = makeBaseConf()
+conf.set("spark.shuffle.service.port", shuffleService.getPort.toString)
+  }
+
+  override def afterEach(): Unit = {
+sc.stop()
+conf = null
+  }
+
+  override def afterAll(): Unit = {
+shuffleService.close()
+  }
+
+  private def getLocations(
+  sc: SparkContext,
+  rdd: RDD[_]): Map[BlockId, Map[BlockManagerId, BlockStatus]] = {
+import scala.collection.breakOut
+val blockIds: Array[BlockId] = rdd.partitions.map(p => 
RDDBlockId(rdd.id, p.index))
+blockIds.map { id =>
+  id -> 
Try(sc.env.blockManager.master.getBlockStatus(id)).getOrElse(Map.empty)
+}(breakOut)
+  }
+
+  test("cached data is replicated before dynamic de-allocation") {
+sc = new SparkContext(conf)
+TestUtils.waitUntilExecutorsUp(sc, 4, 6)
+
+val rdd = sc.parallelize(1 to 1000, 4).map(_ * 4).cache()
+rdd.reduce(_ + _) shouldBe 2002000
--- End diff --

I know there is a mix in the existing code, but we prefer to use asserts 
rather than "should" matchers now -- there have been some cases where a small 
syntax mistake makes should matchers do nothing, even though the code still 
compiles and looks OK.  And I think it's less familiar to most people.
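
For example, for the line above (same check, just the assert form):

```
// matcher style, which can silently do nothing on a small syntax slip:
rdd.reduce(_ + _) shouldBe 2002000
// preferred:
assert(rdd.reduce(_ + _) === 2002000)
```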


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179206187
  
--- Diff: 
core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.CacheRecoveryManager._
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rpc._
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+class CacheRecoveryManagerSuite
+  extends SparkFunSuite with MockitoSugar with Matchers {
+
+  val oneGB: Long = ByteUnit.GiB.toBytes(1).toLong
+
+  val plentyOfMem = Map(
+BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB)))
+
+  test("replicate blocks until empty and then kill executor") {
+val conf = new SparkConf()
+val eam = mock[ExecutorAllocationManager]
+val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1"))
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe DoneRecovering
+  verify(eam).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 2
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("kill executor if it takes too long to replicate") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1), RDDBlockId(2, 1), RDDBlockId(3, 1), 
RDDBlockId(4, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem, pauseIndefinitely = 
true)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe Timeout
+  verify(eam, times(1)).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 1
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("shutdown timer will get cancelled if replication finishes") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179196510
  
--- Diff: 
core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.CacheRecoveryManager._
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rpc._
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+class CacheRecoveryManagerSuite
+  extends SparkFunSuite with MockitoSugar with Matchers {
+
+  val oneGB: Long = ByteUnit.GiB.toBytes(1).toLong
+
+  val plentyOfMem = Map(
+BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB)))
+
+  test("replicate blocks until empty and then kill executor") {
+val conf = new SparkConf()
+val eam = mock[ExecutorAllocationManager]
+val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1"))
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe DoneRecovering
+  verify(eam).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 2
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("kill executor if it takes too long to replicate") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1), RDDBlockId(2, 1), RDDBlockId(3, 1), 
RDDBlockId(4, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem, pauseIndefinitely = 
true)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe Timeout
+  verify(eam, times(1)).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 1
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("shutdown timer will get cancelled if replication finishes") {
--- End diff --

I don't see any meaningful difference between this and "replicate blocks 
until empty and then kill executor".  This isn't actually checking that the 
timer is cancelled.
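
If you want a test that really covers cancellation, one option (a rough 
sketch reusing this suite's mocks; the sleep length is just my guess at 
something past the 1s timeout) would be:

```
val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
val results = ThreadUtils.awaitResult(future, Duration(3, TimeUnit.SECONDS))
assert(results === Seq(DoneRecovering))
// give the 1s force-kill timer time to fire if it was *not* cancelled
Thread.sleep(1500)
verify(eam, times(1)).killExecutors(Seq("1"))
```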


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179180337
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Try
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage._
+
+/**
+ * This is an integration test for the cache recovery feature using a 
local spark cluster. It
+ * extends the unit tests in CacheRecoveryManagerSuite which mocks a lot 
of cluster infrastructure.
+ */
+class CacheRecoveryIntegrationSuite extends SparkFunSuite
+with Matchers
+with BeforeAndAfterEach
+with BeforeAndAfterAll
+with Eventually {
+
+  private var conf: SparkConf = makeBaseConf()
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, 
"shuffle", numUsableCores = 4)
+  private val rpcHandler = new ExternalShuffleBlockHandler(transportConf, 
null)
+  private val transportContext = new TransportContext(transportConf, 
rpcHandler)
+  private val shuffleService = transportContext.createServer()
+  private var sc: SparkContext = _
+
+  private def makeBaseConf() = new SparkConf()
+.setAppName("test")
+.setMaster("local-cluster[4, 1, 512]")
+.set("spark.dynamicAllocation.enabled", "true")
+.set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always
+.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s")
+.set(EXECUTOR_MEMORY.key, "512m")
+.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s")
+.set(EXECUTOR_INSTANCES.key, "1")
+.set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4")
+.set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3")
+
+  override def beforeEach(): Unit = {
+conf = makeBaseConf()
+conf.set("spark.shuffle.service.port", shuffleService.getPort.toString)
+  }
+
+  override def afterEach(): Unit = {
+sc.stop()
+conf = null
+  }
+
+  override def afterAll(): Unit = {
+shuffleService.close()
+  }
+
+  private def getLocations(
+  sc: SparkContext,
+  rdd: RDD[_]): Map[BlockId, Map[BlockManagerId, BlockStatus]] = {
+import scala.collection.breakOut
+val blockIds: Array[BlockId] = rdd.partitions.map(p => 
RDDBlockId(rdd.id, p.index))
+blockIds.map { id =>
+  id -> 
Try(sc.env.blockManager.master.getBlockStatus(id)).getOrElse(Map.empty)
+}(breakOut)
+  }
+
+  test("cached data is replicated before dynamic de-allocation") {
+sc = new SparkContext(conf)
+TestUtils.waitUntilExecutorsUp(sc, 4, 6)
+
+val rdd = sc.parallelize(1 to 1000, 4).map(_ * 4).cache()
+rdd.reduce(_ + _) shouldBe 2002000
+sc.getExecutorIds().size shouldBe 4
+getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty } 
shouldBe true
+
+eventually(timeout(Span(5, Seconds)), interval(Span(1, Seconds))) {
+  sc.getExecutorIds().size shouldBe 3
+  getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty }

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179201817
  
--- Diff: 
core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.CacheRecoveryManager._
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rpc._
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+class CacheRecoveryManagerSuite
+  extends SparkFunSuite with MockitoSugar with Matchers {
+
+  val oneGB: Long = ByteUnit.GiB.toBytes(1).toLong
+
+  val plentyOfMem = Map(
+BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB)))
+
+  test("replicate blocks until empty and then kill executor") {
+val conf = new SparkConf()
+val eam = mock[ExecutorAllocationManager]
+val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1"))
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe DoneRecovering
+  verify(eam).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 2
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("kill executor if it takes too long to replicate") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1), RDDBlockId(2, 1), RDDBlockId(3, 1), 
RDDBlockId(4, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem, pauseIndefinitely = 
true)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe Timeout
+  verify(eam, times(1)).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 1
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("shutdown timer will get cancelled if replication finishes") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179200666
  
--- Diff: 
core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.CacheRecoveryManager._
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rpc._
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+class CacheRecoveryManagerSuite
+  extends SparkFunSuite with MockitoSugar with Matchers {
+
+  val oneGB: Long = ByteUnit.GiB.toBytes(1).toLong
+
+  val plentyOfMem = Map(
+BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB)))
+
+  test("replicate blocks until empty and then kill executor") {
+val conf = new SparkConf()
+val eam = mock[ExecutorAllocationManager]
+val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1"))
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe DoneRecovering
+  verify(eam).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 2
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("kill executor if it takes too long to replicate") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1), RDDBlockId(2, 1), RDDBlockId(3, 1), 
RDDBlockId(4, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem, pauseIndefinitely = 
true)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe Timeout
+  verify(eam, times(1)).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 1
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("shutdown timer will get cancelled if replication finishes") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179206819
  
--- Diff: 
core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.CacheRecoveryManager._
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rpc._
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+class CacheRecoveryManagerSuite
+  extends SparkFunSuite with MockitoSugar with Matchers {
+
+  val oneGB: Long = ByteUnit.GiB.toBytes(1).toLong
+
+  val plentyOfMem = Map(
+BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB)))
+
+  test("replicate blocks until empty and then kill executor") {
+val conf = new SparkConf()
+val eam = mock[ExecutorAllocationManager]
+val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1"))
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe DoneRecovering
+  verify(eam).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 2
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("kill executor if it takes too long to replicate") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1), RDDBlockId(2, 1), RDDBlockId(3, 1), 
RDDBlockId(4, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem, pauseIndefinitely = 
true)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe Timeout
+  verify(eam, times(1)).killExecutors(Seq("1"))
+  bmme.replicated.get("1").get shouldBe 1
+} finally {
+  cacheRecoveryManager.stop()
+}
+  }
+
+  test("shutdown timer will get cancelled if replication finishes") {
+val conf = new 
SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+val eam = mock[ExecutorAllocationManager]
+val blocks = Set(RDDBlockId(1, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179266769
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -252,6 +257,44 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+  execId: String,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"Replicating first cached block on $execId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+  // As a heuristic, prioritize replicating the latest rdd. If this 
succeeds,
+  // CacheRecoveryManager will try to replicate the remaining rdds.
+  firstBlock <- if (blocks.isEmpty) None else 
Some(blocks.maxBy(_.rddId))
+  replicaSet <- blockLocations.asScala.get(firstBlock)
+  // Add 2 to force this block to be replicated to one new executor.
+  maxReps = replicaSet.size + 2
--- End diff --

I figured out why you need +2 instead of +1.  The existing code wants you 
to explicitly *remove* the id of the blockManager you're replicating from out 
of `replicaSet`.  See:


https://github.com/apache/spark/blob/cccaaa14ad775fb981e501452ba2cc06ff5c0f0a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L236-L239

While the existing code is confusing, I definitely don't like using +2 here 
as a workaround, as it just adds to the confusion.  I'd at least update the 
comments on `BlockManager.replicate()` etc., or maybe just change its behavior 
and update the callsites.
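
A rough sketch of the alternative (purely illustrative; `replicaSet` and 
`blockManagerId` as in the diff above), assuming we change the convention so 
only the *other* replicas are counted:

```scala
// Drop the source block manager from the candidate replica set first, so that
// "+ 1" means exactly one additional copy on a new executor.
val otherReplicas = replicaSet - blockManagerId
val maxReps = otherReplicas.size + 1
```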


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2018-04-04 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19041
  
thanks for the updates @brad-kaiser.  I think I understand and don't have 
any major concerns.  It doesn't seem easy to use the LRU from MemoryStore, so 
we can set that aside for now.

btw as you push updates, I'd prefer to just add new commits on top (even 
merges to master), as that makes it easier for reviewers to see incremental 
changes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179508180
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
+.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
+.build[String, String]()
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   * @return a sequence of futures of Unit that will complete once the 
executor has been killed.
+   */
+  def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+val canBeRecovered: Future[Seq[String]] = checkMem(execIds)
+
+canBeRecovered.flatMap { execIds =>
+  execIds.foreach { execId => recoveringExecutors.put(execId, execId) }
+  Future.sequence(execIds.map { replicateUntilTimeoutThenKill })
+}
+  }
+
+  def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = {
+val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
+val replicationFuture = replicateUntilDone(execId)
+
+Future.firstCompletedOf(List(timeoutFuture, 
replicationFuture)).andThen {
--- End diff --

I don't think this will do what you want.  Try this code (if you do it in 
the Scala REPL, be sure to use )

```scala
import scala.concurrent._
import scala.concurrent.duration._
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}

val scheduler = Executors.newSingleThreadScheduledExecutor()
val pool = Executors.newSingleThreadExecutor()
implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(pool)

def returnAfterTimeout[T](value: T, seconds: Long): Future[T] = {
val p = Promise[T]()
val runnable = new Runnable {
  def run(): Unit = { println("time's up"); p.success(value) }
}
scheduler.schedule(runnable, seconds, TimeUnit.SECONDS)
p.future
  }

def printStuff(x: Int): String = {
  (0 until x).foreach{ i => println(i); Thread.sleep(1000)}
  "done"
} 

// The *value* of the future is correct here, but you'll notice the timer 
keeps going, we 
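// Illustrative continuation (not part of the original message): even when the
// "work" future wins, the scheduled timer is never cancelled and still fires.
val winner = Future.firstCompletedOf(Seq(
  returnAfterTimeout("timed out", 10),
  Future { printStuff(3) }
))
println(Await.result(winner, 30.seconds))  // prints "done" after ~3 seconds
// ...but ~7 seconds later the scheduler thread still prints "time's up".
```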

[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2018-04-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19041
  
hey @brad-kaiser lemme temper what I said in my previous comments a bit -- 
I understand what you're doing here now and I think it makes sense; I don't see 
any serious design issues.  But this is adding something new to a pretty core 
area of Spark, so expect some time still on reviews etc.  I also think you 
should probably go through the SPIP process -- though it's not huge, I think 
it's better to increase visibility a bit: 
https://spark.apache.org/improvement-proposals.html

anyway still think this is looking good, but want to set expectations on 
what is left to do here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179585326
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+  execId: String,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"Replicating first cached block on $execId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
--- End diff --

I don't have a strong opinion, but it sounds like your original approach 
was motivated by a small set of just the in-memory cached blocks.  Let's stick 
to in-memory-only blocks for this version (`!useDisk`) as that limits the 
blocks we try to replicate, and we can consider relaxing that in the future.
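
FWIW, a minimal sketch of the `!useDisk` filter I mean (hedged -- field names 
per `BlockStatus`/`StorageLevel`, so double-check against the actual classes):

```scala
import org.apache.spark.storage.BlockStatus

// Keep a block only if its status says it is not persisted to disk,
// i.e. it is purely an in-memory cached block.
def isMemoryOnly(status: Option[BlockStatus]): Boolean =
  status.exists(s => !s.storageLevel.useDisk)
```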


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20987: [SPARK-23816][CORE] Killed tasks should ignore Fe...

2018-04-05 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/20987

[SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

SPARK-19276 ensured that FetchFailures do not get swallowed by other
layers of exception handling, but it also meant that a killed task could
look like a fetch failure.  This is particularly a problem with
speculative execution, where we expect to kill tasks as they are reading
shuffle data.  The fix is to ensure that we always check for killed
tasks first.

Added a new unit test which fails before the fix, ran it 1k times to
check for flakiness.  Full suite of tests on jenkins.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-23816

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20987.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20987


commit d886ba3840ab06cd3a5d9dea7d47a8e156d5eb72
Author: Imran Rashid 
Date:   2018-04-05T16:29:01Z

[SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

SPARK-19276 ensured that FetchFailures do not get swallowed by other
layers of exception handling, but it also meant that a killed task could
look like a fetch failure.  This is particularly a problem with
speculative execution, where we expect to kill tasks as they are reading
shuffle data.  The fix is to ensure that we always check for killed
tasks first.

Added a new unit test which fails before the fix, ran it 1k times to
check for flakiness.  Full suite of tests on jenkins.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20987: [SPARK-23816][CORE] Killed tasks should ignore FetchFail...

2018-04-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20987
  
pinging some potential reviewers: @tgravescs @kayousterhout @zsxwing 
@mridulm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20987: [SPARK-23816][CORE] Killed tasks should ignore Fe...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20987#discussion_r179776576
  
--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
---
@@ -330,6 +362,15 @@ class FetchFailureHidingRDD(
   case t: Throwable =>
 if (throwOOM) {
   throw new OutOfMemoryError("OOM while handling another 
exception")
+} else if (interrupt) {
+  // make sure our test is setup correctly
+  
assert(TaskContext.get().asInstanceOf[TaskContextImpl].fetchFailed.isDefined)
+  // signal our test is ready for the task to get killed
--- End diff --

I prefer the original comment -- the mechanics of what is waiting on the 
latch are easy enough to follow; it's more important to explain why.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20987: [SPARK-23816][CORE] Killed tasks should ignore Fe...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20987#discussion_r179776639
  
--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
---
@@ -173,8 +173,26 @@ class ExecutorSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSug
   }
 
   test("SPARK-19276: OOMs correctly handled with a FetchFailure") {
+val (failReason, uncaughtExceptionHandler) = 
testFetchFailureHandling(true)
+assert(failReason.isInstanceOf[ExceptionFailure])
+val exceptionCaptor = ArgumentCaptor.forClass(classOf[Throwable])
+verify(uncaughtExceptionHandler).uncaughtException(any(), 
exceptionCaptor.capture())
+assert(exceptionCaptor.getAllValues.size === 1)
+
assert(exceptionCaptor.getAllValues().get(0).isInstanceOf[OutOfMemoryError])
+  }
+
+  test(s"SPARK-23816: interrupts are not masked by a FetchFailure") {
+// If killing the task causes a fetch failure, we still treat it as a 
task that was killed,
+// as the fetch failure could easily be caused by interrupting the 
thread.
+val (failReason, _) = testFetchFailureHandling(false)
+assert(failReason.isInstanceOf[TaskKilled])
+  }
+
+  def testFetchFailureHandling(oom: Boolean): (TaskFailedReason, 
UncaughtExceptionHandler) = {
--- End diff --

yeah, good point -- this is pretty confusing now.  I pushed an update with 
more comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20987: [SPARK-23816][CORE] Killed tasks should ignore Fe...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20987#discussion_r179776800
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -494,19 +507,6 @@ private[spark] class Executor(
   setTaskFinishedAndClearInterruptStatus()
   execBackend.statusUpdate(taskId, TaskState.FAILED, 
ser.serialize(reason))
 
-case t: TaskKilledException =>
-  logInfo(s"Executor killed $taskName (TID $taskId), reason: 
${t.reason}")
-  setTaskFinishedAndClearInterruptStatus()
-  execBackend.statusUpdate(taskId, TaskState.KILLED, 
ser.serialize(TaskKilled(t.reason)))
-
-case _: InterruptedException | NonFatal(_) if
-task != null && task.reasonIfKilled.isDefined =>
-  val killReason = task.reasonIfKilled.getOrElse("unknown reason")
-  logInfo(s"Executor interrupted and killed $taskName (TID 
$taskId), reason: $killReason")
-  setTaskFinishedAndClearInterruptStatus()
-  execBackend.statusUpdate(
-taskId, TaskState.KILLED, 
ser.serialize(TaskKilled(killReason)))
-
 case CausedBy(cDE: CommitDeniedException) =>
--- End diff --

I should have caught it too :)  Kay mentioned OOM handling on the original 
PR, but we didn't think about interrupts.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20987: [SPARK-23816][CORE] Killed tasks should ignore FetchFail...

2018-04-06 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20987
  
> Things I'm concerned about is that does there exists a situation like 'a 
task gets killed after it gets a FetchFailure, but re-run later and gets a 
FetchFailure too without TaskKilledException' (or this fix against speculative 
tasks only).

I don't think it's worth trying to be fancy here -- in almost all 
situations, we don't care about the fetch failure handling when the task is 
killed.  Even if this task is not speculative, it could be that it's killed 
because *another* speculative task finished, and so this one gets aborted.

Suppose there is a real fetch failure, and you just happen to kill the task 
*just* after that.  Since the task was killed, that means you don't really care 
about the input shuffle data at the moment in any case.  You *might* run 
another job later on which tries to read the same shuffle data, and then it'll 
have to rediscover the missing shuffle data.  But that's about it.  Oh well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179797706
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
+SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) =
+SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInf

[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179801207
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
+SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) =
+SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInf

[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179796802
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
 
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
+  // peak values for executor level metrics
+  var peakExecutorMetrics = new PeakExecutorMetrics
--- End diff --

I think this can be a `val` -- it's mutated, not replaced.

Also, I don't see any tests that this is updated correctly, both in a live 
app and when replayed from the (limited) logs.  OTOH, maybe you should just 
remove this from this change and wait till SPARK-23431, as there you'll need to 
add something else to expose this for each executor / stage, which may be more 
important than just the overall total for the executor.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179796239
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +94,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // Peak metric values for each executor
+  private var peakExecutorMetrics = new mutable.HashMap[String, 
PeakExecutorMetrics]()
--- End diff --

you need to handle overlapping stages.  I think you actually need to key on 
both executor and stage, and on stage end, you only clear the metrics for that 
stage.

EDIT: ok after I went through everything, I think I see how this works -- 
since you log on every new peak, you'll also get a logged message for the 
earlier update.  But as I mention below, this strategy seems like it'll result 
in a lot of extra logging.  Maybe I'm wrong, though, would be great to see how 
much the logs grow this way.
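
A tiny sketch of the keying I have in mind (illustrative only -- a plain `Long` 
stands in for `PeakExecutorMetrics` here):

```scala
import scala.collection.mutable

// Track peaks per (executorId, stageId) so overlapping stages don't clobber
// each other's values.
val peaks = mutable.HashMap.empty[(String, Int), Long]

def onMetricUpdate(execId: String, stageId: Int, value: Long): Unit = {
  val key = (execId, stageId)
  if (value > peaks.getOrElse(key, Long.MinValue)) peaks(key) = value
}

// On stage completion, drop only that stage's entries.
def onStageCompleted(stageId: Int): Unit = {
  peaks.retain { case ((_, s), _) => s != stageId }
}
```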


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179797803
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
--- End diff --

nit: multiline methods should be enclosed in `{}`, here and elsewhere, even 
if the body is only one line.
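
i.e. something like this (same body as in the diff, just wrapped in braces):

```scala
/** Create a stage submitted event for the specified stage Id. */
private def createStageSubmittedEvent(stageId: Int): SparkListenerStageSubmitted = {
  SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0,
    Seq.empty, Seq.empty, "details"))
}
```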


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179803848
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

What about including the JVM's direct & memory-mapped usage as well? See 
https://issues.apache.org/jira/browse/SPARK-22483
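
For reference, a hedged sketch of how those could be read from the standard 
platform MXBeans (the "direct" and "mapped" buffer pools):

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

// The JVM exposes direct and memory-mapped buffer usage via BufferPoolMXBeans.
val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
val directUsed = pools.find(_.getName == "direct").map(_.getMemoryUsed).getOrElse(0L)
val mappedUsed = pools.find(_.getName == "mapped").map(_.getMemoryUsed).getOrElse(0L)
```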


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179795171
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

I wouldn't think you'd want to log for every new peak, as I'd expect it 
would be natural for the peak to keep growing, so you'd just end up with a lot 
of logs.  I'd expect you'd just log the peak when the stage ended, or when the 
executor died.

the downside of that approach is that you never log a peak if the driver 
dies ... but then you've got to figure out the driver issue anyway.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179802392
  
--- Diff: core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
---
@@ -654,6 +681,25 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
 assert(metrics1.bytesRead === metrics2.bytesRead)
   }
 
+  private def assertEquals(metrics1: Option[ExecutorMetrics], metrics2: 
Option[ExecutorMetrics]) {
+metrics1 match {
+  case Some(m1) =>
+metrics2 match {
+  case Some(m2) =>
+assert(m1.timestamp === m2.timestamp)
+assert(m1.jvmUsedMemory === m2.jvmUsedMemory)
+assert(m1.onHeapExecutionMemory === m2.onHeapExecutionMemory)
+assert(m1.offHeapExecutionMemory === m2.offHeapExecutionMemory)
+assert(m1.onHeapStorageMemory === m2.onHeapStorageMemory)
+assert(m1.offHeapStorageMemory === m2.offHeapStorageMemory)
+  case None =>
+assert(false)
+}
+  case None =>
+assert(metrics2.isEmpty)
--- End diff --

This version looks correct, but I think the matching I mentioned above is 
a little cleaner.  And then you should be able to have 
`EventLoggingListenerSuite` just use this method rather than repeating it.
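
Something along these lines (sketch only; same fields as in the diff):

```scala
(metrics1, metrics2) match {
  case (Some(m1), Some(m2)) =>
    assert(m1.timestamp === m2.timestamp)
    assert(m1.jvmUsedMemory === m2.jvmUsedMemory)
    assert(m1.onHeapExecutionMemory === m2.onHeapExecutionMemory)
    assert(m1.offHeapExecutionMemory === m2.offHeapExecutionMemory)
    assert(m1.onHeapStorageMemory === m2.onHeapStorageMemory)
    assert(m1.offHeapStorageMemory === m2.offHeapStorageMemory)
  case (None, None) => // nothing to compare
  case _ => assert(false, "one side has executor metrics and the other does not")
}
```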


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20987: [SPARK-23816][CORE] Killed tasks should ignore Fe...

2018-04-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20987#discussion_r179931481
  
--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
---
@@ -257,19 +281,32 @@ class ExecutorSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSug
   }
 
   private def runTaskAndGetFailReason(taskDescription: TaskDescription): 
TaskFailedReason = {
-runTaskGetFailReasonAndExceptionHandler(taskDescription)._1
+runTaskGetFailReasonAndExceptionHandler(taskDescription, false)._1
   }
 
   private def runTaskGetFailReasonAndExceptionHandler(
-  taskDescription: TaskDescription): (TaskFailedReason, 
UncaughtExceptionHandler) = {
+  taskDescription: TaskDescription,
+  killTask: Boolean): (TaskFailedReason, UncaughtExceptionHandler) = {
 val mockBackend = mock[ExecutorBackend]
 val mockUncaughtExceptionHandler = mock[UncaughtExceptionHandler]
 var executor: Executor = null
+var killingThread: Thread = null
--- End diff --

yeah, good point -- I was originally thinking of that but I don't think it 
is needed.  However, I did get rid of the indefinite awaits.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20987: [SPARK-23816][CORE] Killed tasks should ignore FetchFail...

2018-04-07 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20987
  
I filed https://issues.apache.org/jira/browse/SPARK-23894 for the test 
failure -- appears to be a flaky test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20925: [SPARK-22941][core] Do not exit JVM when submit f...

2018-04-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20925#discussion_r179987409
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java 
---
@@ -154,9 +165,17 @@
 
   List buildSparkSubmitArgs() {
 List args = new ArrayList<>();
-SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
+OptionParser parser = new OptionParser(false);
+boolean isStartingApp = isAppResourceReq;
+
+// If the user args array is not empty, we need to parse it to detect 
exactly what
+// the user is trying to run, so that checks below are correct.
+if (!userArgs.isEmpty()) {
+  parser.parse(userArgs);
+  isStartingApp = parser.isAppResourceReq;
--- End diff --

I don't really care whether the name is `isStartingApp` or 
`isAppResourceReq`, but it seems the name should be the same here and in 
OptionParser, unless there is some difference I'm missing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20925: [SPARK-22941][core] Do not exit JVM when submit f...

2018-04-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20925#discussion_r179952361
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -775,17 +781,17 @@ class SparkSubmitSuite
   }
 
   test("SPARK_CONF_DIR overrides spark-defaults.conf") {
-forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
+forConfDir(Map("spark.executor.memory" -> "3g")) { path =>
--- End diff --

Why this change?  Do you no longer support fractional values?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


