This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 145cc2b  [LIVY-633][SERVER] session should not be gc-ed for long running queries
145cc2b is described below

commit 145cc2b77db4c7d7bdd8f953dc3a16856d9fcf0f
Author: yihengwang <yihengw...@tencent.com>
AuthorDate: Tue Sep 17 17:21:28 2019 +0800

    [LIVY-633][SERVER] session should not be gc-ed for long running queries
    
    ## What changes were proposed in this pull request?
    Currently, Livy records the last activity time of a session before a
    statement executes. If a statement runs longer than the session timeout,
    the session is garbage collected right after the statement finishes.
    
    This is not the expected behavior: time spent executing a statement
    should not count as idle time. Instead, the last activity time should be
    updated after the statement finishes executing.
    
    The Session class cannot observe when the session changes state from busy
    to idle. So in this patch, we add a replLastActivity field to RSCClient,
    which is updated whenever the REPL state changes from busy to idle. That
    timestamp is then reflected in the session's last activity time.
    
    ## How was this patch tested?
    Tested manually, and added a new unit test.
    
    Also ran the existing unit tests and integration tests.
    
    Author: yihengwang <yihengw...@tencent.com>
    Author: Yiheng Wang <yihe...@gmail.com>
    
    Closes #224 from yiheng/fix_633.
---
 rsc/pom.xml                                            |  7 +++++++
 rsc/src/main/java/org/apache/livy/rsc/RSCClient.java   | 18 ++++++++++++++++++
 .../livy/server/interactive/InteractiveSession.scala   | 12 ++++++++++++
 .../server/interactive/InteractiveSessionSpec.scala    | 15 +++++++++++++++
 4 files changed, 52 insertions(+)

diff --git a/rsc/pom.xml b/rsc/pom.xml
index dcb58a6..1f3d6a3 100644
--- a/rsc/pom.xml
+++ b/rsc/pom.xml
@@ -49,6 +49,13 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.livy</groupId>
+      <artifactId>livy-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <!-- Or it will conflict with repl jars -->
+      <scope>provided</scope>
+    </dependency>
 
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java 
b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
index f2879b8..c1c9534 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
@@ -44,6 +44,7 @@ import org.apache.livy.client.common.BufferUtils;
 import org.apache.livy.rsc.driver.AddFileJob;
 import org.apache.livy.rsc.driver.AddJarJob;
 import org.apache.livy.rsc.rpc.Rpc;
+import org.apache.livy.sessions.SessionState;
 
 import static org.apache.livy.rsc.RSCConf.Entry.*;
 
@@ -64,6 +65,8 @@ public class RSCClient implements LivyClient {
   private Process driverProcess;
   private volatile boolean isAlive;
   private volatile String replState;
+  // Record the last activity timestamp of the repl
+  private volatile long replLastActivity = System.nanoTime();
 
   RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) 
throws IOException {
     this.conf = conf;
@@ -315,6 +318,16 @@ public class RSCClient implements LivyClient {
     return replState;
   }
 
+  /**
+   * Get the timestamp of the last activity of the repl. It will be updated 
when the repl state
+   * changed from busy to idle
+   *
+   * @return last activity timestamp
+   */
+  public long getReplLastActivity() {
+    return replLastActivity;
+  }
+
   private class ClientProtocol extends BaseProtocol {
 
     <T> JobHandleImpl<T> submit(Job<T> job) {
@@ -411,6 +424,11 @@ public class RSCClient implements LivyClient {
 
     private void handle(ChannelHandlerContext ctx, ReplState msg) {
       LOG.trace("Received repl state for {}", msg.state);
+      // Update last activity timestamp when state change is from busy to idle.
+      if (SessionState.Busy$.MODULE$.state().equals(replState) && msg != null 
&&
+        SessionState.Idle$.MODULE$.state().equals(msg.state)) {
+        replLastActivity = System.nanoTime();
+      }
       replState = msg.state;
     }
   }
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index bccdb4d..cdeddda 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -626,4 +626,16 @@ class InteractiveSession(
   }
 
   override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
+
+  override def lastActivity: Long = {
+    val serverSideLastActivity = super.lastActivity
+    if (serverSideState == SessionState.Running) {
+      // If the rsc client is running, we compare the lastActivity of the 
session and the repl,
+      // and return the more latest one
+      client.flatMap { s => Option(s.getReplLastActivity) }.filter(_ > 
serverSideLastActivity)
+        .getOrElse(serverSideLastActivity)
+    } else {
+      serverSideLastActivity
+    }
+  }
 }
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index 2e21483..55f0e21 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -247,6 +247,21 @@ class InteractiveSessionSpec extends FunSpec
       }
     }
 
+    withSession("should refresh last activity time when statement finished") { 
session =>
+      val code =
+        """
+          |from time import sleep
+          |sleep(3)
+        """.stripMargin
+      session.executeStatement(ExecuteRequest(code, None))
+      val executionBeginTime = session.lastActivity
+
+      eventually(timeout(10 seconds), interval(100 millis)) {
+        session.state should be(SessionState.Idle)
+        session.lastActivity should be > executionBeginTime
+      }
+    }
+
     withSession("should error out the session if the interpreter dies") { 
session =>
       session.executeStatement(ExecuteRequest("import os; os._exit(666)", 
None))
       eventually(timeout(30 seconds), interval(100 millis)) {

Reply via email to