This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5213-c40f7f51735353c7edaae59ddc3cec19ea49494d
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 69bee152f4285410799d196d5e21884663527a74
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 25 20:53:06 2026 -0700

    test(amber): DB-backed unit tests for ExecutionsMetadataPersistService and updateWorkflowState (#5213)
    
    ### What changes were proposed in this PR?
    
    Adds `ExecutionsMetadataPersistServiceSpec`, a `MockTexeraDB`-backed
    unit spec covering the three methods on
    `ExecutionsMetadataPersistService` plus the
    `ExecutionStateStore.updateWorkflowState` wrapper that sits on top of
    `tryUpdateExistingExecution`. The MockTexeraDB trait (already used by
    `WorkflowExecutionsResourceSpec` and peers) spins up an EmbeddedPostgres
    in `beforeAll`, loads `sql/texera_ddl.sql`, and tears down in
    `afterAll`.
    
    Two latent contract bugs are pinned with explanatory comments and
    filed as follow-up issues: `tryGetExistingExecution` silently returns
    `Some(null)` instead of `None` for unknown eids, and
    `insertNewExecution` throws a NOT NULL violation when `uid=None`
    despite its `Option[Integer]` signature. The pinned `Some(null)` test is
    paired with an `intercept[TestFailedException]`-based xfail-strict test
    asserting the intended `None` contract — it flips red the day the bug is
    fixed.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5210. Surfaces #5211 and #5212.
    
    ### How was this PR tested?
    
    Added unit tests under
    `amber/src/test/scala/org/apache/texera/web/service/`.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7
---
 .../ExecutionsMetadataPersistServiceSpec.scala     | 330 +++++++++++++++++++++
 1 file changed, 330 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala
new file mode 100644
index 0000000000..f0f583ecc4
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.apache.texera.amber.engine.common.Utils.maptoStatusCode
+import 
org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore
+import org.apache.texera.dao.MockTexeraDB
+import org.apache.texera.dao.jooq.generated.Tables._
+import org.apache.texera.dao.jooq.generated.tables.daos.{
+  UserDao,
+  WorkflowComputingUnitDao,
+  WorkflowDao,
+  WorkflowExecutionsDao,
+  WorkflowVersionDao
+}
+import org.apache.texera.dao.jooq.generated.tables.pojos.{
+  User,
+  Workflow,
+  WorkflowComputingUnit,
+  WorkflowExecutions,
+  WorkflowVersion
+}
+import org.apache.texera.web.storage.ExecutionStateStore
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.sql.Timestamp
+import java.util.UUID
+
+/**
+ * DB-backed unit spec for [[ExecutionsMetadataPersistService]] and for
+ * [[ExecutionStateStore.updateWorkflowState]], which wraps
+ * `tryUpdateExistingExecution`.
+ *
+ * The database comes from [[MockTexeraDB]]: it is initialised in `beforeAll`
+ * and shut down in `afterAll`. Before every test, `beforeEach` wipes any
+ * leftover rows for this spec's ids and seeds one user, one workflow, one
+ * workflow version and one computing unit; `afterEach` wipes them again.
+ */
+class ExecutionsMetadataPersistServiceSpec
+    extends AnyFlatSpec
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with MockTexeraDB {
+
+  // Randomise the seeded wid/uid so a parallel run of unrelated specs that
+  // happen to seed the same ids wouldn't collide on the embedded postgres.
+  private val testWid = 7000 + scala.util.Random.nextInt(1000)
+  private val testUid = 7000 + scala.util.Random.nextInt(1000)
+
+  // DAOs are rebuilt in beforeEach from the current DSLContext configuration.
+  private var workflowDao: WorkflowDao = _
+  private var workflowVersionDao: WorkflowVersionDao = _
+  private var workflowExecutionsDao: WorkflowExecutionsDao = _
+  private var userDao: UserDao = _
+  private var computingUnitDao: WorkflowComputingUnitDao = _
+  // Generated keys of the rows seeded in beforeEach.
+  private var seededVid: Integer = _
+  private var seededCuid: Integer = _
+
+  override protected def beforeAll(): Unit = {
+    initializeDBAndReplaceDSLContext()
+  }
+
+  override protected def afterAll(): Unit = {
+    shutdownDB()
+  }
+
+  override protected def beforeEach(): Unit = {
+    val cfg = getDSLContext.configuration()
+    workflowDao = new WorkflowDao(cfg)
+    workflowVersionDao = new WorkflowVersionDao(cfg)
+    workflowExecutionsDao = new WorkflowExecutionsDao(cfg)
+    userDao = new UserDao(cfg)
+    computingUnitDao = new WorkflowComputingUnitDao(cfg)
+    cleanup()
+
+    val user = new User
+    user.setUid(testUid)
+    user.setName("metadata_persist_spec_user")
+    user.setEmail(s"user_${UUID.randomUUID()}@example.com")
+    user.setPassword("password")
+    userDao.insert(user)
+
+    val workflow = new Workflow
+    workflow.setWid(testWid)
+    workflow.setName(s"wf_${UUID.randomUUID().toString.substring(0, 8)}")
+    workflow.setContent("{}")
+    workflow.setDescription("")
+    workflow.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    workflow.setLastModifiedTime(new Timestamp(System.currentTimeMillis()))
+    workflowDao.insert(workflow)
+
+    // Seed one version explicitly so insertNewExecution's getLatestVersion
+    // takes the happy "max(existing)" branch instead of falling into the
+    // back-compat "insert one for you" branch (which would drag in
+    // WorkflowVersionResource.insertNewVersion's diff/aggregate logic).
+    val version = new WorkflowVersion
+    version.setWid(testWid)
+    version.setContent("{}")
+    version.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    workflowVersionDao.insert(version)
+    seededVid = version.getVid
+
+    // workflow_executions.cuid has a FK to workflow_computing_unit; seed one
+    // so insertNewExecution's setCuid(...) call doesn't trip the constraint.
+    val cu = new WorkflowComputingUnit
+    cu.setUid(testUid)
+    cu.setName("test-cu")
+    computingUnitDao.insert(cu)
+    seededCuid = cu.getCuid
+  }
+
+  override protected def afterEach(): Unit = cleanup()
+
+  // Deletes every row this spec may have created, children before parents so
+  // no FK constraint is violated: executions of the test workflow's versions,
+  // the versions, the workflow, the user's computing units, then the user.
+  private def cleanup(): Unit = {
+    getDSLContext
+      .deleteFrom(WORKFLOW_EXECUTIONS)
+      .where(
+        WORKFLOW_EXECUTIONS.VID.in(
+          getDSLContext
+            .select(WORKFLOW_VERSION.VID)
+            .from(WORKFLOW_VERSION)
+            .where(WORKFLOW_VERSION.WID.eq(testWid))
+        )
+      )
+      .execute()
+    getDSLContext
+      .deleteFrom(WORKFLOW_VERSION)
+      .where(WORKFLOW_VERSION.WID.eq(testWid))
+      .execute()
+    getDSLContext.deleteFrom(WORKFLOW).where(WORKFLOW.WID.eq(testWid)).execute()
+    getDSLContext
+      .deleteFrom(WORKFLOW_COMPUTING_UNIT)
+      .where(WORKFLOW_COMPUTING_UNIT.UID.eq(testUid))
+      .execute()
+    getDSLContext.deleteFrom(USER).where(USER.UID.eq(testUid)).execute()
+  }
+
+  // Helper: insert an execution row tied to the seeded version and return its eid.
+  // `status` is written verbatim to workflow_executions.status (default 0).
+  private def seedExecution(status: Byte = 0): Integer = {
+    val exec = new WorkflowExecutions
+    exec.setVid(seededVid)
+    exec.setUid(testUid)
+    exec.setStatus(status)
+    exec.setResult("")
+    exec.setStartingTime(new Timestamp(System.currentTimeMillis()))
+    exec.setBookmarked(false)
+    exec.setName("seeded execution")
+    exec.setEnvironmentVersion("test-env")
+    workflowExecutionsDao.insert(exec)
+    exec.getEid
+  }
+
+  // -- insertNewExecution -----------------------------------------------------
+
+  "insertNewExecution" should "insert a row tied to the latest workflow version" in {
+    val id = ExecutionsMetadataPersistService.insertNewExecution(
+      WorkflowIdentity(testWid.toLong),
+      Some(testUid),
+      executionName = "named-execution",
+      environmentVersion = "env-1",
+      computingUnitId = seededCuid
+    )
+    id should not be ExecutionIdentity(0L)
+
+    val stored = workflowExecutionsDao.fetchOneByEid(id.id.toInt)
+    stored should not be null
+    stored.getVid shouldBe seededVid
+    stored.getUid shouldBe testUid
+    stored.getName shouldBe "named-execution"
+    stored.getEnvironmentVersion shouldBe "env-1"
+    stored.getCuid shouldBe seededCuid
+  }
+
+  it should "skip setName when executionName is the empty string" in {
+    val id = ExecutionsMetadataPersistService.insertNewExecution(
+      WorkflowIdentity(testWid.toLong),
+      Some(testUid),
+      executionName = "",
+      environmentVersion = "env-2",
+      computingUnitId = seededCuid
+    )
+    val stored = workflowExecutionsDao.fetchOneByEid(id.id.toInt)
+    // The DDL default for workflow_executions.name is 'Untitled Execution'
+    // (sql/texera_ddl.sql). The production code path explicitly does not
+    // call setName for an empty string, so the row should fall back to the
+    // DDL default rather than persisting "".
+    stored.getName shouldBe "Untitled Execution"
+  }
+
+  it should "throw a DB constraint violation when uid is None" in {
+    // The method signature accepts Option[Integer] for uid and calls
+    // `newExecution.setUid(uid.orNull)`, but workflow_executions.uid is
+    // NOT NULL per texera_ddl.sql, so passing None propagates a jOOQ
+    // DataAccessException. Pinning the current behavior so a future fix —
+    // either tightening the signature to a required Integer or making the
+    // column nullable — breaks the spec deliberately. See follow-up bug.
+    val ex = intercept[org.jooq.exception.DataAccessException] {
+      ExecutionsMetadataPersistService.insertNewExecution(
+        WorkflowIdentity(testWid.toLong),
+        None,
+        executionName = "anonymous",
+        environmentVersion = "env-3",
+        computingUnitId = seededCuid
+      )
+    }
+    // jOOQ's DataAccessException message normally embeds the rendered INSERT,
+    // which always names the "uid" column, so a message check alone would
+    // also pass for an unrelated failure (e.g. an FK violation). SQLSTATE
+    // 23502 is PostgreSQL's not_null_violation and pins the actual cause.
+    ex.sqlState() shouldBe "23502"
+    ex.getMessage should include("uid")
+  }
+
+  // -- tryGetExistingExecution ------------------------------------------------
+
+  "tryGetExistingExecution" should "return Some(row) for a known eid" in {
+    val eid = seedExecution()
+    val fetched = ExecutionsMetadataPersistService.tryGetExistingExecution(
+      ExecutionIdentity(eid.longValue())
+    )
+    fetched shouldBe defined
+    fetched.get.getEid shouldBe eid
+  }
+
+  it should "currently return Some(null) for an unknown eid (latent bug)" in {
+    // Pin actual behavior: `fetchOneByEid` returns null for a miss (no throw),
+    // and the production code wraps the result in `Some(...)` before the
+    // try/catch can convert it to None. So the Option contract is broken for
+    // misses — callers that destructure with `getOrElse` get the null through.
+    // The catch only fires on hard errors (e.g. a closed connection), which
+    // is why the method name says "tryGet". Filed as a follow-up bug.
+    val fetched = ExecutionsMetadataPersistService.tryGetExistingExecution(
+      ExecutionIdentity(-1L)
+    )
+    fetched shouldBe Some(null)
+  }
+
+  it should "(intended contract) return None for an unknown eid" in {
+    // xfail-strict equivalent in ScalaTest: invert via intercept. When the
+    // bug above is fixed (e.g. `Option(workflowExecutionsDao.fetchOneByEid(...))`
+    // instead of `Some(...)`), the inner assertion will pass, intercept will
+    // not catch a TestFailedException, and this test will flip red — forcing
+    // the pinned-behavior test above to be updated in the same PR.
+    intercept[org.scalatest.exceptions.TestFailedException] {
+      val fetched = ExecutionsMetadataPersistService.tryGetExistingExecution(
+        ExecutionIdentity(-1L)
+      )
+      fetched shouldBe None
+    }
+  }
+
+  // -- tryUpdateExistingExecution ---------------------------------------------
+
+  "tryUpdateExistingExecution" should "apply the update function to the stored row" in {
+    val eid = seedExecution(status = 0)
+    ExecutionsMetadataPersistService.tryUpdateExistingExecution(
+      ExecutionIdentity(eid.longValue())
+    ) { exec =>
+      exec.setStatus(2.toByte) // PAUSED
+      exec.setName("renamed via update")
+    }
+    val after = workflowExecutionsDao.fetchOneByEid(eid)
+    after.getStatus shouldBe 2.toByte
+    after.getName shouldBe "renamed via update"
+  }
+
+  it should "swallow the update error for an unknown eid and leave existing rows untouched" in {
+    // Pin the silent-failure contract: fetchOneByEid returns null, the
+    // update closure NPEs on it, the catch logs "Unable to update execution"
+    // and continues. The seeded row stays untouched.
+    val eid = seedExecution(status = 1)
+    noException should be thrownBy
+      ExecutionsMetadataPersistService.tryUpdateExistingExecution(
+        ExecutionIdentity(-1L)
+      ) { exec => exec.setStatus(9.toByte) }
+    val after = workflowExecutionsDao.fetchOneByEid(eid)
+    after.getStatus shouldBe 1.toByte
+  }
+
+  // -- ExecutionStateStore.updateWorkflowState --------------------------------
+  //
+  // updateWorkflowState wraps tryUpdateExistingExecution to also bump the
+  // in-memory ExecutionMetadataStore's `state`. Exercising it here keeps the
+  // DB-backed setup (workflow/version/execution rows) in one place; the
+  // pure-logic ExecutionStateStoreSpec sibling has no DB dependency.
+
+  "ExecutionStateStore.updateWorkflowState" should "set status via maptoStatusCode and return the metadata store with new state" in {
+    val eid = seedExecution(status = 0)
+    // Backdate lastUpdateTime by a day before the call. Comparing with `>=`
+    // against whatever the insert left in the column (a DDL default, or null)
+    // is vacuous: if a regression dropped the setLastUpdateTime call, the
+    // column would be unchanged and `after >= before` would still hold.
+    // An explicitly written, clearly stale value makes that regression visible.
+    val staleTs = new Timestamp(System.currentTimeMillis() - 24L * 60L * 60L * 1000L)
+    val seeded = workflowExecutionsDao.fetchOneByEid(eid)
+    seeded.setLastUpdateTime(staleTs)
+    workflowExecutionsDao.update(seeded)
+
+    val store = ExecutionMetadataStore(
+      state = WorkflowAggregatedState.UNINITIALIZED,
+      executionId = ExecutionIdentity(eid.longValue())
+    )
+    val updated =
+      ExecutionStateStore.updateWorkflowState(WorkflowAggregatedState.COMPLETED, store)
+    updated.state shouldBe WorkflowAggregatedState.COMPLETED
+
+    val after = workflowExecutionsDao.fetchOneByEid(eid)
+    after.getStatus shouldBe maptoStatusCode(WorkflowAggregatedState.COMPLETED)
+    // lastUpdateTime is set unconditionally to System.currentTimeMillis(),
+    // so it must have moved strictly past the backdated seed value.
+    after.getLastUpdateTime should not be null
+    after.getLastUpdateTime.getTime should be > staleTs.getTime
+  }
+
+  it should "still return a metadataStore with the new state when the eid is unknown" in {
+    // updateWorkflowState first calls tryUpdateExistingExecution (which
+    // silently swallows the unknown-eid error) and then unconditionally
+    // returns metadataStore.withState(state). Document this so a future
+    // refactor that makes the failure surface (e.g. via Try / Either) has
+    // a spec to migrate.
+    val store = ExecutionMetadataStore(
+      state = WorkflowAggregatedState.UNINITIALIZED,
+      executionId = ExecutionIdentity(-1L)
+    )
+    val updated =
+      ExecutionStateStore.updateWorkflowState(WorkflowAggregatedState.FAILED, store)
+    updated.state shouldBe WorkflowAggregatedState.FAILED
+  }
+}

Reply via email to