This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new a15d4ff062 refactor(auth): move USER_LAST_ACTIVE_TIME write out of
JwtAuthFilter (#4888)
a15d4ff062 is described below
commit a15d4ff0623f0e3b5451690cd4c23ecb78aa973b
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 20:52:18 2026 -0700
refactor(auth): move USER_LAST_ACTIVE_TIME write out of JwtAuthFilter
(#4888)
### What changes were proposed in this PR?
`JwtAuthFilter` did a synchronous `USER_LAST_ACTIVE_TIME` upsert on
every authenticated request — coupling JWT verification to a DB
round-trip and mixing user-management into the auth pipeline. Strip the
DB write out of the filter.
The replacement:
- `UserActivityTracker` (in `common/auth`) — per-uid in-memory cooldown
(5 min default) + single-thread daemon executor, so request threads
never wait on DB.
- `UserActivityEventListener` (in access-control-service) — a Jersey
`ApplicationEventListener`, **not** a `ContainerRequestFilter`. It
observes `RESOURCE_METHOD_FINISHED` and forwards the uid to the tracker.
It cannot reject or transform requests.
Listener is registered only in access-control-service. Other services
keep `JwtAuthFilter` but no longer write `USER_LAST_ACTIVE_TIME` —
authenticated client sessions necessarily contact access-control-service
often enough (UI navigation, permission checks, LiteLLM proxy) to
capture activity with high recall.
### Any related issues, documentation, discussions?
Closes #4887
### How was this PR tested?
Unit specs cover tracker cooldown/CAS (synchronous executor + injectable
clock, no DB) and listener gating (Mockito stubs for `RequestEvent`, no
Jersey runtime). All green; format/lint clean.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
---
.../texera/service/AccessControlService.scala | 7 +
.../activity/UserActivityEventListener.scala | 79 ++++++++++
.../service/AccessControlServiceRunSpec.scala | 49 ++++++
.../activity/UserActivityEventListenerSpec.scala | 136 +++++++++++++++++
common/auth/build.sbt | 3 +-
.../org/apache/texera/auth/JwtAuthFilter.scala | 21 +--
.../apache/texera/auth/UserActivityTracker.scala | 169 +++++++++++++++++++++
.../texera/auth/UserActivityTrackerSpec.scala | 143 +++++++++++++++++
8 files changed, 587 insertions(+), 20 deletions(-)
diff --git
a/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala
b/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala
index 0ab9f0fbfe..21d367e2bb 100644
---
a/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala
+++
b/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala
@@ -26,6 +26,7 @@ import io.dropwizard.core.setup.{Bootstrap, Environment}
import org.apache.texera.amber.config.StorageConfig
import org.apache.texera.auth.{JwtAuthFilter, RequestLoggingFilter,
SessionUser}
import org.apache.texera.dao.SqlServer
+import org.apache.texera.service.activity.UserActivityEventListener
import org.apache.texera.service.resource.{
AccessControlResource,
HealthCheckResource,
@@ -77,6 +78,12 @@ class AccessControlService extends
Application[AccessControlServiceConfiguration
new
io.dropwizard.auth.AuthValueFactoryProvider.Binder(classOf[SessionUser])
)
+ // Mark the authenticated user active on every matched, completed request
+ // (the USER_LAST_ACTIVE_TIME write itself is throttled per uid). Lives only
+ // in this service because authenticated client sessions are assumed to
+ // contact access-control-service often enough for high recall.
+ environment.jersey.register(new UserActivityEventListener())
+
// Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL
RequestLoggingFilter.register(environment.getApplicationContext)
}
diff --git
a/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala
b/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala
new file mode 100644
index 0000000000..e5fa785f53
--- /dev/null
+++
b/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.activity
+
+import jakarta.ws.rs.ext.Provider
+import org.apache.texera.auth.{SessionUser, UserActivityTracker}
+import org.glassfish.jersey.server.monitoring.{
+ ApplicationEvent,
+ ApplicationEventListener,
+ RequestEvent,
+ RequestEventListener
+}
+
+/** Records user activity (USER_LAST_ACTIVE_TIME) once per matched, completed
+ * request. Intentionally NOT a ContainerRequestFilter:
+ *
+ * - It cannot reject or transform a request — it only observes.
+ * - It runs at Jersey's monitoring layer, not the auth pipeline, so
+ * activity tracking is decoupled from authentication concerns.
+ * - It listens for RESOURCE_METHOD_FINISHED only, so requests that
+ * fail before reaching a handler (no auth, 404, 4xx in earlier
+ * filters) do not count as user activity.
+ *
+ * The DB write itself is throttled per-uid by [[UserActivityTracker]].
+ *
+ * Lives in access-control-service because USER_LAST_ACTIVE_TIME is a
+ * user-management concern; the assumption is that any authenticated
+ * client session contacts this service often enough (UI navigation,
+ * permission checks, LiteLLM proxy) to capture activity with high
+ * recall, so other services do not need to mirror this listener.
+ */
+@Provider
+class UserActivityEventListener(track: Integer => Unit =
UserActivityTracker.markActive)
+ extends ApplicationEventListener {
+
+ override def onEvent(event: ApplicationEvent): Unit = ()
+
+ // SAM-converted lambda: avoids an inner anonymous class so coverage
+ // tooling sees a flat method body. Logic lives in the companion's
+ // `handle` so tests can drive it directly.
+ override def onRequest(requestEvent: RequestEvent): RequestEventListener =
+ (event: RequestEvent) => UserActivityEventListener.handle(event, track)
+}
+
+object UserActivityEventListener {
+
+ /** Process a single Jersey request event. Package-private so tests can
+ * exercise the per-request branching without a Jersey runtime.
+ */
+ private[activity] def handle(event: RequestEvent, track: Integer => Unit):
Unit = {
+ // `eq` (reference equality) is correct here because Type is a Java enum
+ // — its constants are singletons. It also compiles to a single
+ // `if_acmpne`, sidestepping Scala's BoxesRunTime.equals branch fan-out.
+ if (!(event.getType eq RequestEvent.Type.RESOURCE_METHOD_FINISHED)) return
+ val sc = event.getContainerRequest.getSecurityContext
+ if (sc == null) return
+ sc.getUserPrincipal match {
+ case u: SessionUser if u.getUid != null => track(u.getUid)
+ case _ =>
+ }
+ }
+}
diff --git
a/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala
b/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala
new file mode 100644
index 0000000000..89979ee816
--- /dev/null
+++
b/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service
+
+import io.dropwizard.core.setup.Environment
+import io.dropwizard.jersey.setup.JerseyEnvironment
+import io.dropwizard.jetty.MutableServletContextHandler
+import io.dropwizard.jetty.setup.ServletEnvironment
+import org.apache.texera.service.activity.UserActivityEventListener
+import org.mockito.ArgumentMatchers.isA
+import org.mockito.Mockito.{mock, verify, when}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class AccessControlServiceRunSpec extends AnyFlatSpec with Matchers {
+
+ "AccessControlService.run" should "register UserActivityEventListener on the
Jersey environment" in {
+ val jersey = mock(classOf[JerseyEnvironment])
+ val servlets = mock(classOf[ServletEnvironment])
+ val context = mock(classOf[MutableServletContextHandler])
+ val env = mock(classOf[Environment])
+ when(env.jersey).thenReturn(jersey)
+ when(env.servlets).thenReturn(servlets)
+ when(env.getApplicationContext).thenReturn(context)
+
+ val service = new AccessControlService
+ service.run(mock(classOf[AccessControlServiceConfiguration]), env)
+
+ verify(jersey).register(isA(classOf[UserActivityEventListener]))
+ verify(jersey).setUrlPattern("/api/*")
+ }
+}
diff --git
a/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala
b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala
new file mode 100644
index 0000000000..3d99f4e7fb
--- /dev/null
+++
b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.activity
+
+import jakarta.ws.rs.core.SecurityContext
+import org.apache.texera.auth.SessionUser
+import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum
+import org.apache.texera.dao.jooq.generated.tables.pojos.User
+import org.glassfish.jersey.server.ContainerRequest
+import org.glassfish.jersey.server.monitoring.{ApplicationEvent, RequestEvent}
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.security.Principal
+import java.util.concurrent.ConcurrentLinkedQueue
+
+class UserActivityEventListenerSpec extends AnyFlatSpec with Matchers {
+
+ private def sessionUser(uid: Integer): SessionUser = {
+ val u = new User(uid, "u", null, null, null, null, UserRoleEnum.REGULAR,
null, null, null, null)
+ new SessionUser(u)
+ }
+
+ private def buildEvent(eventType: RequestEvent.Type, sc: SecurityContext):
RequestEvent = {
+ val req = mock(classOf[ContainerRequest])
+ when(req.getSecurityContext).thenReturn(sc)
+ val event = mock(classOf[RequestEvent])
+ when(event.getType).thenReturn(eventType)
+ when(event.getContainerRequest).thenReturn(req)
+ event
+ }
+
+ private def buildSecurityContext(principal: Principal): SecurityContext = {
+ val sc = mock(classOf[SecurityContext])
+ when(sc.getUserPrincipal).thenReturn(principal)
+ sc
+ }
+
+ private def newRecorder(): ConcurrentLinkedQueue[Integer] = new
ConcurrentLinkedQueue[Integer]()
+ private def trackTo(q: ConcurrentLinkedQueue[Integer]): Integer => Unit =
+ uid => { q.add(uid); () }
+
+ "UserActivityEventListener.handle" should "invoke the tracker on
RESOURCE_METHOD_FINISHED with a SessionUser principal" in {
+ val recorded = newRecorder()
+ UserActivityEventListener.handle(
+ buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED,
buildSecurityContext(sessionUser(42))),
+ trackTo(recorded)
+ )
+ recorded.size shouldBe 1
+ recorded.peek() shouldBe 42
+ }
+
+ it should "ignore RequestEvent types other than RESOURCE_METHOD_FINISHED" in
{
+ val recorded = newRecorder()
+ val sc = buildSecurityContext(sessionUser(42))
+ UserActivityEventListener.handle(buildEvent(RequestEvent.Type.START, sc),
trackTo(recorded))
+ UserActivityEventListener.handle(
+ buildEvent(RequestEvent.Type.RESOURCE_METHOD_START, sc),
+ trackTo(recorded)
+ )
+ UserActivityEventListener.handle(buildEvent(RequestEvent.Type.FINISHED,
sc), trackTo(recorded))
+ recorded.isEmpty shouldBe true
+ }
+
+ it should "ignore non-SessionUser principals" in {
+ val recorded = newRecorder()
+ val anon: Principal = new Principal { override def getName: String =
"anon" }
+ UserActivityEventListener.handle(
+ buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED,
buildSecurityContext(anon)),
+ trackTo(recorded)
+ )
+ recorded.isEmpty shouldBe true
+ }
+
+ it should "ignore SessionUser with null uid" in {
+ val recorded = newRecorder()
+ UserActivityEventListener.handle(
+ buildEvent(
+ RequestEvent.Type.RESOURCE_METHOD_FINISHED,
+ buildSecurityContext(sessionUser(null))
+ ),
+ trackTo(recorded)
+ )
+ recorded.isEmpty shouldBe true
+ }
+
+ it should "ignore null SecurityContext" in {
+ val recorded = newRecorder()
+ UserActivityEventListener.handle(
+ buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, null),
+ trackTo(recorded)
+ )
+ recorded.isEmpty shouldBe true
+ }
+
+ // Listener-level smoke tests: verify the SAM lambda + dispatch glue,
+ // not the per-event branching (which lives in `handle`).
+ "UserActivityEventListener" should "dispatch RequestEvent to the handle
function" in {
+ val recorded = newRecorder()
+ val listener = new UserActivityEventListener(trackTo(recorded))
+ val rel = listener.onRequest(mock(classOf[RequestEvent]))
+ rel.onEvent(
+ buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED,
buildSecurityContext(sessionUser(7)))
+ )
+ recorded.peek() shouldBe 7
+ }
+
+ it should "no-op on ApplicationEvent (lifecycle hook unused)" in {
+ val recorded = newRecorder()
+ val listener = new UserActivityEventListener(trackTo(recorded))
+ listener.onEvent(mock(classOf[ApplicationEvent]))
+ recorded.isEmpty shouldBe true
+ }
+
+ it should "construct with the default tracker without invoking it" in {
+ new UserActivityEventListener() should not be null
+ }
+}
diff --git a/common/auth/build.sbt b/common/auth/build.sbt
index ce8d2243a1..a33da64fea 100644
--- a/common/auth/build.sbt
+++ b/common/auth/build.sbt
@@ -58,5 +58,6 @@ libraryDependencies ++= Seq(
"org.bitbucket.b_c" % "jose4j" % "0.9.6", // for
jwt parser
"jakarta.ws.rs" % "jakarta.ws.rs-api" % "3.0.0", // for
JwtAuthFilter
"jakarta.servlet" % "jakarta.servlet-api" % "5.0.0" % "provided", // for
RequestLoggingFilter
- "org.eclipse.jetty" % "jetty-servlet" % "11.0.24" % "provided" // for
FilterHolder
+ "org.eclipse.jetty" % "jetty-servlet" % "11.0.24" % "provided", // for
FilterHolder
+ "org.scalatest" %% "scalatest" % "3.2.17" % Test
)
\ No newline at end of file
diff --git
a/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala
b/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala
index ed9615b451..5698515630 100644
--- a/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala
+++ b/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala
@@ -20,23 +20,16 @@
package org.apache.texera.auth
import com.typesafe.scalalogging.LazyLogging
-import jakarta.ws.rs.container.{ContainerRequestContext,
ContainerRequestFilter, ResourceInfo}
-import jakarta.ws.rs.core.{Context, HttpHeaders, SecurityContext}
+import jakarta.ws.rs.container.{ContainerRequestContext,
ContainerRequestFilter}
+import jakarta.ws.rs.core.{HttpHeaders, SecurityContext}
import jakarta.ws.rs.ext.Provider
-import org.apache.texera.dao.SqlServer
-import org.apache.texera.dao.jooq.generated.Tables.USER_LAST_ACTIVE_TIME
import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum
import java.security.Principal
-import java.time.OffsetDateTime
@Provider
class JwtAuthFilter extends ContainerRequestFilter with LazyLogging {
- @Context
- private var resourceInfo: ResourceInfo = _
- private def ctx = SqlServer.getInstance().createDSLContext()
-
override def filter(requestContext: ContainerRequestContext): Unit = {
val authHeader = requestContext.getHeaderString(HttpHeaders.AUTHORIZATION)
@@ -46,16 +39,6 @@ class JwtAuthFilter extends ContainerRequestFilter with
LazyLogging {
if (userOpt.isPresent) {
val user = userOpt.get()
-
- ctx
- .insertInto(USER_LAST_ACTIVE_TIME)
- .set(USER_LAST_ACTIVE_TIME.UID, user.getUid)
- .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, OffsetDateTime.now())
- .onConflict(USER_LAST_ACTIVE_TIME.UID) // conflict on primary key uid
- .doUpdate()
- .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, OffsetDateTime.now())
- .execute()
-
requestContext.setSecurityContext(new SecurityContext {
override def getUserPrincipal: Principal = user
override def isUserInRole(role: String): Boolean =
diff --git
a/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala
b/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala
new file mode 100644
index 0000000000..d397932289
--- /dev/null
+++
b/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.auth
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.Tables.USER_LAST_ACTIVE_TIME
+
+import java.time.{Duration, Instant, OffsetDateTime, ZoneOffset}
+import java.util.concurrent.{
+ ArrayBlockingQueue,
+ ConcurrentHashMap,
+ Executor,
+ Executors,
+ ScheduledExecutorService,
+ ThreadFactory,
+ ThreadPoolExecutor,
+ TimeUnit
+}
+import scala.util.control.NonFatal
+
+/** Per-uid activity timestamp recorder. The actual DB upsert is throttled
+ * by a per-uid in-memory cooldown so that a user hitting the API at high
+ * RPS produces at most one USER_LAST_ACTIVE_TIME write per
+ * `writeInterval`. The upsert itself runs on the supplied `executor` so
+ * request threads never wait on DB latency.
+ *
+ * Class form (with injectable upsert / executor / clock) exists so the
+ * cooldown/CAS logic can be unit-tested without a DB. The companion
+ * object [[UserActivityTracker]] is the production singleton.
+ */
+class UserActivityTracker(
+ writeInterval: Duration,
+ upsertFn: (Integer, Instant) => Unit,
+ executor: Executor,
+ clock: () => Instant
+) extends LazyLogging {
+
+ private val lastClaimed = new ConcurrentHashMap[Integer, Instant]()
+
+ // Eviction window: an entry is stale once 2*writeInterval has passed
+ // since its last claim. The factor keeps in-cooldown entries safe from
+ // eviction while still bounding `lastClaimed` for users who have gone
+ // away.
+ private val staleAfter: Duration = writeInterval.multipliedBy(2)
+
+ /** Record the user as active. Lock-free; performs at most one upsert per
+ * uid per `writeInterval`. Never propagates failures to the caller.
+ */
+ def markActive(uid: Integer): Unit = {
+ if (uid == null) return
+ try {
+ val now = clock()
+ val prev = lastClaimed.get(uid)
+ if (prev != null && Duration.between(prev, now).compareTo(writeInterval)
< 0) return
+
+ // CAS to claim the write slot for this uid. If another thread won
+ // the race, drop this call.
+ val claimed =
+ if (prev == null) lastClaimed.putIfAbsent(uid, now) == null
+ else lastClaimed.replace(uid, prev, now)
+ if (!claimed) return
+
+ executor.execute(() =>
+ try upsertFn(uid, now)
+ catch {
+ case NonFatal(e) =>
+ logger.warn(s"User activity upsert failed (uid=$uid)", e)
+ }
+ )
+ } catch {
+ case NonFatal(e) =>
+ logger.warn(s"markActive failed (uid=$uid)", e)
+ }
+ }
+
+ /** Drop entries whose last-claimed time is older than `2 * writeInterval`.
+ * Bounds `lastClaimed` for long-lived processes with many distinct uids.
+ * Safe to call concurrently with [[markActive]].
+ */
+ def evictStale(): Unit = {
+ try {
+ val cutoff = clock().minus(staleAfter)
+ lastClaimed.entrySet().removeIf(e => e.getValue.isBefore(cutoff))
+ } catch {
+ case NonFatal(e) => logger.warn("evictStale failed", e)
+ }
+ }
+
+ /** Visible for tests. */
+ private[auth] def cooldownSize: Int = lastClaimed.size()
+}
+
+object UserActivityTracker extends LazyLogging {
+
+ private val WRITE_INTERVAL: Duration = Duration.ofMinutes(5)
+ // Bounded queue: under DB stalls or write storms, oldest pending tasks
+ // are dropped (DiscardOldest). The next request from the same uid will
+ // re-claim and re-write once cooldown elapses, so dropping a stale
+ // pending write does not lose the activity signal long-term.
+ private val WRITER_QUEUE_CAPACITY = 256
+
+ private val writer: Executor = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue[Runnable](WRITER_QUEUE_CAPACITY),
+ daemonThreadFactory("user-activity-writer"),
+ new ThreadPoolExecutor.DiscardOldestPolicy
+ )
+
+ private val instance = new UserActivityTracker(
+ WRITE_INTERVAL,
+ defaultUpsert,
+ writer,
+ () => Instant.now()
+ )
+
+ // Periodic eviction of stale uid entries, running once per WRITE_INTERVAL.
+ private val cleanup: ScheduledExecutorService =
+
Executors.newSingleThreadScheduledExecutor(daemonThreadFactory("user-activity-cleanup"))
+ cleanup.scheduleAtFixedRate(
+ () => instance.evictStale(),
+ WRITE_INTERVAL.toMillis,
+ WRITE_INTERVAL.toMillis,
+ TimeUnit.MILLISECONDS
+ )
+
+ /** Production entry point. Delegates to the singleton tracker. */
+ def markActive(uid: Integer): Unit = instance.markActive(uid)
+
+ private def defaultUpsert(uid: Integer, ts: Instant): Unit = {
+ val ctx = SqlServer.getInstance().createDSLContext()
+ val odt = OffsetDateTime.ofInstant(ts, ZoneOffset.UTC)
+ ctx
+ .insertInto(USER_LAST_ACTIVE_TIME)
+ .set(USER_LAST_ACTIVE_TIME.UID, uid)
+ .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt)
+ .onConflict(USER_LAST_ACTIVE_TIME.UID)
+ .doUpdate()
+ .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt)
+ .execute()
+ }
+
+ private def daemonThreadFactory(name: String): ThreadFactory =
+ (r: Runnable) => {
+ val t = new Thread(r, name)
+ t.setDaemon(true)
+ t
+ }
+}
diff --git
a/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala
b/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala
new file mode 100644
index 0000000000..c8ad606301
--- /dev/null
+++
b/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.auth
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.time.{Duration, Instant}
+import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
+import java.util.concurrent.atomic.AtomicReference
+
+class UserActivityTrackerSpec extends AnyFlatSpec with Matchers {
+
+ // Synchronous executor: runnable runs on the calling thread, so the
+ // test can observe upsert invocations deterministically.
+ private val sameThread: Executor = (cmd: Runnable) => cmd.run()
+
+ private class Recorder {
+ val calls = new ConcurrentLinkedQueue[(Integer, Instant)]()
+ def upsert(uid: Integer, ts: Instant): Unit = { calls.add((uid, ts)); () }
+ }
+
+ private def makeTracker(
+ writeInterval: Duration,
+ recorder: Recorder,
+ clock: AtomicReference[Instant]
+ ) =
+ new UserActivityTracker(writeInterval, recorder.upsert, sameThread, () =>
clock.get())
+
+ "UserActivityTracker" should "trigger an upsert on the first call for a uid"
in {
+ val recorder = new Recorder
+ val now = Instant.parse("2026-01-01T00:00:00Z")
+ val clock = new AtomicReference[Instant](now)
+ val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+ tracker.markActive(42)
+
+ recorder.calls.size shouldBe 1
+ val (uid, ts) = recorder.calls.peek()
+ uid shouldBe 42
+ ts shouldBe now
+ }
+
+ it should "skip upserts within the cooldown window" in {
+ val recorder = new Recorder
+ val t0 = Instant.parse("2026-01-01T00:00:00Z")
+ val clock = new AtomicReference[Instant](t0)
+ val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+ tracker.markActive(42)
+ clock.set(t0.plus(Duration.ofMinutes(2)))
+ tracker.markActive(42)
+ clock.set(t0.plus(Duration.ofMinutes(4).plusSeconds(59)))
+ tracker.markActive(42)
+
+ recorder.calls.size shouldBe 1
+ }
+
+ it should "fire another upsert once the cooldown elapses" in {
+ val recorder = new Recorder
+ val t0 = Instant.parse("2026-01-01T00:00:00Z")
+ val clock = new AtomicReference[Instant](t0)
+ val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+ tracker.markActive(42)
+ clock.set(t0.plus(Duration.ofMinutes(5)))
+ tracker.markActive(42)
+
+ recorder.calls.size shouldBe 2
+ }
+
+ it should "track different uids independently" in {
+ val recorder = new Recorder
+ val clock = new
AtomicReference[Instant](Instant.parse("2026-01-01T00:00:00Z"))
+ val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+ tracker.markActive(1)
+ tracker.markActive(2)
+ tracker.markActive(3)
+
+ recorder.calls.size shouldBe 3
+ }
+
+ it should "treat null uid as a no-op" in {
+ val recorder = new Recorder
+ val clock = new
AtomicReference[Instant](Instant.parse("2026-01-01T00:00:00Z"))
+ val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+ tracker.markActive(null)
+
+ recorder.calls.size shouldBe 0
+ }
+
+ it should "evict cooldown entries older than 2 * writeInterval" in {
+ val recorder = new Recorder
+ val t0 = Instant.parse("2026-01-01T00:00:00Z")
+ val clock = new AtomicReference[Instant](t0)
+ val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock)
+
+ tracker.markActive(1)
+ tracker.markActive(2)
+ tracker.cooldownSize shouldBe 2
+
+ // 9 minutes — under 2 * writeInterval (10), nothing evicted
+ clock.set(t0.plus(Duration.ofMinutes(9)))
+ tracker.evictStale()
+ tracker.cooldownSize shouldBe 2
+
+ // 11 minutes — past 2 * writeInterval, both entries evicted
+ clock.set(t0.plus(Duration.ofMinutes(11)))
+ tracker.evictStale()
+ tracker.cooldownSize shouldBe 0
+ }
+
+ it should "swallow upsertFn exceptions instead of propagating to the caller"
in {
+ val t0 = Instant.parse("2026-01-01T00:00:00Z")
+ val clock = new AtomicReference[Instant](t0)
+ val throwing: (Integer, Instant) => Unit =
+ (_, _) => throw new RuntimeException("simulated DB outage")
+ val tracker =
+ new UserActivityTracker(Duration.ofMinutes(5), throwing, sameThread, ()
=> clock.get())
+
+ // Must not throw — the wrapper catches NonFatal from upsertFn.
+ noException should be thrownBy tracker.markActive(42)
+ }
+}