spark git commit: [SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of SparkLauncher

2016-06-06 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5363783af -> 62765cbeb


[SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of 
SparkLauncher

## What changes were proposed in this pull request?
This situation can happen when the LauncherConnection gets an exception while 
reading from the socket and terminates silently without notifying listeners, 
leaving the client/listener to think that the job is still in its previous state.
The fix forces a notification to the client that the job finished with an 
unknown status and lets the client handle it accordingly.
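
The behaviour described above can be sketched outside of Spark as follows. This is a simplified model, not the launcher's actual code: `LostStateSketch`, `Handle`, `Listener` and `onConnectionClosed` are hypothetical names, and the enum lists only a subset of the states. It only illustrates the rule that a closed connection reports LOST unless a final state was already recorded.

```java
import java.util.ArrayList;
import java.util.List;

public class LostStateSketch {
  /** Simplified stand-in for SparkAppHandle.State; only a subset of states is modeled. */
  enum State {
    UNKNOWN(false), CONNECTED(false), RUNNING(false),
    FINISHED(true), FAILED(true), KILLED(true), LOST(true);

    private final boolean isFinal;
    State(boolean isFinal) { this.isFinal = isFinal; }
    boolean isFinal() { return isFinal; }
  }

  /** Hypothetical listener interface, reduced to a single callback. */
  interface Listener { void stateChanged(State newState); }

  /** Hypothetical handle that stores the state and notifies listeners on change. */
  static class Handle {
    private final List<Listener> listeners = new ArrayList<>();
    private State state = State.UNKNOWN;

    void addListener(Listener l) { listeners.add(l); }
    State getState() { return state; }
    void setState(State s) {
      state = s;
      for (Listener l : listeners) {
        l.stateChanged(s);
      }
    }
  }

  /** Models the patched close(): report LOST only if no final state was seen. */
  static void onConnectionClosed(Handle handle) {
    if (handle != null && !handle.getState().isFinal()) {
      handle.setState(State.LOST);
    }
  }

  public static void main(String[] args) {
    Handle crashed = new Handle();
    crashed.addListener(s -> System.out.println("state: " + s));
    crashed.setState(State.RUNNING);   // listener sees RUNNING
    onConnectionClosed(crashed);       // listener sees LOST

    Handle done = new Handle();
    done.setState(State.FINISHED);
    onConnectionClosed(done);          // state stays FINISHED
    System.out.println("done: " + done.getState());
  }
}
```

Because LOST is itself a final state in this model, a second call to `onConnectionClosed` on the same handle would not fire another notification.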

## How was this patch tested?
Added a unit test.

Author: Subroto Sanyal 

Closes #13497 from subrotosanyal/SPARK-15652-handle-spark-submit-jvm-crash.

(cherry picked from commit c409e23abd128dad33557025f1e824ef47e6222f)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62765cbe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62765cbe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62765cbe

Branch: refs/heads/branch-2.0
Commit: 62765cbebe0cb41b0c4fdc344828337ee15e1fd2
Parents: 5363783
Author: Subroto Sanyal 
Authored: Mon Jun 6 16:05:40 2016 -0700
Committer: Marcelo Vanzin 
Committed: Mon Jun 6 16:05:52 2016 -0700

--
 .../apache/spark/launcher/LauncherServer.java   |  4 +++
 .../apache/spark/launcher/SparkAppHandle.java   |  4 ++-
 .../spark/launcher/LauncherServerSuite.java | 31 
 3 files changed, 38 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
--
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index e3413fd..28e9420 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -337,6 +337,10 @@ class LauncherServer implements Closeable {
       }
       super.close();
       if (handle != null) {
+        if (!handle.getState().isFinal()) {
+          LOG.log(Level.WARNING, "Lost connection to spark application.");
+          handle.setState(SparkAppHandle.State.LOST);
+        }
         handle.disconnect();
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
--
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 625d026..0aa7bd1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -46,7 +46,9 @@ public interface SparkAppHandle {
     /** The application finished with a failed status. */
     FAILED(true),
     /** The application was killed. */
-    KILLED(true);
+    KILLED(true),
+    /** The Spark Submit JVM exited with a unknown status. */
+    LOST(true);
 
     private final boolean isFinal;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
--
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index bfe1fcc..12f1a0c 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -152,6 +152,37 @@ public class LauncherServerSuite extends BaseSuite {
     }
   }
 
+  @Test
+  public void testSparkSubmitVmShutsDown() throws Exception {
+    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    TestClient client = null;
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      Socket s = new Socket(InetAddress.getLoopbackAddress(),
+        LauncherServer.getServerInstance().getPort());
+      handle.addListener(new SparkAppHandle.Listener() {
+        public void stateChanged(SparkAppHandle handle) {
+          semaphore.release();
+        }
+        public void infoChanged(SparkAppHandle handle) {
+          semaphore.release();
+        }
+      });
+      client = new TestClient(s);
+      client.send(new Hello(handle.getSecret(), "1.4.0"));
+      assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+      // Make sure the server 

spark git commit: [SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of SparkLauncher

2016-06-06 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 36d3dfa59 -> c409e23ab


[SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of 
SparkLauncher

## What changes were proposed in this pull request?
This situation can happen when the LauncherConnection gets an exception while 
reading from the socket and terminates silently without notifying listeners, 
leaving the client/listener to think that the job is still in its previous state.
The fix forces a notification to the client that the job finished with an 
unknown status and lets the client handle it accordingly.

## How was this patch tested?
Added a unit test.
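
The waiting pattern such a test relies on (a listener releases a semaphore, the test thread acquires it with a timeout) can be sketched in isolation. This is only an illustration under stated assumptions: `AwaitStateChangeSketch`, `FakeHandle`, `fireAsync` and `awaitNotification` are hypothetical names and do not exist in Spark; the state is passed as a plain string to keep the sketch self-contained.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class AwaitStateChangeSketch {
  /** Hypothetical listener interface, reduced to a single callback. */
  interface Listener { void stateChanged(String newState); }

  /** Hypothetical stand-in for a handle that notifies its listener from another thread. */
  static class FakeHandle {
    private volatile Listener listener;

    void setListener(Listener l) { listener = l; }

    /** Delivers the notification on a background thread, as a server thread would. */
    void fireAsync(String state) {
      Thread t = new Thread(() -> {
        Listener l = listener;
        if (l != null) {
          l.stateChanged(state);
        }
      });
      t.setDaemon(true);
      t.start();
    }
  }

  /** Returns true if a notification arrives within the timeout, false otherwise. */
  static boolean awaitNotification(FakeHandle handle, String state, long timeoutSeconds) {
    Semaphore semaphore = new Semaphore(0);
    // Each callback releases one permit; the waiting thread consumes it below.
    handle.setListener(s -> semaphore.release());
    handle.fireAsync(state);
    try {
      return semaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve the interrupt for callers
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(awaitNotification(new FakeHandle(), "LOST", 5));
  }
}
```

Starting the semaphore at zero permits means `tryAcquire` can only succeed after the listener has actually been called, so the timeout bounds how long a missing notification can stall the test.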

Author: Subroto Sanyal 

Closes #13497 from subrotosanyal/SPARK-15652-handle-spark-submit-jvm-crash.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c409e23a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c409e23a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c409e23a

Branch: refs/heads/master
Commit: c409e23abd128dad33557025f1e824ef47e6222f
Parents: 36d3dfa
Author: Subroto Sanyal 
Authored: Mon Jun 6 16:05:40 2016 -0700
Committer: Marcelo Vanzin 
Committed: Mon Jun 6 16:05:40 2016 -0700

--
 .../apache/spark/launcher/LauncherServer.java   |  4 +++
 .../apache/spark/launcher/SparkAppHandle.java   |  4 ++-
 .../spark/launcher/LauncherServerSuite.java | 31 
 3 files changed, 38 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c409e23a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
--
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index e3413fd..28e9420 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -337,6 +337,10 @@ class LauncherServer implements Closeable {
       }
       super.close();
       if (handle != null) {
+        if (!handle.getState().isFinal()) {
+          LOG.log(Level.WARNING, "Lost connection to spark application.");
+          handle.setState(SparkAppHandle.State.LOST);
+        }
         handle.disconnect();
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c409e23a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
--
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 625d026..0aa7bd1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -46,7 +46,9 @@ public interface SparkAppHandle {
     /** The application finished with a failed status. */
     FAILED(true),
     /** The application was killed. */
-    KILLED(true);
+    KILLED(true),
+    /** The Spark Submit JVM exited with a unknown status. */
+    LOST(true);
 
     private final boolean isFinal;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c409e23a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
--
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index bfe1fcc..12f1a0c 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -152,6 +152,37 @@ public class LauncherServerSuite extends BaseSuite {
     }
   }
 
+  @Test
+  public void testSparkSubmitVmShutsDown() throws Exception {
+    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    TestClient client = null;
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      Socket s = new Socket(InetAddress.getLoopbackAddress(),
+        LauncherServer.getServerInstance().getPort());
+      handle.addListener(new SparkAppHandle.Listener() {
+        public void stateChanged(SparkAppHandle handle) {
+          semaphore.release();
+        }
+        public void infoChanged(SparkAppHandle handle) {
+          semaphore.release();
+        }
+      });
+      client = new TestClient(s);
+      client.send(new Hello(handle.getSecret(), "1.4.0"));
+      assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+      // Make sure the server matched the client to the handle.
+      assertNotNull(handle.getConnection());
+      close(client);
+