Xiao-zhen-Liu commented on code in PR #5434:
URL: https://github.com/apache/texera/pull/5434#discussion_r3392576067


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -580,10 +580,8 @@ class RegionExecutionCoordinator(
         DocumentFactory.createDocument(stateURI, State.schema)
         if (!isRestart) {
           val (_, eid, _, _) = decodeURI(resultURI)

Review Comment:
   Inherited from the old code, but worth a look now that this defines an event 
contract: the eid is parsed back out of the URI string here 
(`decodeURI(resultURI)`), even though the only consumer is execution-scoped — 
`attachToExecution` already has `executionId` in hand. You could drop `eid` 
from the event and the `decodeURI` call entirely, and let the subscriber use 
its own execution id. Self-describing events are also a reasonable choice, so 
feel free to keep it; flagging only because dropping it removes a field and a decode.
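A rough sketch of the eid-free shape, heavily simplified: the event fields, `EventBus`, and this `attachToExecution` signature are stand-ins for illustration, not the real `AmberClient` API. The only point is that the execution id comes from the subscriber's closure instead of `decodeURI`.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer

// Simplified stand-in for the event, with no execution id field.
final case class OperatorPortResultUriAvailable(
    logicalOpId: String,
    portId: Int,
    uri: URI
)

// Hypothetical minimal callback registry; not AmberClient's real API.
final class EventBus {
  private val callbacks = ListBuffer.empty[OperatorPortResultUriAvailable => Unit]
  def registerCallback(cb: OperatorPortResultUriAvailable => Unit): Unit =
    callbacks += cb
  def emit(evt: OperatorPortResultUriAvailable): Unit =
    callbacks.foreach(cb => cb(evt))
}

object ExecutionScopedSubscriber {
  // The subscriber already knows its execution id, so it attaches it to each
  // row itself instead of parsing it back out of the result URI.
  def attachToExecution(
      bus: EventBus,
      executionId: Long,
      sink: ListBuffer[(Long, String, Int, URI)]
  ): Unit =
    bus.registerCallback { evt =>
      sink += ((executionId, evt.logicalOpId, evt.portId, evt.uri))
      ()
    }
}
```

The trade-off is the one above: the event gets smaller, but it is no longer meaningful to a subscriber that is not execution-scoped.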



##########
amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala:
##########
@@ -353,6 +357,10 @@ class ExecutionResultService(
       )
     )
 
+    addSubscription(
+      client.registerCallback[OperatorPortResultUriAvailable](persistOperatorPortResultUri)

Review Comment:
   Before this PR the engine wrote the row unconditionally; now 
`operator_port_executions` only gets populated if a result service was attached 
before `startWorkflow`. That holds today (both the websocket and sync REST 
paths attach in `WorkflowExecutionService.executeWorkflow` before starting), 
but it is an invariant enforced by call ordering rather than by structure. 
Worth a short comment here or on the event definition saying "must be 
registered before the workflow starts, otherwise result URIs are silently never 
persisted" — the failure mode is invisible (results compute fine, the dashboard 
just cannot find them), so the next person touching startup ordering should hit 
a warning sign.
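If you want to go beyond a comment, one way to make the ordering structural is to make the callback a required argument of whatever starts the execution. Sketch only: `StartableExecution`, `ResultUriEvent`, and `publish` are hypothetical names, and events are delivered synchronously here, unlike the real engine.

```scala
import java.net.URI

final case class ResultUriEvent(logicalOpId: String, portId: Int, uri: URI)

/** Hypothetical: the result-URI callback is a constructor argument, so no
  * execution can exist (let alone start) without a subscriber. If it stays a
  * separate registration instead, it should carry a comment like "must be
  * registered before the workflow starts, otherwise result URIs are silently
  * never persisted".
  */
final class StartableExecution(onResultUri: ResultUriEvent => Unit) {
  private var started = false

  def start(): Unit = started = true

  // Stand-in for the engine publishing an event once the workflow runs.
  def publish(evt: ResultUriEvent): Unit = {
    require(started, "events are only emitted after start()")
    onResultUri(evt)
  }
}
```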



##########
amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala:
##########
@@ -501,4 +509,19 @@ class ExecutionResultService(
     }
   }
 
+  /**
+    * Callback body for `OperatorPortResultUriAvailable`: persist the URI to
+    * `operator_port_executions`. Lifted to an instance method so a unit test
+    * can drive it directly without spinning up an `AmberClient`.
+    */
+  private[service] def persistOperatorPortResultUri(
+      evt: OperatorPortResultUriAvailable
+  ): Unit = {
+    WorkflowExecutionsResource.insertOperatorPortResultUri(

Review Comment:
   This method only forwards to a static helper on the JAX-RS resource, but 
#5430 scoped this work as deleting that helper. Now that 
`ExecutionResultService` is its only production caller, the five lines of DAO 
code could just live here and 
`WorkflowExecutionsResource.insertOperatorPortResultUri` could be deleted as 
planned (the resource spec can insert fixture rows with 
`OperatorPortExecutionsDao` directly). That would also make the PR match its 
own description. If you would rather defer that to the follow-up that moves the 
other static DB helpers off the resource, that is fine too — but please update 
the description and the issue so the series stays trackable.
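Roughly what inlining it could look like, with stand-in types: `PortExecutionRow` and `PortExecutionsDao` play the roles of the jOOQ-generated `OperatorPortExecutions` POJO and `OperatorPortExecutionsDao`, whose actual columns I have not reproduced here.

```scala
import java.net.URI

// Event as in this PR (the eid is carried on the event).
final case class ResultUriEvent(eid: Long, logicalOpId: String, portId: Int, uri: URI)

// Stand-ins for the generated POJO and DAO; the real column set differs.
final case class PortExecutionRow(eid: Long, logicalOpId: String, portId: Int, resultUri: String)

trait PortExecutionsDao {
  def insert(row: PortExecutionRow): Unit
}

// The service owns the insert directly, so the static helper on the
// JAX-RS resource is left with no production caller and can be deleted.
final class ResultServiceSketch(dao: PortExecutionsDao) {
  def persistOperatorPortResultUri(evt: ResultUriEvent): Unit =
    dao.insert(PortExecutionRow(evt.eid, evt.logicalOpId, evt.portId, evt.uri.toString))
}
```

Taking the DAO as a constructor argument also keeps the callback body unit-testable without an `AmberClient`, which was the reason for lifting it in the first place.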



##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -184,32 +209,24 @@ object TestUtils {
       ControllerConfig.default,
       error => {}
     )
+    val findResultUri = collectResultUriLookup(client)

Review Comment:
   The collect-terminal-results block here is character-for-character identical 
to the one in `DataProcessingSpec`. The duplication predates this PR, but since 
both copies were rewritten here anyway, extracting a shared helper would halve 
the diff and give the rest of the #5424 series one place to update instead of 
two. (nit)
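Something along these lines might work for the shared helper. The name, the signature, and the read function are guesses at what the common block needs, not taken from either spec; only the `(logicalOpId, portId)` key comes from `collectResultUriLookup`'s doc comment.

```scala
import java.net.URI

object SharedResultCollection {
  type PortKey = (String, Int) // (logicalOpId, portId)

  // Hypothetical shared helper for TestUtils and DataProcessingSpec: resolve
  // each terminal port's URI and read its results. A missing URI fails loudly
  // instead of being skipped, so a broken subscription cannot pass silently.
  def collectTerminalResults[T](
      terminalPorts: Seq[PortKey],
      findResultUri: PortKey => Option[URI],
      readResults: URI => Seq[T]
  ): Map[PortKey, Seq[T]] =
    terminalPorts.map { key =>
      val uri = findResultUri(key).getOrElse(
        throw new NoSuchElementException(s"no result URI for $key")
      )
      key -> readResults(uri)
    }.toMap
}
```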



##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -159,6 +162,28 @@ object TestUtils {
     p
   }
 
+  /**
+    * Subscribe to engine-emitted result-URI events and return a lookup over
+    * the collected URIs keyed by `(logicalOpId, portId)`. Test fixtures use
+    * this to observe URIs directly from the event stream, bypassing the
+    * `operator_port_executions` table (which only gets populated in
+    * production by `ExecutionResultService`'s parallel subscription).
+    */
+  def collectResultUriLookup(

Review Comment:
   Before this PR, `DataProcessingSpec` covered the full path: engine inserts 
the row, test reads it back through `getResultUriByLogicalPortId`. Now the e2e 
tests observe the event directly and the DB insert is only covered by the 
isolated callback-body unit test — nothing anywhere verifies that 
`attachToExecution` actually wires the event to the insert. Since 
`DataProcessingSpec` already stands up the mock DB, one cheap option: have this 
helper (or one test case) also call 
`WorkflowExecutionsResource.insertOperatorPortResultUri` in the callback and 
keep a single DB-backed assertion. That restores end-to-end coverage of the 
seam this PR introduces, and answers @kunwp1's question about what happens when 
the registration is missing.
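A sketch of the helper with an optional persistence hook, using stand-in types (`UriEvent`, `EventSource`) rather than the real `AmberClient` callback API. The one DB-backed test case would pass a hook that calls `WorkflowExecutionsResource.insertOperatorPortResultUri`; every other caller keeps the default no-op.

```scala
import java.net.URI
import scala.collection.concurrent.TrieMap

final case class UriEvent(logicalOpId: String, portId: Int, uri: URI)

// Stand-in for the client's callback registration (not its real signature).
trait EventSource {
  def registerCallback(cb: UriEvent => Unit): Unit
}

object ResultUriLookup {
  // Collects URIs keyed by (logicalOpId, portId) and forwards each event to
  // an optional persistence hook. TrieMap because callbacks may arrive on a
  // different thread than the test body that reads the lookup.
  def collectResultUriLookup(
      source: EventSource,
      persist: UriEvent => Unit = _ => ()
  ): ((String, Int)) => Option[URI] = {
    val uris = TrieMap.empty[(String, Int), URI]
    source.registerCallback { evt =>
      uris.put((evt.logicalOpId, evt.portId), evt.uri)
      persist(evt)
    }
    key => uris.get(key)
  }
}
```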



##########
amber/src/test/scala/org/apache/texera/web/service/ExecutionResultServiceSpec.scala:
##########
@@ -36,8 +56,110 @@ import 
org.apache.texera.web.service.ExecutionResultService.{
 }
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.net.URI
+import java.sql.Timestamp
+import scala.jdk.CollectionConverters._
+
+class ExecutionResultServiceSpec
+    extends AnyFlatSpec
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with MockTexeraDB {
+
+  private val testWid: Integer = 9000 + scala.util.Random.nextInt(1000)
+  private val testUid: Integer = 9000 + scala.util.Random.nextInt(1000)
+  private var executionsDao: WorkflowExecutionsDao = _
+
+  override protected def beforeAll(): Unit = {
+    initializeDBAndReplaceDSLContext()
+  }
+
+  override protected def afterAll(): Unit = {
+    shutdownDB()
+  }
+
+  override protected def beforeEach(): Unit = {
+    val user = new User
+    user.setUid(testUid)
+    user.setName("execution-result-test-user")
+    user.setEmail(s"[email protected]")
+    user.setPassword("password")
+    new UserDao(getDSLContext.configuration()).insert(user)
+
+    val workflow = new Workflow
+    workflow.setWid(testWid)
+    workflow.setName(s"execution-result-test-$testWid")
+    workflow.setContent("{}")
+    workflow.setDescription("")
+    workflow.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    workflow.setLastModifiedTime(new Timestamp(System.currentTimeMillis()))
+    new WorkflowDao(getDSLContext.configuration()).insert(workflow)
+
+    val version = new WorkflowVersion
+    version.setWid(testWid)
+    version.setContent("{}")
+    version.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    new WorkflowVersionDao(getDSLContext.configuration()).insert(version)
+
+    executionsDao = new WorkflowExecutionsDao(getDSLContext.configuration())
+  }
+
+  override protected def afterEach(): Unit = {
+    val ctx = getDSLContext
+    ctx
+      .deleteFrom(OPERATOR_PORT_EXECUTIONS)
+      .where(
+        OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.in(
+          ctx.select(WORKFLOW_EXECUTIONS.EID).from(WORKFLOW_EXECUTIONS)
+        )
+      )
+      .execute()
+    ctx.deleteFrom(WORKFLOW_EXECUTIONS).execute()
+    import org.apache.texera.dao.jooq.generated.Tables.{USER, WORKFLOW, WORKFLOW_VERSION}
+    ctx.deleteFrom(WORKFLOW_VERSION).where(WORKFLOW_VERSION.WID.eq(testWid)).execute()
+    ctx.deleteFrom(WORKFLOW).where(WORKFLOW.WID.eq(testWid)).execute()
+    ctx.deleteFrom(USER).where(USER.UID.eq(testUid)).execute()
+  }
 
-class ExecutionResultServiceSpec extends AnyFlatSpec with Matchers {
+  "persistOperatorPortResultUri" should
+    "insert the URI carried by an OperatorPortResultUriAvailable event" in {
+    val execution = new WorkflowExecutions
+    execution.setVid(1)

Review Comment:
   To add to Chris's comment: this is not just cosmetic. The version's vid is 
auto-generated and the sequence does not reset between tests, so `setVid(1)` 
only matches because this test is declared first in the file. Adding any 
earlier DB-touching test, or a future test-order change, turns this into an FK 
violation. Using `version.getVid` after the insert (the DAO writes the generated key 
back into the POJO, same as `execution.getEid` below) fixes it for good.
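To make the failure mode concrete with stand-ins (these classes only mimic jOOQ's write-back of generated keys; they are not the generated Texera POJOs or DAOs): once the sequence has advanced, the freshly inserted version's vid is no longer 1, and reading it back from the POJO is what keeps the FK valid.

```scala
// Stand-in POJOs and DAO mimicking the write-back of generated keys into the
// inserted POJO; not the generated Texera classes.
final class WorkflowVersionPojo {
  private var vid: Integer = null
  def getVid: Integer = vid
  def setVid(v: Integer): Unit = vid = v
}

final class WorkflowExecutionPojo {
  private var vid: Integer = null
  def getVid: Integer = vid
  def setVid(v: Integer): Unit = vid = v
}

// Identity sequence that, like the real one, is not reset between tests.
final class SequenceBackedVersionDao {
  private var nextVid = 1
  def insert(p: WorkflowVersionPojo): Unit = {
    p.setVid(nextVid)
    nextVid += 1
  }
}
```

In the spec itself, `version` is currently a local in `beforeEach`, so it (or its vid) would need to be kept in a field for the test to call `execution.setVid(version.getVid)`.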



##########
amber/src/test/scala/org/apache/texera/web/service/ExecutionResultServiceSpec.scala:
##########
@@ -36,8 +56,110 @@ import org.apache.texera.web.service.ExecutionResultService.{
 }
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.net.URI
+import java.sql.Timestamp
+import scala.jdk.CollectionConverters._
+
+class ExecutionResultServiceSpec
+    extends AnyFlatSpec
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with MockTexeraDB {
+
+  private val testWid: Integer = 9000 + scala.util.Random.nextInt(1000)

Review Comment:
   The spec owns its embedded DB, so the random wid/uid do not prevent any real 
collision; they just make a failure non-reproducible across runs. Fixed values 
are simpler and replay the same way every time. (nit)



##########
amber/src/test/scala/org/apache/texera/web/service/ExecutionResultServiceSpec.scala:
##########
@@ -36,8 +56,110 @@ import org.apache.texera.web.service.ExecutionResultService.{
 }
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import java.net.URI
+import java.sql.Timestamp
+import scala.jdk.CollectionConverters._
+
+class ExecutionResultServiceSpec
+    extends AnyFlatSpec
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with MockTexeraDB {
+
+  private val testWid: Integer = 9000 + scala.util.Random.nextInt(1000)
+  private val testUid: Integer = 9000 + scala.util.Random.nextInt(1000)
+  private var executionsDao: WorkflowExecutionsDao = _
+
+  override protected def beforeAll(): Unit = {
+    initializeDBAndReplaceDSLContext()
+  }
+
+  override protected def afterAll(): Unit = {
+    shutdownDB()
+  }
+
+  override protected def beforeEach(): Unit = {
+    val user = new User
+    user.setUid(testUid)
+    user.setName("execution-result-test-user")
+    user.setEmail(s"[email protected]")
+    user.setPassword("password")
+    new UserDao(getDSLContext.configuration()).insert(user)
+
+    val workflow = new Workflow
+    workflow.setWid(testWid)
+    workflow.setName(s"execution-result-test-$testWid")
+    workflow.setContent("{}")
+    workflow.setDescription("")
+    workflow.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    workflow.setLastModifiedTime(new Timestamp(System.currentTimeMillis()))
+    new WorkflowDao(getDSLContext.configuration()).insert(workflow)
+
+    val version = new WorkflowVersion
+    version.setWid(testWid)
+    version.setContent("{}")
+    version.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    new WorkflowVersionDao(getDSLContext.configuration()).insert(version)
+
+    executionsDao = new WorkflowExecutionsDao(getDSLContext.configuration())
+  }
+
+  override protected def afterEach(): Unit = {

Review Comment:
   Two small things: the executions/port-executions deletes here are unscoped 
(they wipe both tables entirely), while the user/workflow deletes below are scoped to 
the test ids — scoping them consistently would keep this spec safe if it ever 
shares a DB. Also the `import ... {USER, WORKFLOW, WORKFLOW_VERSION}` 
mid-method reads oddly; it can join the imports at the top of the file. (nit)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
