This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new a15d4ff062 refactor(auth): move USER_LAST_ACTIVE_TIME write out of 
JwtAuthFilter (#4888)
a15d4ff062 is described below

commit a15d4ff0623f0e3b5451690cd4c23ecb78aa973b
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 20:52:18 2026 -0700

    refactor(auth): move USER_LAST_ACTIVE_TIME write out of JwtAuthFilter 
(#4888)
    
    ### What changes were proposed in this PR?
    
    `JwtAuthFilter` did a synchronous `USER_LAST_ACTIVE_TIME` upsert on
    every authenticated request — coupling JWT verification to a DB
    round-trip and mixing user-management into the auth pipeline. Strip the
    DB write out of the filter.
    
    The replacement:
    
    - `UserActivityTracker` (in `common/auth`) — per-uid in-memory cooldown
    (5 min default) + single-thread daemon executor, so request threads
    never wait on DB.
    - `UserActivityEventListener` (in access-control-service) — a Jersey
    `ApplicationEventListener`, **not** a `ContainerRequestFilter`. It
    observes `RESOURCE_METHOD_FINISHED` and forwards the uid to the tracker.
    Cannot reject or transform requests.
    
    Listener is registered only in access-control-service. Other services
    keep `JwtAuthFilter` but no longer write `USER_LAST_ACTIVE_TIME` —
    authenticated client sessions necessarily contact access-control-service
    often enough (UI navigation, permission checks, LiteLLM proxy) to
    capture activity with high recall.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4887
    
    ### How was this PR tested?
    
    Unit specs cover tracker cooldown/CAS (synchronous executor + injectable
    clock, no DB) and listener gating (Mockito stubs for `RequestEvent`, no
    Jersey runtime). All green; format/lint clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7)
---
 .../texera/service/AccessControlService.scala      |   7 +
 .../activity/UserActivityEventListener.scala       |  79 ++++++++++
 .../service/AccessControlServiceRunSpec.scala      |  49 ++++++
 .../activity/UserActivityEventListenerSpec.scala   | 136 +++++++++++++++++
 common/auth/build.sbt                              |   3 +-
 .../org/apache/texera/auth/JwtAuthFilter.scala     |  21 +--
 .../apache/texera/auth/UserActivityTracker.scala   | 169 +++++++++++++++++++++
 .../texera/auth/UserActivityTrackerSpec.scala      | 143 +++++++++++++++++
 8 files changed, 587 insertions(+), 20 deletions(-)

diff --git 
a/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala
 
b/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala
index 0ab9f0fbfe..21d367e2bb 100644
--- 
a/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala
+++ 
b/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala
@@ -26,6 +26,7 @@ import io.dropwizard.core.setup.{Bootstrap, Environment}
 import org.apache.texera.amber.config.StorageConfig
 import org.apache.texera.auth.{JwtAuthFilter, RequestLoggingFilter, 
SessionUser}
 import org.apache.texera.dao.SqlServer
+import org.apache.texera.service.activity.UserActivityEventListener
 import org.apache.texera.service.resource.{
   AccessControlResource,
   HealthCheckResource,
@@ -77,6 +78,12 @@ class AccessControlService extends 
Application[AccessControlServiceConfiguration
       new 
io.dropwizard.auth.AuthValueFactoryProvider.Binder(classOf[SessionUser])
     )
 
+    // Record USER_LAST_ACTIVE_TIME on every matched, completed request.
+    // Lives only in this service because authenticated client sessions
+    // contact access-control-service often enough to capture activity
+    // with high recall.
+    environment.jersey.register(new UserActivityEventListener())
+
     // Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL
     RequestLoggingFilter.register(environment.getApplicationContext)
   }
diff --git 
a/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala
 
b/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala
new file mode 100644
index 0000000000..e5fa785f53
--- /dev/null
+++ 
b/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.activity
+
+import jakarta.ws.rs.ext.Provider
+import org.apache.texera.auth.{SessionUser, UserActivityTracker}
+import org.glassfish.jersey.server.monitoring.{
+  ApplicationEvent,
+  ApplicationEventListener,
+  RequestEvent,
+  RequestEventListener
+}
+
+/** Records user activity (USER_LAST_ACTIVE_TIME) once per matched, completed
+  * request. Intentionally NOT a ContainerRequestFilter:
+  *
+  *   - It cannot reject or transform a request — it only observes.
+  *   - It runs at Jersey's monitoring layer, not the auth pipeline, so
+  *     activity tracking is decoupled from authentication concerns.
+  *   - It listens for RESOURCE_METHOD_FINISHED only, so requests that
+  *     fail before reaching a handler (no auth, 404, 4xx in earlier
+  *     filters) do not count as user activity.
+  *
+  * The DB write itself is throttled per-uid by [[UserActivityTracker]].
+  *
+  * Lives in access-control-service because USER_LAST_ACTIVE_TIME is a
+  * user-management concern; the assumption is that any authenticated
+  * client session contacts this service often enough (UI navigation,
+  * permission checks, LiteLLM proxy) to capture activity with high
+  * recall, so other services do not need to mirror this listener.
+  */
+@Provider
+class UserActivityEventListener(track: Integer => Unit = 
UserActivityTracker.markActive)
+    extends ApplicationEventListener {
+
+  override def onEvent(event: ApplicationEvent): Unit = ()
+
+  // SAM-converted lambda: avoids an inner anonymous class so coverage
+  // tooling sees a flat method body. Logic lives in the companion's
+  // `handle` so tests can drive it directly.
+  override def onRequest(requestEvent: RequestEvent): RequestEventListener =
+    (event: RequestEvent) => UserActivityEventListener.handle(event, track)
+}
+
+object UserActivityEventListener {
+
+  /** Process a single Jersey request event. Public-package for tests so the
+    * per-request branching is exercised without a Jersey runtime.
+    */
+  private[activity] def handle(event: RequestEvent, track: Integer => Unit): 
Unit = {
+    // `eq` (reference equality) is correct here because Type is a Java enum
+    // — its constants are singletons. It also compiles to a single
+    // `if_acmpne`, sidestepping Scala's BoxesRunTime.equals branch fan-out.
+    if (!(event.getType eq RequestEvent.Type.RESOURCE_METHOD_FINISHED)) return
+    val sc = event.getContainerRequest.getSecurityContext
+    if (sc == null) return
+    sc.getUserPrincipal match {
+      case u: SessionUser if u.getUid != null => track(u.getUid)
+      case _                                  =>
+    }
+  }
+}
diff --git 
a/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala
 
b/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala
new file mode 100644
index 0000000000..89979ee816
--- /dev/null
+++ 
b/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service
+
+import io.dropwizard.core.setup.Environment
+import io.dropwizard.jersey.setup.JerseyEnvironment
+import io.dropwizard.jetty.MutableServletContextHandler
+import io.dropwizard.jetty.setup.ServletEnvironment
+import org.apache.texera.service.activity.UserActivityEventListener
+import org.mockito.ArgumentMatchers.isA
+import org.mockito.Mockito.{mock, verify, when}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class AccessControlServiceRunSpec extends AnyFlatSpec with Matchers {
+
+  "AccessControlService.run" should "register UserActivityEventListener on the 
Jersey environment" in {
+    val jersey = mock(classOf[JerseyEnvironment])
+    val servlets = mock(classOf[ServletEnvironment])
+    val context = mock(classOf[MutableServletContextHandler])
+    val env = mock(classOf[Environment])
+    when(env.jersey).thenReturn(jersey)
+    when(env.servlets).thenReturn(servlets)
+    when(env.getApplicationContext).thenReturn(context)
+
+    val service = new AccessControlService
+    service.run(mock(classOf[AccessControlServiceConfiguration]), env)
+
+    verify(jersey).register(isA(classOf[UserActivityEventListener]))
+    verify(jersey).setUrlPattern("/api/*")
+  }
+}
diff --git 
a/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala
 
b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala
new file mode 100644
index 0000000000..3d99f4e7fb
--- /dev/null
+++ 
b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.activity
+
+import jakarta.ws.rs.core.SecurityContext
+import org.apache.texera.auth.SessionUser
+import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum
+import org.apache.texera.dao.jooq.generated.tables.pojos.User
+import org.glassfish.jersey.server.ContainerRequest
+import org.glassfish.jersey.server.monitoring.{ApplicationEvent, RequestEvent}
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.security.Principal
+import java.util.concurrent.ConcurrentLinkedQueue
+
+class UserActivityEventListenerSpec extends AnyFlatSpec with Matchers {
+
+  private def sessionUser(uid: Integer): SessionUser = {
+    val u = new User(uid, "u", null, null, null, null, UserRoleEnum.REGULAR, 
null, null, null, null)
+    new SessionUser(u)
+  }
+
+  private def buildEvent(eventType: RequestEvent.Type, sc: SecurityContext): 
RequestEvent = {
+    val req = mock(classOf[ContainerRequest])
+    when(req.getSecurityContext).thenReturn(sc)
+    val event = mock(classOf[RequestEvent])
+    when(event.getType).thenReturn(eventType)
+    when(event.getContainerRequest).thenReturn(req)
+    event
+  }
+
+  private def buildSecurityContext(principal: Principal): SecurityContext = {
+    val sc = mock(classOf[SecurityContext])
+    when(sc.getUserPrincipal).thenReturn(principal)
+    sc
+  }
+
+  private def newRecorder(): ConcurrentLinkedQueue[Integer] = new 
ConcurrentLinkedQueue[Integer]()
+  private def trackTo(q: ConcurrentLinkedQueue[Integer]): Integer => Unit =
+    uid => { q.add(uid); () }
+
+  "UserActivityEventListener.handle" should "invoke the tracker on 
RESOURCE_METHOD_FINISHED with a SessionUser principal" in {
+    val recorded = newRecorder()
+    UserActivityEventListener.handle(
+      buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, 
buildSecurityContext(sessionUser(42))),
+      trackTo(recorded)
+    )
+    recorded.size shouldBe 1
+    recorded.peek() shouldBe 42
+  }
+
+  it should "ignore RequestEvent types other than RESOURCE_METHOD_FINISHED" in 
{
+    val recorded = newRecorder()
+    val sc = buildSecurityContext(sessionUser(42))
+    UserActivityEventListener.handle(buildEvent(RequestEvent.Type.START, sc), 
trackTo(recorded))
+    UserActivityEventListener.handle(
+      buildEvent(RequestEvent.Type.RESOURCE_METHOD_START, sc),
+      trackTo(recorded)
+    )
+    UserActivityEventListener.handle(buildEvent(RequestEvent.Type.FINISHED, 
sc), trackTo(recorded))
+    recorded.isEmpty shouldBe true
+  }
+
+  it should "ignore non-SessionUser principals" in {
+    val recorded = newRecorder()
+    val anon: Principal = new Principal { override def getName: String = 
"anon" }
+    UserActivityEventListener.handle(
+      buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, 
buildSecurityContext(anon)),
+      trackTo(recorded)
+    )
+    recorded.isEmpty shouldBe true
+  }
+
+  it should "ignore SessionUser with null uid" in {
+    val recorded = newRecorder()
+    UserActivityEventListener.handle(
+      buildEvent(
+        RequestEvent.Type.RESOURCE_METHOD_FINISHED,
+        buildSecurityContext(sessionUser(null))
+      ),
+      trackTo(recorded)
+    )
+    recorded.isEmpty shouldBe true
+  }
+
+  it should "ignore null SecurityContext" in {
+    val recorded = newRecorder()
+    UserActivityEventListener.handle(
+      buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, null),
+      trackTo(recorded)
+    )
+    recorded.isEmpty shouldBe true
+  }
+
+  // Listener-level smoke tests: verify the SAM lambda + dispatch glue,
+  // not the per-event branching (which lives in `handle`).
+  "UserActivityEventListener" should "dispatch RequestEvent to the handle 
function" in {
+    val recorded = newRecorder()
+    val listener = new UserActivityEventListener(trackTo(recorded))
+    val rel = listener.onRequest(mock(classOf[RequestEvent]))
+    rel.onEvent(
+      buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, 
buildSecurityContext(sessionUser(7)))
+    )
+    recorded.peek() shouldBe 7
+  }
+
+  it should "no-op on ApplicationEvent (lifecycle hook unused)" in {
+    val recorded = newRecorder()
+    val listener = new UserActivityEventListener(trackTo(recorded))
+    listener.onEvent(mock(classOf[ApplicationEvent]))
+    recorded.isEmpty shouldBe true
+  }
+
+  it should "construct with the default tracker without invoking it" in {
+    new UserActivityEventListener() should not be null
+  }
+}
diff --git a/common/auth/build.sbt b/common/auth/build.sbt
index ce8d2243a1..a33da64fea 100644
--- a/common/auth/build.sbt
+++ b/common/auth/build.sbt
@@ -58,5 +58,6 @@ libraryDependencies ++= Seq(
   "org.bitbucket.b_c" % "jose4j" % "0.9.6",                             // for 
jwt parser
   "jakarta.ws.rs" % "jakarta.ws.rs-api" % "3.0.0",                      // for 
JwtAuthFilter
   "jakarta.servlet" % "jakarta.servlet-api" % "5.0.0" % "provided",    // for 
RequestLoggingFilter
-  "org.eclipse.jetty" % "jetty-servlet" % "11.0.24" % "provided"       // for 
FilterHolder
+  "org.eclipse.jetty" % "jetty-servlet" % "11.0.24" % "provided",      // for 
FilterHolder
+  "org.scalatest" %% "scalatest" % "3.2.17" % Test
 )
\ No newline at end of file
diff --git 
a/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala 
b/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala
index ed9615b451..5698515630 100644
--- a/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala
+++ b/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala
@@ -20,23 +20,16 @@
 package org.apache.texera.auth
 
 import com.typesafe.scalalogging.LazyLogging
-import jakarta.ws.rs.container.{ContainerRequestContext, 
ContainerRequestFilter, ResourceInfo}
-import jakarta.ws.rs.core.{Context, HttpHeaders, SecurityContext}
+import jakarta.ws.rs.container.{ContainerRequestContext, 
ContainerRequestFilter}
+import jakarta.ws.rs.core.{HttpHeaders, SecurityContext}
 import jakarta.ws.rs.ext.Provider
-import org.apache.texera.dao.SqlServer
-import org.apache.texera.dao.jooq.generated.Tables.USER_LAST_ACTIVE_TIME
 import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum
 
 import java.security.Principal
-import java.time.OffsetDateTime
 
 @Provider
 class JwtAuthFilter extends ContainerRequestFilter with LazyLogging {
 
-  @Context
-  private var resourceInfo: ResourceInfo = _
-  private def ctx = SqlServer.getInstance().createDSLContext()
-
   override def filter(requestContext: ContainerRequestContext): Unit = {
     val authHeader = requestContext.getHeaderString(HttpHeaders.AUTHORIZATION)
 
@@ -46,16 +39,6 @@ class JwtAuthFilter extends ContainerRequestFilter with 
LazyLogging {
 
       if (userOpt.isPresent) {
         val user = userOpt.get()
-
-        ctx
-          .insertInto(USER_LAST_ACTIVE_TIME)
-          .set(USER_LAST_ACTIVE_TIME.UID, user.getUid)
-          .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, OffsetDateTime.now())
-          .onConflict(USER_LAST_ACTIVE_TIME.UID) // conflict on primary key uid
-          .doUpdate()
-          .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, OffsetDateTime.now())
-          .execute()
-
         requestContext.setSecurityContext(new SecurityContext {
           override def getUserPrincipal: Principal = user
           override def isUserInRole(role: String): Boolean =
diff --git 
a/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala 
b/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala
new file mode 100644
index 0000000000..d397932289
--- /dev/null
+++ 
b/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.auth
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.Tables.USER_LAST_ACTIVE_TIME
+
+import java.time.{Duration, Instant, OffsetDateTime, ZoneOffset}
+import java.util.concurrent.{
+  ArrayBlockingQueue,
+  ConcurrentHashMap,
+  Executor,
+  Executors,
+  ScheduledExecutorService,
+  ThreadFactory,
+  ThreadPoolExecutor,
+  TimeUnit
+}
+import scala.util.control.NonFatal
+
+/** Per-uid activity timestamp recorder. The actual DB upsert is throttled
+  * by a per-uid in-memory cooldown so that a user hitting the API at high
+  * RPS produces at most one USER_LAST_ACTIVE_TIME write per
+  * `writeInterval`. The upsert itself runs on the supplied `executor` so
+  * request threads never wait on DB latency.
+  *
+  * Class form (with injectable upsert / executor / clock) exists so the
+  * cooldown/CAS logic can be unit-tested without a DB. The companion
+  * object [[UserActivityTracker]] is the production singleton.
+  */
+class UserActivityTracker(
+    writeInterval: Duration,
+    upsertFn: (Integer, Instant) => Unit,
+    executor: Executor,
+    clock: () => Instant
+) extends LazyLogging {
+
+  private val lastClaimed = new ConcurrentHashMap[Integer, Instant]()
+
+  // Eviction window: an entry is stale once 2*writeInterval has passed
+  // since its last claim. The factor keeps in-cooldown entries safe from
+  // eviction while still bounding `lastClaimed` for users who have gone
+  // away.
+  private val staleAfter: Duration = writeInterval.multipliedBy(2)
+
+  /** Record the user as active. Lock-free; performs at most one upsert per
+    * uid per `writeInterval`. Never propagates failures to the caller.
+    */
+  def markActive(uid: Integer): Unit = {
+    if (uid == null) return
+    try {
+      val now = clock()
+      val prev = lastClaimed.get(uid)
+      if (prev != null && Duration.between(prev, now).compareTo(writeInterval) 
< 0) return
+
+      // CAS to claim the write slot for this uid. If another thread won
+      // the race, drop this call.
+      val claimed =
+        if (prev == null) lastClaimed.putIfAbsent(uid, now) == null
+        else lastClaimed.replace(uid, prev, now)
+      if (!claimed) return
+
+      executor.execute(() =>
+        try upsertFn(uid, now)
+        catch {
+          case NonFatal(e) =>
+            logger.warn(s"User activity upsert failed (uid=$uid)", e)
+        }
+      )
+    } catch {
+      case NonFatal(e) =>
+        logger.warn(s"markActive failed (uid=$uid)", e)
+    }
+  }
+
+  /** Drop entries whose last-claimed time is older than `2 * writeInterval`.
+    * Bounds `lastClaimed` for long-lived processes with many distinct uids.
+    * Safe to call concurrently with [[markActive]].
+    */
+  def evictStale(): Unit = {
+    try {
+      val cutoff = clock().minus(staleAfter)
+      lastClaimed.entrySet().removeIf(e => e.getValue.isBefore(cutoff))
+    } catch {
+      case NonFatal(e) => logger.warn("evictStale failed", e)
+    }
+  }
+
+  /** Visible for tests. */
+  private[auth] def cooldownSize: Int = lastClaimed.size()
+}
+
+object UserActivityTracker extends LazyLogging {
+
+  private val WRITE_INTERVAL: Duration = Duration.ofMinutes(5)
+  // Bounded queue: under DB stalls or write storms, oldest pending tasks
+  // are dropped (DiscardOldest). The next request from the same uid will
+  // re-claim and re-write once cooldown elapses, so dropping a stale
+  // pending write does not lose the activity signal long-term.
+  private val WRITER_QUEUE_CAPACITY = 256
+
+  private val writer: Executor = new ThreadPoolExecutor(
+    1,
+    1,
+    0L,
+    TimeUnit.MILLISECONDS,
+    new ArrayBlockingQueue[Runnable](WRITER_QUEUE_CAPACITY),
+    daemonThreadFactory("user-activity-writer"),
+    new ThreadPoolExecutor.DiscardOldestPolicy
+  )
+
+  private val instance = new UserActivityTracker(
+    WRITE_INTERVAL,
+    defaultUpsert,
+    writer,
+    () => Instant.now()
+  )
+
+  // Periodic eviction of stale uid entries, running once per WRITE_INTERVAL.
+  private val cleanup: ScheduledExecutorService =
+    
Executors.newSingleThreadScheduledExecutor(daemonThreadFactory("user-activity-cleanup"))
+  cleanup.scheduleAtFixedRate(
+    () => instance.evictStale(),
+    WRITE_INTERVAL.toMillis,
+    WRITE_INTERVAL.toMillis,
+    TimeUnit.MILLISECONDS
+  )
+
+  /** Production entry point. Delegates to the singleton tracker. */
+  def markActive(uid: Integer): Unit = instance.markActive(uid)
+
+  private def defaultUpsert(uid: Integer, ts: Instant): Unit = {
+    val ctx = SqlServer.getInstance().createDSLContext()
+    val odt = OffsetDateTime.ofInstant(ts, ZoneOffset.UTC)
+    ctx
+      .insertInto(USER_LAST_ACTIVE_TIME)
+      .set(USER_LAST_ACTIVE_TIME.UID, uid)
+      .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt)
+      .onConflict(USER_LAST_ACTIVE_TIME.UID)
+      .doUpdate()
+      .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt)
+      .execute()
+  }
+
+  private def daemonThreadFactory(name: String): ThreadFactory =
+    (r: Runnable) => {
+      val t = new Thread(r, name)
+      t.setDaemon(true)
+      t
+    }
+}
diff --git 
a/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala
 
b/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala
new file mode 100644
index 0000000000..c8ad606301
--- /dev/null
+++ 
b/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.auth
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.time.{Duration, Instant}
+import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
+import java.util.concurrent.atomic.AtomicReference
+
+class UserActivityTrackerSpec extends AnyFlatSpec with Matchers {
+
+  // Synchronous executor: runnable runs on the calling thread, so the
+  // test can observe upsert invocations deterministically.
+  private val sameThread: Executor = (cmd: Runnable) => cmd.run()
+
+  private class Recorder {
+    val calls = new ConcurrentLinkedQueue[(Integer, Instant)]()
+    def upsert(uid: Integer, ts: Instant): Unit = { calls.add((uid, ts)); () }
+  }
+
+  private def makeTracker(
+      writeInterval: Duration,
+      recorder: Recorder,
+      clock: AtomicReference[Instant]
+  ) =
+    new UserActivityTracker(writeInterval, recorder.upsert, sameThread, () => 
clock.get())
+
+  "UserActivityTracker" should "trigger an upsert on the first call for a uid" 
in {
+    val recorder = new Recorder
+    val now = Instant.parse("2026-01-01T00:00:00Z")
+    val clock = new AtomicReference[Instant](now)
+    val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+    tracker.markActive(42)
+
+    recorder.calls.size shouldBe 1
+    val (uid, ts) = recorder.calls.peek()
+    uid shouldBe 42
+    ts shouldBe now
+  }
+
+  it should "skip upserts within the cooldown window" in {
+    val recorder = new Recorder
+    val t0 = Instant.parse("2026-01-01T00:00:00Z")
+    val clock = new AtomicReference[Instant](t0)
+    val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+    tracker.markActive(42)
+    clock.set(t0.plus(Duration.ofMinutes(2)))
+    tracker.markActive(42)
+    clock.set(t0.plus(Duration.ofMinutes(4).plusSeconds(59)))
+    tracker.markActive(42)
+
+    recorder.calls.size shouldBe 1
+  }
+
+  it should "fire another upsert once the cooldown elapses" in {
+    val recorder = new Recorder
+    val t0 = Instant.parse("2026-01-01T00:00:00Z")
+    val clock = new AtomicReference[Instant](t0)
+    val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+    tracker.markActive(42)
+    clock.set(t0.plus(Duration.ofMinutes(5)))
+    tracker.markActive(42)
+
+    recorder.calls.size shouldBe 2
+  }
+
+  it should "track different uids independently" in {
+    val recorder = new Recorder
+    val clock = new 
AtomicReference[Instant](Instant.parse("2026-01-01T00:00:00Z"))
+    val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+    tracker.markActive(1)
+    tracker.markActive(2)
+    tracker.markActive(3)
+
+    recorder.calls.size shouldBe 3
+  }
+
+  it should "treat null uid as a no-op" in {
+    val recorder = new Recorder
+    val clock = new 
AtomicReference[Instant](Instant.parse("2026-01-01T00:00:00Z"))
+    val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+    tracker.markActive(null)
+
+    recorder.calls.size shouldBe 0
+  }
+
+  it should "evict cooldown entries older than 2 * writeInterval" in {
+    val recorder = new Recorder
+    val t0 = Instant.parse("2026-01-01T00:00:00Z")
+    val clock = new AtomicReference[Instant](t0)
+    val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+    tracker.markActive(1)
+    tracker.markActive(2)
+    tracker.cooldownSize shouldBe 2
+
+    // 9 minutes — under 2 * writeInterval (10), nothing evicted
+    clock.set(t0.plus(Duration.ofMinutes(9)))
+    tracker.evictStale()
+    tracker.cooldownSize shouldBe 2
+
+    // 11 minutes — past 2 * writeInterval, both entries evicted
+    clock.set(t0.plus(Duration.ofMinutes(11)))
+    tracker.evictStale()
+    tracker.cooldownSize shouldBe 0
+  }
+
+  it should "swallow upsertFn exceptions instead of propagating to the caller" 
in {
+    val t0 = Instant.parse("2026-01-01T00:00:00Z")
+    val clock = new AtomicReference[Instant](t0)
+    val throwing: (Integer, Instant) => Unit =
+      (_, _) => throw new RuntimeException("simulated DB outage")
+    val tracker =
+      new UserActivityTracker(Duration.ofMinutes(5), throwing, sameThread, () 
=> clock.get())
+
+    // Must not throw — the wrapper catches NonFatal from upsertFn.
+    noException should be thrownBy tracker.markActive(42)
+  }
+}

Reply via email to