juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1409826101


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => 
SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, 
SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, 
SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get 
accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, 
sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be 
accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be 
accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with 
closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is 
shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = 
SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          
SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every 
$interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in 
periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {

Review Comment:
   > Why is this expiry time rather than custom inactivity time?
   
   Good question. In the previous PR https://github.com/apache/spark/pull/43913,
where I was adding an ExtendSession RPC, I found passing an absolute expiry time
a less ambiguous interface than extending the session by X. This way the client
and server both know precisely what the deadline is.
   Here I kept it the same way.
   What you're suggesting is that we could instead set a custom inactivity
timeout, which would only need to be set once, i.e. make it override the
server-side default config.
   Yeah, I guess that would be more consistent with the other config, which is
an inactivity timeout. It would also allow setting it only once instead of
having to keep resetting the expiry time. I'll change it.
   
   > Who sets this in PR?
   
   Nobody, for now: as we discussed offline, we are not introducing the
ExtendSession RPC at this point. But I am leaving it here for use by extensions.
If someone wanted to implement custom session expiry rules, they could still do
so, e.g. by using a custom Interceptor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to