Repository: spark
Updated Branches:
  refs/heads/master 90848d507 -> 969eda4a0


[SPARK-23020][CORE] Fix another race in the in-process launcher test.

First the bad news: there's an unfixable race in the launcher code.
(By unfixable I mean it would take a lot more effort than this change
to fix it.) The good news is that it should only affect very short-lived
applications, such as the one run by the flaky test, so it's
possible to work around it in our test.

The fix also uncovered an issue with the recently added "closeAndWait()"
method; closing the connection would still possibly cause data loss,
so this change waits a while for the connection to finish itself, and
closes the socket if that times out. The existing connection timeout
is reused so that if desired it's possible to control how long to wait.

As part of that I also restored the old behavior that disconnect() would
force a disconnection from the child app; the "wait for data to arrive"
approach is only taken when disposing of the handle.

I tested this by inserting a bunch of sleeps in the test and the socket
handling code in the launcher library; with those I was able to reproduce
the error from the Jenkins jobs. With the changes, even with all the
sleeps still in place, all tests pass.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20462 from vanzin/SPARK-23020.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/969eda4a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/969eda4a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/969eda4a

Branch: refs/heads/master
Commit: 969eda4a02faa7ca6cf3aff5cd10e6d51026b845
Parents: 90848d5
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Feb 2 11:43:22 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Feb 2 11:43:22 2018 +0800

----------------------------------------------------------------------
 .../spark/launcher/SparkLauncherSuite.java      | 40 ++++++++++++++---
 .../spark/launcher/AbstractAppHandle.java       | 45 +++++++++++++-------
 .../spark/launcher/ChildProcAppHandle.java      |  2 +-
 .../spark/launcher/InProcessAppHandle.java      |  2 +-
 .../apache/spark/launcher/LauncherServer.java   | 30 ++++++++-----
 .../spark/launcher/LauncherServerSuite.java     |  2 +-
 6 files changed, 87 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/969eda4a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java 
b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 1543f4f..2225591 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -157,12 +157,24 @@ public class SparkLauncherSuite extends BaseSuite {
 
     SparkAppHandle handle = null;
     try {
-      handle = new InProcessLauncher()
-        .setMaster("local")
-        .setAppResource(SparkLauncher.NO_RESOURCE)
-        .setMainClass(InProcessTestApp.class.getName())
-        .addAppArgs("hello")
-        .startApplication(listener);
+      synchronized (InProcessTestApp.LOCK) {
+        handle = new InProcessLauncher()
+          .setMaster("local")
+          .setAppResource(SparkLauncher.NO_RESOURCE)
+          .setMainClass(InProcessTestApp.class.getName())
+          .addAppArgs("hello")
+          .startApplication(listener);
+
+        // SPARK-23020: see doc for InProcessTestApp.LOCK for a description of 
the race. Here
+        // we wait until we know that the connection between the app and the 
launcher has been
+        // established before allowing the app to finish.
+        final SparkAppHandle _handle = handle;
+        eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
+          assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
+        });
+
+        InProcessTestApp.LOCK.wait(5000);
+      }
 
       waitFor(handle);
       assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
@@ -193,10 +205,26 @@ public class SparkLauncherSuite extends BaseSuite {
 
   public static class InProcessTestApp {
 
+    /**
+     * SPARK-23020: there's a race caused by a child app finishing too 
quickly. This would cause
+     * the InProcessAppHandle to dispose of itself even before the child 
connection was properly
+     * established, so no state changes would be detected for the application 
and its final
+     * state would be LOST.
+     *
+     * It's not really possible to fix that race safely in the handle code 
itself without changing
+     * the way in-process apps talk to the launcher library, so we work around 
that in the test by
+     * synchronizing on this object.
+     */
+    public static final Object LOCK = new Object();
+
     public static void main(String[] args) throws Exception {
       assertNotEquals(0, args.length);
       assertEquals(args[0], "hello");
       new SparkContext().stop();
+
+      synchronized (LOCK) {
+        LOCK.notifyAll();
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/969eda4a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
index 84a25a5..9cbebda 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
@@ -18,22 +18,22 @@
 package org.apache.spark.launcher;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 abstract class AbstractAppHandle implements SparkAppHandle {
 
-  private static final Logger LOG = 
Logger.getLogger(ChildProcAppHandle.class.getName());
+  private static final Logger LOG = 
Logger.getLogger(AbstractAppHandle.class.getName());
 
   private final LauncherServer server;
 
   private LauncherServer.ServerConnection connection;
   private List<Listener> listeners;
   private AtomicReference<State> state;
-  private String appId;
+  private volatile String appId;
   private volatile boolean disposed;
 
   protected AbstractAppHandle(LauncherServer server) {
@@ -44,7 +44,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
   @Override
   public synchronized void addListener(Listener l) {
     if (listeners == null) {
-      listeners = new ArrayList<>();
+      listeners = new CopyOnWriteArrayList<>();
     }
     listeners.add(l);
   }
@@ -71,16 +71,14 @@ abstract class AbstractAppHandle implements SparkAppHandle {
 
   @Override
   public synchronized void disconnect() {
-    if (!isDisposed()) {
-      if (connection != null) {
-        try {
-          connection.closeAndWait();
-        } catch (IOException ioe) {
-          // no-op.
-        }
+    if (connection != null && connection.isOpen()) {
+      try {
+        connection.close();
+      } catch (IOException ioe) {
+        // no-op.
       }
-      dispose();
     }
+    dispose();
   }
 
   void setConnection(LauncherServer.ServerConnection connection) {
@@ -97,10 +95,25 @@ abstract class AbstractAppHandle implements SparkAppHandle {
 
   /**
    * Mark the handle as disposed, and set it as LOST in case the current state 
is not final.
+   *
+   * This method should be called only when there's a reasonable expectation 
that the communication
+   * with the child application is not needed anymore, either because the code 
managing the handle
+   * has said so, or because the child application is finished.
    */
   synchronized void dispose() {
     if (!isDisposed()) {
+      // First wait for all data from the connection to be read. Then 
unregister the handle.
+      // Otherwise, unregistering might cause the server to be stopped and all 
child connections
+      // to be closed.
+      if (connection != null) {
+        try {
+          connection.waitForClose();
+        } catch (IOException ioe) {
+          // no-op.
+        }
+      }
       server.unregister(this);
+
       // Set state to LOST if not yet final.
       setState(State.LOST, false);
       this.disposed = true;
@@ -127,11 +140,13 @@ abstract class AbstractAppHandle implements 
SparkAppHandle {
       current = state.get();
     }
 
-    LOG.log(Level.WARNING, "Backend requested transition from final state {0} 
to {1}.",
-      new Object[] { current, s });
+    if (s != State.LOST) {
+      LOG.log(Level.WARNING, "Backend requested transition from final state 
{0} to {1}.",
+        new Object[] { current, s });
+    }
   }
 
-  synchronized void setAppId(String appId) {
+  void setAppId(String appId) {
     this.appId = appId;
     fireEvent(true);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/969eda4a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 5e3c956..5609f84 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -112,7 +112,7 @@ class ChildProcAppHandle extends AbstractAppHandle {
         }
       }
 
-      disconnect();
+      dispose();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/969eda4a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
index b8030e0..4b740d3 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
@@ -66,7 +66,7 @@ class InProcessAppHandle extends AbstractAppHandle {
         setState(State.FAILED);
       }
 
-      disconnect();
+      dispose();
     });
 
     app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), 
appName));

http://git-wip-us.apache.org/repos/asf/spark/blob/969eda4a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java 
b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index f4ecd52..607879f 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -238,6 +238,7 @@ class LauncherServer implements Closeable {
         };
         ServerConnection clientConnection = new ServerConnection(client, 
timeout);
         Thread clientThread = factory.newThread(clientConnection);
+        clientConnection.setConnectionThread(clientThread);
         synchronized (clients) {
           clients.add(clientConnection);
         }
@@ -290,17 +291,15 @@ class LauncherServer implements Closeable {
 
     private TimerTask timeout;
     private volatile Thread connectionThread;
-    volatile AbstractAppHandle handle;
+    private volatile AbstractAppHandle handle;
 
     ServerConnection(Socket socket, TimerTask timeout) throws IOException {
       super(socket);
       this.timeout = timeout;
     }
 
-    @Override
-    public void run() {
-      this.connectionThread = Thread.currentThread();
-      super.run();
+    void setConnectionThread(Thread t) {
+      this.connectionThread = t;
     }
 
     @Override
@@ -361,19 +360,30 @@ class LauncherServer implements Closeable {
     }
 
     /**
-     * Close the connection and wait for any buffered data to be processed 
before returning.
+     * Wait for the remote side to close the connection so that any pending 
data is processed.
      * This ensures any changes reported by the child application take effect.
+     *
+     * This method allows a short period for the above to happen (same amount 
of time as the
+     * connection timeout, which is configurable). This should be fine for 
well-behaved
+     * applications, where they close the connection around the same time the 
app handle detects the
+     * app has finished.
+     *
+     * In case the connection is not closed within the grace period, this 
method forcefully closes
+     * it and any subsequent data that may arrive will be ignored.
      */
-    public void closeAndWait() throws IOException {
-      close();
-
+    public void waitForClose() throws IOException {
       Thread connThread = this.connectionThread;
       if (Thread.currentThread() != connThread) {
         try {
-          connThread.join();
+          connThread.join(getConnectionTimeout());
         } catch (InterruptedException ie) {
           // Ignore.
         }
+
+        if (connThread.isAlive()) {
+          LOG.log(Level.WARNING, "Timed out waiting for child connection to 
close.");
+          close();
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/969eda4a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java 
b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index 024efac..d16337a 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -94,8 +94,8 @@ public class LauncherServerSuite extends BaseSuite {
       Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
       assertTrue(stopMsg instanceof Stop);
     } finally {
-      handle.kill();
       close(client);
+      handle.kill();
       client.clientThread.join();
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to