Repository: spark
Updated Branches:
  refs/heads/branch-2.3 e58c4a929 -> 20c69816a


[SPARK-23020][CORE] Fix races in launcher code, test.

The race in the code is because the handle might update
its state to the wrong state if the connection handling
thread is still processing incoming data; so the handle
needs to wait for the connection to finish up before
checking the final state.

The race in the test is because when waiting for a handle
to reach a final state, the waitFor() method needs to wait
until all handle state is updated (which also includes
waiting for the connection thread above to finish).
Otherwise, waitFor() may return too early, which would cause
a bunch of different races (like the listener not being yet
notified of the state change, or being in the middle of
being notified, or the handle not being properly disposed
and causing postChecks() to assert).

On top of that I found, by code inspection, a couple of
potential races that could make a handle end up in the
wrong state when being killed.

Tested by running the existing unit tests a lot (and not
seeing the errors I was seeing before).

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20223 from vanzin/SPARK-23020.

(cherry picked from commit 66217dac4f8952a9923625908ad3dcb030763c81)
Signed-off-by: Sameer Agarwal <samee...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20c69816
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20c69816
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20c69816

Branch: refs/heads/branch-2.3
Commit: 20c69816a63071b82b1035d4b48798c358206421
Parents: e58c4a9
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Mon Jan 15 22:40:44 2018 -0800
Committer: Sameer Agarwal <samee...@apache.org>
Committed: Mon Jan 15 22:41:26 2018 -0800

----------------------------------------------------------------------
 .../spark/launcher/SparkLauncherSuite.java      | 49 +++++++++++++-------
 .../spark/launcher/AbstractAppHandle.java       | 22 +++++++--
 .../spark/launcher/ChildProcAppHandle.java      | 18 +++----
 .../spark/launcher/InProcessAppHandle.java      | 17 +++----
 .../spark/launcher/LauncherConnection.java      | 14 +++---
 .../apache/spark/launcher/LauncherServer.java   | 46 +++++++++++++++---
 .../org/apache/spark/launcher/BaseSuite.java    | 42 ++++++++++++++---
 .../spark/launcher/LauncherServerSuite.java     | 20 +++-----
 8 files changed, 156 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20c69816/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java 
b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 9d2f563..a042375 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.launcher;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import static org.junit.Assume.*;
 import static org.mockito.Mockito.*;
 
 import org.apache.spark.SparkContext;
+import org.apache.spark.SparkContext$;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.util.Utils;
 
@@ -137,7 +139,9 @@ public class SparkLauncherSuite extends BaseSuite {
       // Here DAGScheduler is stopped, while SparkContext.clearActiveContext 
may not be called yet.
       // Wait for a reasonable amount of time to avoid creating two active 
SparkContext in JVM.
       // See SPARK-23019 and SparkContext.stop() for details.
-      TimeUnit.MILLISECONDS.sleep(500);
+      eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
+        assertTrue("SparkContext is still alive.", 
SparkContext$.MODULE$.getActive().isEmpty());
+      });
     }
   }
 
@@ -146,26 +150,35 @@ public class SparkLauncherSuite extends BaseSuite {
     SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
     doAnswer(invocation -> {
       SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
-      transitions.add(h.getState());
+      synchronized (transitions) {
+        transitions.add(h.getState());
+      }
       return null;
     }).when(listener).stateChanged(any(SparkAppHandle.class));
 
-    SparkAppHandle handle = new InProcessLauncher()
-      .setMaster("local")
-      .setAppResource(SparkLauncher.NO_RESOURCE)
-      .setMainClass(InProcessTestApp.class.getName())
-      .addAppArgs("hello")
-      .startApplication(listener);
-
-    waitFor(handle);
-    assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
-
-    // Matches the behavior of LocalSchedulerBackend.
-    List<SparkAppHandle.State> expected = Arrays.asList(
-      SparkAppHandle.State.CONNECTED,
-      SparkAppHandle.State.RUNNING,
-      SparkAppHandle.State.FINISHED);
-    assertEquals(expected, transitions);
+    SparkAppHandle handle = null;
+    try {
+      handle = new InProcessLauncher()
+        .setMaster("local")
+        .setAppResource(SparkLauncher.NO_RESOURCE)
+        .setMainClass(InProcessTestApp.class.getName())
+        .addAppArgs("hello")
+        .startApplication(listener);
+
+      waitFor(handle);
+      assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
+
+      // Matches the behavior of LocalSchedulerBackend.
+      List<SparkAppHandle.State> expected = Arrays.asList(
+        SparkAppHandle.State.CONNECTED,
+        SparkAppHandle.State.RUNNING,
+        SparkAppHandle.State.FINISHED);
+      assertEquals(expected, transitions);
+    } finally {
+      if (handle != null) {
+        handle.kill();
+      }
+    }
   }
 
   public static class SparkLauncherTestApp {

http://git-wip-us.apache.org/repos/asf/spark/blob/20c69816/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
index df1e731..daf0972 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
@@ -33,7 +33,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
   private List<Listener> listeners;
   private State state;
   private String appId;
-  private boolean disposed;
+  private volatile boolean disposed;
 
   protected AbstractAppHandle(LauncherServer server) {
     this.server = server;
@@ -70,8 +70,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
 
   @Override
   public synchronized void disconnect() {
-    if (!disposed) {
-      disposed = true;
+    if (!isDisposed()) {
       if (connection != null) {
         try {
           connection.close();
@@ -79,7 +78,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
           // no-op.
         }
       }
-      server.unregister(this);
+      dispose();
     }
   }
 
@@ -95,6 +94,21 @@ abstract class AbstractAppHandle implements SparkAppHandle {
     return disposed;
   }
 
+  /**
+   * Mark the handle as disposed, and set it as LOST in case the current state 
is not final.
+   */
+  synchronized void dispose() {
+    if (!isDisposed()) {
+      // Unregister first to make sure that the connection with the app has 
been really
+      // terminated.
+      server.unregister(this);
+      if (!getState().isFinal()) {
+        setState(State.LOST);
+      }
+      this.disposed = true;
+    }
+  }
+
   void setState(State s) {
     setState(s, false);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20c69816/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 8b3f427..2b99461 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -48,14 +48,16 @@ class ChildProcAppHandle extends AbstractAppHandle {
 
   @Override
   public synchronized void kill() {
-    disconnect();
-    if (childProc != null) {
-      if (childProc.isAlive()) {
-        childProc.destroyForcibly();
+    if (!isDisposed()) {
+      setState(State.KILLED);
+      disconnect();
+      if (childProc != null) {
+        if (childProc.isAlive()) {
+          childProc.destroyForcibly();
+        }
+        childProc = null;
       }
-      childProc = null;
     }
-    setState(State.KILLED);
   }
 
   void setChildProc(Process childProc, String loggerName, InputStream 
logStream) {
@@ -94,8 +96,6 @@ class ChildProcAppHandle extends AbstractAppHandle {
         return;
       }
 
-      disconnect();
-
       int ec;
       try {
         ec = proc.exitValue();
@@ -118,6 +118,8 @@ class ChildProcAppHandle extends AbstractAppHandle {
       if (newState != null) {
         setState(newState, true);
       }
+
+      disconnect();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20c69816/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
index acd64c9..f04263c 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
@@ -39,15 +39,16 @@ class InProcessAppHandle extends AbstractAppHandle {
 
   @Override
   public synchronized void kill() {
-    LOG.warning("kill() may leave the underlying app running in in-process 
mode.");
-    disconnect();
-
-    // Interrupt the thread. This is not guaranteed to kill the app, though.
-    if (app != null) {
-      app.interrupt();
+    if (!isDisposed()) {
+      LOG.warning("kill() may leave the underlying app running in in-process 
mode.");
+      setState(State.KILLED);
+      disconnect();
+
+      // Interrupt the thread. This is not guaranteed to kill the app, though.
+      if (app != null) {
+        app.interrupt();
+      }
     }
-
-    setState(State.KILLED);
   }
 
   synchronized void start(String appName, Method main, String[] args) {

http://git-wip-us.apache.org/repos/asf/spark/blob/20c69816/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java 
b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
index b4a8719..fd6f229 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
@@ -95,15 +95,15 @@ abstract class LauncherConnection implements Closeable, 
Runnable {
   }
 
   @Override
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
     if (!closed) {
-      synchronized (this) {
-        if (!closed) {
-          closed = true;
-          socket.close();
-        }
-      }
+      closed = true;
+      socket.close();
     }
   }
 
+  boolean isOpen() {
+    return !closed;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20c69816/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java 
b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index b8999a1..660c444 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -217,6 +217,33 @@ class LauncherServer implements Closeable {
         break;
       }
     }
+
+    // If there is a live connection for this handle, we need to wait for it 
to finish before
+    // returning, otherwise there might be a race between the connection 
thread processing
+    // buffered data and the handle cleaning up after itself, leading to 
potentially the wrong
+    // state being reported for the handle.
+    ServerConnection conn = null;
+    synchronized (clients) {
+      for (ServerConnection c : clients) {
+        if (c.handle == handle) {
+          conn = c;
+          break;
+        }
+      }
+    }
+
+    if (conn != null) {
+      synchronized (conn) {
+        if (conn.isOpen()) {
+          try {
+            conn.wait();
+          } catch (InterruptedException ie) {
+            // Ignore.
+          }
+        }
+      }
+    }
+
     unref();
   }
 
@@ -288,7 +315,7 @@ class LauncherServer implements Closeable {
   private class ServerConnection extends LauncherConnection {
 
     private TimerTask timeout;
-    private AbstractAppHandle handle;
+    volatile AbstractAppHandle handle;
 
     ServerConnection(Socket socket, TimerTask timeout) throws IOException {
       super(socket);
@@ -338,16 +365,21 @@ class LauncherServer implements Closeable {
 
     @Override
     public void close() throws IOException {
+      if (!isOpen()) {
+        return;
+      }
+
       synchronized (clients) {
         clients.remove(this);
       }
-      super.close();
+
+      synchronized (this) {
+        super.close();
+        notifyAll();
+      }
+
       if (handle != null) {
-        if (!handle.getState().isFinal()) {
-          LOG.log(Level.WARNING, "Lost connection to spark application.");
-          handle.setState(SparkAppHandle.State.LOST);
-        }
-        handle.disconnect();
+        handle.dispose();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20c69816/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java 
b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
index 3e1a90e..3722a59 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.launcher;
 
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.After;
@@ -47,19 +48,46 @@ class BaseSuite {
     assertNull(server);
   }
 
-  protected void waitFor(SparkAppHandle handle) throws Exception {
-    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+  protected void waitFor(final SparkAppHandle handle) throws Exception {
     try {
-      while (!handle.getState().isFinal()) {
-        assertTrue("Timed out waiting for handle to transition to final 
state.",
-          System.nanoTime() < deadline);
-        TimeUnit.MILLISECONDS.sleep(10);
-      }
+      eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
+        assertTrue("Handle is not in final state.", 
handle.getState().isFinal());
+      });
     } finally {
       if (!handle.getState().isFinal()) {
         handle.kill();
       }
     }
+
+    // Wait until the handle has been marked as disposed, to make sure all 
cleanup tasks
+    // have been performed.
+    AbstractAppHandle ahandle = (AbstractAppHandle) handle;
+    eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
+      assertTrue("Handle is still not marked as disposed.", 
ahandle.isDisposed());
+    });
+  }
+
+  /**
+   * Call a closure that performs a check every "period" until it succeeds, or 
the timeout
+   * elapses.
+   */
+  protected void eventually(Duration timeout, Duration period, Runnable check) 
throws Exception {
+    assertTrue("Timeout needs to be larger than period.", 
timeout.compareTo(period) > 0);
+    long deadline = System.nanoTime() + timeout.toNanos();
+    int count = 0;
+    while (true) {
+      try {
+        count++;
+        check.run();
+        return;
+      } catch (Throwable t) {
+        if (System.nanoTime() >= deadline) {
+          String msg = String.format("Failed check after %d tries: %s.", 
count, t.getMessage());
+          throw new IllegalStateException(msg, t);
+        }
+        Thread.sleep(period.toMillis());
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20c69816/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java 
b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index 7e2b09c..75c1af0 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -23,12 +23,14 @@ import java.io.ObjectInputStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -197,28 +199,20 @@ public class LauncherServerSuite extends BaseSuite {
    * server-side close immediately.
    */
   private void waitForError(TestClient client, String secret) throws Exception 
{
-    boolean helloSent = false;
-    int maxTries = 10;
-    for (int i = 0; i < maxTries; i++) {
+    final AtomicBoolean helloSent = new AtomicBoolean();
+    eventually(Duration.ofSeconds(1), Duration.ofMillis(10), () -> {
       try {
-        if (!helloSent) {
+        if (!helloSent.get()) {
           client.send(new Hello(secret, "1.4.0"));
-          helloSent = true;
+          helloSent.set(true);
         } else {
           client.send(new SetAppId("appId"));
         }
         fail("Expected error but message went through.");
       } catch (IllegalStateException | IOException e) {
         // Expected.
-        break;
-      } catch (AssertionError e) {
-        if (i < maxTries - 1) {
-          Thread.sleep(100);
-        } else {
-          throw new AssertionError("Test failed after " + maxTries + " 
attempts.", e);
-        }
       }
-    }
+    });
   }
 
   private static class TestClient extends LauncherConnection {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to