This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-region-restart
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-region-restart by this 
push:
     new 20d4bb8bef Add region restart tests
20d4bb8bef is described below

commit 20d4bb8befdc548ae2f4acbe6a9819708e4488e0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 21 18:44:28 2026 -0700

    Add region restart tests
---
 .../control/utils/TrivialControlTester.scala       | 14 +++++---
 .../messaginglayer/NetworkInputGatewaySpec.scala   | 12 +++++++
 .../workflow/WorkflowExecutionsResourceSpec.scala  | 41 ++++++++++++++++++++++
 3 files changed, 63 insertions(+), 4 deletions(-)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
index e236ac760b..ca86338d36 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/control/utils/TrivialControlTester.scala
@@ -24,7 +24,10 @@ import 
org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, Chann
 import 
org.apache.texera.amber.engine.architecture.common.WorkflowActor.NetworkAck
 import org.apache.texera.amber.engine.architecture.common.{AmberProcessor, 
WorkflowActor}
 import 
org.apache.texera.amber.engine.architecture.control.utils.TrivialControlTester.ControlTesterRPCClient
-import 
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+  NetworkInputGateway,
+  NetworkOutputGateway
+}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.AsyncRPCContext
 import 
org.apache.texera.amber.engine.architecture.rpc.testerservice.RPCTesterFs2Grpc
 import org.apache.texera.amber.engine.common.CheckpointState
@@ -37,8 +40,11 @@ import org.apache.texera.amber.engine.common.ambermessage.{
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
 object TrivialControlTester {
-  class ControlTesterRPCClient(outputGateway: NetworkOutputGateway, actorId: 
ActorVirtualIdentity)
-      extends AsyncRPCClient(outputGateway, actorId) {
+  class ControlTesterRPCClient(
+      inputGateway: NetworkInputGateway,
+      outputGateway: NetworkOutputGateway,
+      actorId: ActorVirtualIdentity
+  ) extends AsyncRPCClient(inputGateway, outputGateway, actorId) {
     val getProxy: RPCTesterFs2Grpc[Future, AsyncRPCContext] =
       AsyncRPCClient
         .createProxy[RPCTesterFs2Grpc[Future, AsyncRPCContext]](createPromise, 
outputGateway)
@@ -55,7 +61,7 @@ class TrivialControlTester(
       case Right(value) => transferService.send(value)
     }
   ) {
-    override val asyncRPCClient = new ControlTesterRPCClient(outputGateway, id)
+    override val asyncRPCClient = new ControlTesterRPCClient(inputGateway, 
outputGateway, id)
   }
   val initializer =
     new TesterAsyncRPCHandlerInitializer(ap.actorId, ap.asyncRPCClient, 
ap.asyncRPCServer)
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
index 1679144354..04f4f00045 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGatewaySpec.scala
@@ -84,4 +84,16 @@ class NetworkInputGatewaySpec extends AnyFlatSpec with 
MockFactory {
 
   }
 
+  "network input port" should "remove control channel by sender" in {
+    val inputPort = new NetworkInputGateway(fakeReceiverID)
+    val controlChannelId = ChannelIdentity(fakeSenderID, fakeReceiverID, 
isControl = true)
+    inputPort.getChannel(controlChannelId)
+
+    assert(inputPort.getAllControlChannels.size == 1)
+
+    inputPort.removeControlChannel(fakeSenderID)
+
+    assert(inputPort.getAllControlChannels.isEmpty)
+  }
+
 }
diff --git 
a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
 
b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
index 1bd7fcdf6c..973a76cab8 100644
--- 
a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
@@ -19,6 +19,13 @@
 
 package org.apache.texera.web.resource.dashboard.user.workflow
 
+import org.apache.texera.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  PhysicalOpIdentity
+}
+import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
 import org.apache.texera.dao.MockTexeraDB
 import org.apache.texera.dao.jooq.generated.Tables._
 import org.apache.texera.dao.jooq.generated.tables.daos.{
@@ -36,6 +43,7 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, 
PrivateMethodTester}
 
+import java.net.URI
 import java.sql.Timestamp
 import java.util.UUID
 import java.util.concurrent.TimeUnit
@@ -176,4 +184,37 @@ class WorkflowExecutionsResourceSpec
     )
   }
 
+  "WorkflowExecutionsResource.insertOperatorPortResultUri" should "ignore 
duplicate inserts" in {
+    val execution = new WorkflowExecutions
+    execution.setVid(testVersion.getVid)
+    execution.setUid(testUser.getUid)
+    execution.setStatus(0.toByte)
+    execution.setResult("")
+    execution.setStartingTime(new Timestamp(System.currentTimeMillis()))
+    execution.setBookmarked(false)
+    execution.setName("Execution with duplicate result URI insert")
+    execution.setEnvironmentVersion("test-env-1.0")
+    workflowExecutionsDao.insert(execution)
+
+    val executionId = ExecutionIdentity(execution.getEid.longValue())
+    val globalPortId = GlobalPortIdentity(
+      PhysicalOpIdentity(OperatorIdentity("operator-1"), "main"),
+      PortIdentity(),
+      input = false
+    )
+    val uri = URI.create("vfs:///test-result")
+
+    WorkflowExecutionsResource.insertOperatorPortResultUri(executionId, 
globalPortId, uri)
+    WorkflowExecutionsResource.insertOperatorPortResultUri(executionId, 
globalPortId, uri)
+
+    val rows = getDSLContext
+      .selectFrom(OPERATOR_PORT_EXECUTIONS)
+      
.where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid))
+      
.and(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString))
+      .fetch()
+
+    assert(rows.size() == 1)
+    assert(rows.get(0).getResultUri == uri.toString)
+  }
+
 }

Reply via email to