This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 79a1d170b7 test(amber): cover WorkflowExecutionsResource static
helpers (#5205)
79a1d170b7 is described below
commit 79a1d170b7bcf803f606f81fdc93d15bbc64fb99
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 25 16:27:30 2026 -0700
test(amber): cover WorkflowExecutionsResource static helpers (#5205)
### What changes were proposed in this PR?
Extends `WorkflowExecutionsResourceSpec` to cover the companion-object
helpers that the JAX-RS endpoints lean on. The existing spec covered two
of them; this PR adds cases for status-filtered execution listing,
latest-execution lookup by `(wid, cuid)`, the expired-execution scan,
`insertOperatorExecutions`, `updateRuntimeStatsUri` (including the
no-match branch), the per-eid URI fetchers
(`getResultUrisByExecutionId`, `getConsoleMessagesUriByExecutionId`,
`getRuntimeStatsUriByExecutionId`) with their null/empty filtering,
`deleteConsoleMessageAndExecutionResultUris`, the DB-delete branch of
`removeAllExecutionFiles`, `updateResultSize`, the VFS-decoding logic in
`getResultUriByLogicalPortId`, and the private BFS-based
`getNonDownloadableOperatorMap` (via `PrivateMethodTester`).
Cleanup is extended to `OPERATOR_PORT_EXECUTIONS`,
`OPERATOR_EXECUTIONS`, `DATASET`, and `WORKFLOW_COMPUTING_UNIT` so
seeded rows don't leak between cases.
No production code is touched.
### Any related issues, documentation, discussions?
Closes #5204.
### How was this PR tested?
Added unit tests for the helpers listed above.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../workflow/WorkflowExecutionsResourceSpec.scala | 583 ++++++++++++++++++++-
1 file changed, 555 insertions(+), 28 deletions(-)
diff --git a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
index bd55124a72..03adc82c96 100644
--- a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
@@ -19,24 +19,31 @@
package org.apache.texera.web.resource.dashboard.user.workflow
+import org.apache.texera.amber.core.storage.{VFSResourceType, VFSURIFactory}
import org.apache.texera.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
- PhysicalOpIdentity
+ PhysicalOpIdentity,
+ WorkflowIdentity
}
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
import org.apache.texera.dao.MockTexeraDB
import org.apache.texera.dao.jooq.generated.Tables._
+import org.apache.texera.dao.jooq.generated.enums.WorkflowComputingUnitTypeEnum
import org.apache.texera.dao.jooq.generated.tables.daos.{
+ DatasetDao,
UserDao,
+ WorkflowComputingUnitDao,
WorkflowDao,
WorkflowExecutionsDao,
WorkflowVersionDao
}
import org.apache.texera.dao.jooq.generated.tables.pojos.{
+ Dataset,
User,
Workflow,
+ WorkflowComputingUnit,
WorkflowExecutions,
WorkflowVersion
}
@@ -66,6 +73,8 @@ class WorkflowExecutionsResourceSpec
private var workflowDao: WorkflowDao = _
private var workflowVersionDao: WorkflowVersionDao = _
private var workflowExecutionsDao: WorkflowExecutionsDao = _
+ private var datasetDao: DatasetDao = _
+ private var computingUnitDao: WorkflowComputingUnitDao = _
override protected def beforeAll(): Unit = {
initializeDBAndReplaceDSLContext()
@@ -96,6 +105,8 @@ class WorkflowExecutionsResourceSpec
workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration())
userDao = new UserDao(getDSLContext.configuration())
workflowExecutionsDao = new
WorkflowExecutionsDao(getDSLContext.configuration())
+ datasetDao = new DatasetDao(getDSLContext.configuration())
+ computingUnitDao = new
WorkflowComputingUnitDao(getDSLContext.configuration())
cleanupTestData()
@@ -109,18 +120,41 @@ class WorkflowExecutionsResourceSpec
}
private def cleanupTestData(): Unit = {
+ val vidSubquery = getDSLContext
+ .select(WORKFLOW_VERSION.VID)
+ .from(WORKFLOW_VERSION)
+ .where(WORKFLOW_VERSION.WID.eq(testWorkflowWid))
+
+ // Child tables of WORKFLOW_EXECUTIONS must be wiped before the parent row.
getDSLContext
- .deleteFrom(WORKFLOW_EXECUTIONS)
+ .deleteFrom(OPERATOR_PORT_EXECUTIONS)
+ .where(
+ OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.in(
+ getDSLContext
+ .select(WORKFLOW_EXECUTIONS.EID)
+ .from(WORKFLOW_EXECUTIONS)
+ .where(WORKFLOW_EXECUTIONS.VID.in(vidSubquery))
+ )
+ )
+ .execute()
+
+ getDSLContext
+ .deleteFrom(OPERATOR_EXECUTIONS)
.where(
- WORKFLOW_EXECUTIONS.VID.in(
+ OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.in(
getDSLContext
- .select(WORKFLOW_VERSION.VID)
- .from(WORKFLOW_VERSION)
- .where(WORKFLOW_VERSION.WID.eq(testWorkflowWid))
+ .select(WORKFLOW_EXECUTIONS.EID)
+ .from(WORKFLOW_EXECUTIONS)
+ .where(WORKFLOW_EXECUTIONS.VID.in(vidSubquery))
)
)
.execute()
+ getDSLContext
+ .deleteFrom(WORKFLOW_EXECUTIONS)
+ .where(WORKFLOW_EXECUTIONS.VID.in(vidSubquery))
+ .execute()
+
getDSLContext
.deleteFrom(WORKFLOW_VERSION)
.where(WORKFLOW_VERSION.WID.eq(testWorkflowWid))
@@ -131,6 +165,17 @@ class WorkflowExecutionsResourceSpec
.where(WORKFLOW.WID.eq(testWorkflowWid))
.execute()
+ // Datasets / computing units / extra users may be seeded by individual cases.
+ getDSLContext
+ .deleteFrom(DATASET)
+
.where(DATASET.OWNER_UID.in(getDSLContext.select(USER.UID).from(USER).where(USER.UID.ne(0))))
+ .execute()
+
+ getDSLContext
+ .deleteFrom(WORKFLOW_COMPUTING_UNIT)
+ .where(WORKFLOW_COMPUTING_UNIT.UID.eq(testUserId))
+ .execute()
+
getDSLContext
.deleteFrom(USER)
.where(USER.UID.eq(testUserId))
@@ -141,24 +186,59 @@ class WorkflowExecutionsResourceSpec
shutdownDB()
}
+ // ─── helpers ──────────────────────────────────────────────────────────────
+
+ private def insertComputingUnit(): WorkflowComputingUnit = {
+ val unit = new WorkflowComputingUnit
+ unit.setUid(testUser.getUid)
+ unit.setName("test-unit-" + UUID.randomUUID().toString.substring(0, 8))
+ unit.setCreationTime(new Timestamp(System.currentTimeMillis()))
+ unit.setType(WorkflowComputingUnitTypeEnum.local)
+ unit.setUri("local://test")
+ unit.setResource("{}")
+ computingUnitDao.insert(unit)
+ unit
+ }
+
+ private def insertExecution(
+ name: String = s"Execution-${UUID.randomUUID().toString.substring(0,
8)}",
+ status: Byte = 0.toByte,
+ result: String = "",
+ logLocation: String = "",
+ startOffsetMillis: Long = 0L,
+ lastUpdateOffsetMillis: Option[Long] = None,
+ cuid: Integer = null,
+ runtimeStatsUri: String = null
+ ): WorkflowExecutions = {
+ val execution = new WorkflowExecutions
+ execution.setVid(testVersion.getVid)
+ execution.setUid(testUser.getUid)
+ execution.setStatus(status)
+ execution.setResult(result)
+ execution.setLogLocation(logLocation)
+ val now = System.currentTimeMillis()
+ execution.setStartingTime(new Timestamp(now - startOffsetMillis))
+ lastUpdateOffsetMillis.foreach(off => execution.setLastUpdateTime(new
Timestamp(now - off)))
+ execution.setBookmarked(false)
+ execution.setName(name)
+ execution.setEnvironmentVersion("test-env-1.0")
+ execution.setCuid(cuid)
+ execution.setRuntimeStatsUri(runtimeStatsUri)
+ workflowExecutionsDao.insert(execution)
+ execution
+ }
+
+ // ─── existing tests (preserved) ───────────────────────────────────────────
+
"WorkflowExecutionsResource.getWorkflowExecutions" should "return executions
with EIDs in descending order" in {
val numExecutions = 10
val executionIds = ArrayBuffer.empty[Integer]
for (i <- 1 to numExecutions) {
- val execution = new WorkflowExecutions
- execution.setVid(testVersion.getVid)
- execution.setUid(testUser.getUid)
- execution.setStatus(0.toByte)
- execution.setResult("")
- execution.setStartingTime(
- new Timestamp(System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(numExecutions - i))
+ val execution = insertExecution(
+ name = s"Execution ${i}",
+ startOffsetMillis = TimeUnit.DAYS.toMillis(numExecutions - i)
)
- execution.setBookmarked(false)
- execution.setName(s"Execution ${i}")
- execution.setEnvironmentVersion("test-env-1.0")
-
- workflowExecutionsDao.insert(execution)
executionIds.append(execution.getEid)
}
@@ -185,16 +265,7 @@ class WorkflowExecutionsResourceSpec
}
"WorkflowExecutionsResource.insertOperatorPortResultUri" should "insert a
result URI row" in {
- val execution = new WorkflowExecutions
- execution.setVid(testVersion.getVid)
- execution.setUid(testUser.getUid)
- execution.setStatus(0.toByte)
- execution.setResult("")
- execution.setStartingTime(new Timestamp(System.currentTimeMillis()))
- execution.setBookmarked(false)
- execution.setName("Execution with duplicate result URI insert")
- execution.setEnvironmentVersion("test-env-1.0")
- workflowExecutionsDao.insert(execution)
+ val execution = insertExecution(name = "Execution with duplicate result
URI insert")
val executionId = ExecutionIdentity(execution.getEid.longValue())
val globalPortId = GlobalPortIdentity(
@@ -216,4 +287,460 @@ class WorkflowExecutionsResourceSpec
assert(rows.get(0).getResultUri == uri.toString)
}
+ // ─── new: status-filtered execution listing ───────────────────────────────
+
+ "getWorkflowExecutions with statusCodes" should "narrow results to the
requested codes" in {
+ insertExecution(status = 1.toByte)
+ insertExecution(status = 2.toByte)
+ insertExecution(status = 1.toByte)
+
+ val onlyStatusOne =
+ WorkflowExecutionsResource.getWorkflowExecutions(
+ testWorkflowWid,
+ getDSLContext,
+ Set(1.toByte)
+ )
+ assert(onlyStatusOne.size == 2)
+ assert(onlyStatusOne.forall(_.status == 1.toByte))
+ }
+
+ // ─── new: getLatestExecutionID ────────────────────────────────────────────
+
+ "getLatestExecutionID" should "return None when no executions exist for the
(wid, cuid) pair" in {
+ val result =
+ WorkflowExecutionsResource.getLatestExecutionID(testWorkflowWid,
Integer.valueOf(999))
+ assert(result.isEmpty)
+ }
+
+ it should "return the largest EID for matching (wid, cuid)" in {
+ // cuid has an FK to WORKFLOW_COMPUTING_UNIT — seed two units.
+ val unitA = insertComputingUnit()
+ val unitB = insertComputingUnit()
+
+ val a = insertExecution(cuid = unitA.getCuid)
+ val b = insertExecution(cuid = unitA.getCuid)
+ // Distractor with a different cuid — should be ignored.
+ insertExecution(cuid = unitB.getCuid)
+
+ val result =
WorkflowExecutionsResource.getLatestExecutionID(testWorkflowWid, unitA.getCuid)
+ assert(result.isDefined)
+ assert(result.get == math.max(a.getEid, b.getEid))
+ }
+
+ // ─── new: getExpiredExecutionsWithResultOrLog ─────────────────────────────
+
+ "getExpiredExecutionsWithResultOrLog" should "match rows that are stale by
starting_time and have a result" in {
+ // Stale-by-starting-time + has result → match.
+ val expired = insertExecution(
+ name = "expired-with-result",
+ result = "some-result",
+ startOffsetMillis = TimeUnit.SECONDS.toMillis(120)
+ )
+ // Fresh starting_time → must not match.
+ insertExecution(name = "fresh", result = "some-result")
+ // Stale but empty result+log → must not match.
+ insertExecution(
+ name = "stale-but-empty",
+ startOffsetMillis = TimeUnit.SECONDS.toMillis(120)
+ )
+
+ val matched =
WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(60)
+
+ val eids = matched.map(_.getEid).toSet
+ assert(eids.contains(expired.getEid))
+ assert(matched.forall(e => e.getResult.nonEmpty ||
Option(e.getLogLocation).exists(_.nonEmpty)))
+ }
+
+ it should "match rows that are stale by last_update_time and have a
log_location" in {
+ val expired = insertExecution(
+ name = "log-stale",
+ logLocation = "file:///tmp/log",
+ lastUpdateOffsetMillis = Some(TimeUnit.SECONDS.toMillis(120))
+ )
+ insertExecution(
+ name = "log-fresh",
+ logLocation = "file:///tmp/log-2",
+ lastUpdateOffsetMillis = Some(0L)
+ )
+
+ val matched =
WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(60)
+ assert(matched.map(_.getEid).toSet.contains(expired.getEid))
+ }
+
+ // ─── new: insertOperatorExecutions ────────────────────────────────────────
+
+ "insertOperatorExecutions" should "insert one OPERATOR_EXECUTIONS row" in {
+ val execution = insertExecution()
+ val uri = URI.create("vfs:///console-msg")
+
+ WorkflowExecutionsResource.insertOperatorExecutions(
+ execution.getEid.longValue(),
+ "op-A",
+ uri
+ )
+
+ val rows = getDSLContext
+ .selectFrom(OPERATOR_EXECUTIONS)
+ .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid))
+ .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq("op-A"))
+ .fetch()
+
+ assert(rows.size() == 1)
+ assert(rows.get(0).getConsoleMessagesUri == uri.toString)
+ }
+
+ // ─── new: updateRuntimeStatsUri ───────────────────────────────────────────
+
+ "updateRuntimeStatsUri" should "set the runtime_stats_uri on the matching
execution" in {
+ val execution = insertExecution()
+ val uri = URI.create("vfs:///runtime-stats")
+
+ WorkflowExecutionsResource.updateRuntimeStatsUri(
+ testWorkflowWid.longValue(),
+ execution.getEid.longValue(),
+ uri
+ )
+
+ val refreshed = workflowExecutionsDao.fetchOneByEid(execution.getEid)
+ assert(refreshed.getRuntimeStatsUri == uri.toString)
+ }
+
+ it should "leave executions belonging to other workflows untouched" in {
+ val execution = insertExecution()
+ val uri = URI.create("vfs:///runtime-stats")
+
+ // wid that does not match the execution's WORKFLOW_VERSION row → no-op.
+ WorkflowExecutionsResource.updateRuntimeStatsUri(
+ (testWorkflowWid + 100000).longValue(),
+ execution.getEid.longValue(),
+ uri
+ )
+
+ val refreshed = workflowExecutionsDao.fetchOneByEid(execution.getEid)
+ assert(refreshed.getRuntimeStatsUri == null)
+ }
+
+ // ─── new: URI fetchers ────────────────────────────────────────────────────
+
+ "getResultUrisByExecutionId" should "return inserted URIs and filter out
null/empty entries" in {
+ val execution = insertExecution()
+ val eid = ExecutionIdentity(execution.getEid.longValue())
+ val opA = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("opA"), "main"),
+ PortIdentity(),
+ input = false
+ )
+ val opB = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("opB"), "main"),
+ PortIdentity(),
+ input = false
+ )
+ val opC = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("opC"), "main"),
+ PortIdentity(),
+ input = false
+ )
+
+ WorkflowExecutionsResource.insertOperatorPortResultUri(eid, opA,
URI.create("vfs:///A"))
+ WorkflowExecutionsResource.insertOperatorPortResultUri(eid, opB,
URI.create("vfs:///B"))
+ // Empty-string URI row — the helper should drop it from the returned list.
+ getDSLContext
+ .insertInto(OPERATOR_PORT_EXECUTIONS)
+ .columns(
+ OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID,
+ OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID,
+ OPERATOR_PORT_EXECUTIONS.RESULT_URI
+ )
+ .values(execution.getEid, opC.serializeAsString, "")
+ .execute()
+
+ val uris = WorkflowExecutionsResource.getResultUrisByExecutionId(eid)
+ assert(uris.toSet == Set(URI.create("vfs:///A"), URI.create("vfs:///B")))
+ }
+
+ "getConsoleMessagesUriByExecutionId" should "return inserted URIs and filter
empty entries" in {
+ val execution = insertExecution()
+ val eid = ExecutionIdentity(execution.getEid.longValue())
+
+ WorkflowExecutionsResource.insertOperatorExecutions(
+ execution.getEid.longValue(),
+ "op-A",
+ URI.create("vfs:///console-A")
+ )
+ WorkflowExecutionsResource.insertOperatorExecutions(
+ execution.getEid.longValue(),
+ "op-B",
+ URI.create("vfs:///console-B")
+ )
+ // Empty-URI row — must be filtered.
+ getDSLContext
+ .insertInto(OPERATOR_EXECUTIONS)
+ .columns(
+ OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID,
+ OPERATOR_EXECUTIONS.OPERATOR_ID,
+ OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI
+ )
+ .values(execution.getEid, "op-C", "")
+ .execute()
+
+ val uris =
WorkflowExecutionsResource.getConsoleMessagesUriByExecutionId(eid)
+ assert(uris.toSet == Set(URI.create("vfs:///console-A"),
URI.create("vfs:///console-B")))
+ }
+
+ "getRuntimeStatsUriByExecutionId" should "return None when the stored URI is
null or empty" in {
+ val noUri = insertExecution()
+ assert(
+ WorkflowExecutionsResource
+
.getRuntimeStatsUriByExecutionId(ExecutionIdentity(noUri.getEid.longValue()))
+ .isEmpty
+ )
+
+ val emptyUri = insertExecution(runtimeStatsUri = "")
+ assert(
+ WorkflowExecutionsResource
+
.getRuntimeStatsUriByExecutionId(ExecutionIdentity(emptyUri.getEid.longValue()))
+ .isEmpty
+ )
+ }
+
+ it should "return Some(URI) when the stored URI is non-empty" in {
+ val withUri = insertExecution(runtimeStatsUri = "vfs:///stats")
+ val result = WorkflowExecutionsResource.getRuntimeStatsUriByExecutionId(
+ ExecutionIdentity(withUri.getEid.longValue())
+ )
+ assert(result.contains(URI.create("vfs:///stats")))
+ }
+
+ // ─── new: deleteConsoleMessageAndExecutionResultUris ──────────────────────
+
+ "deleteConsoleMessageAndExecutionResultUris" should "purge both child tables
for a given eid" in {
+ val execution = insertExecution()
+ val eid = ExecutionIdentity(execution.getEid.longValue())
+
+ val globalPortId = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("op-purge"), "main"),
+ PortIdentity(),
+ input = false
+ )
+ WorkflowExecutionsResource.insertOperatorPortResultUri(
+ eid,
+ globalPortId,
+ URI.create("vfs:///r")
+ )
+ WorkflowExecutionsResource.insertOperatorExecutions(
+ execution.getEid.longValue(),
+ "op-purge",
+ URI.create("vfs:///c")
+ )
+
+ WorkflowExecutionsResource.deleteConsoleMessageAndExecutionResultUris(eid)
+
+ val resultRows = getDSLContext
+ .fetchCount(
+ OPERATOR_PORT_EXECUTIONS,
+ OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid)
+ )
+ val consoleRows = getDSLContext
+ .fetchCount(
+ OPERATOR_EXECUTIONS,
+ OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid)
+ )
+ assert(resultRows == 0)
+ assert(consoleRows == 0)
+ }
+
+ // ─── new: removeAllExecutionFiles (DB delete branch) ──────────────────────
+
+ "removeAllExecutionFiles" should "delete the listed executions from
WORKFLOW_EXECUTIONS" in {
+ val a = insertExecution()
+ val b = insertExecution()
+ // Distractor that should survive.
+ val survivor = insertExecution()
+
+ WorkflowExecutionsResource.removeAllExecutionFiles(Array(a.getEid,
b.getEid))
+
+ val survivors = workflowExecutionsDao.findAll()
+ val survivorEids =
survivors.toArray.map(_.asInstanceOf[WorkflowExecutions].getEid).toSet
+ assert(!survivorEids.contains(a.getEid))
+ assert(!survivorEids.contains(b.getEid))
+ assert(survivorEids.contains(survivor.getEid))
+ }
+
+ // ─── new: updateResultSize ────────────────────────────────────────────────
+
+ "updateResultSize" should "set RESULT_SIZE on the matching (eid,
globalPortId) row" in {
+ val execution = insertExecution()
+ val eid = ExecutionIdentity(execution.getEid.longValue())
+ val globalPortId = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("op-size"), "main"),
+ PortIdentity(),
+ input = false
+ )
+ WorkflowExecutionsResource.insertOperatorPortResultUri(
+ eid,
+ globalPortId,
+ URI.create("vfs:///r")
+ )
+
+ WorkflowExecutionsResource.updateResultSize(eid, globalPortId, 4096L)
+
+ val row = getDSLContext
+ .selectFrom(OPERATOR_PORT_EXECUTIONS)
+
.where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid))
+
.and(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString))
+ .fetchOne()
+ assert(row.getResultSize == 4096)
+ }
+
+ // ─── new: getResultUriByLogicalPortId ─────────────────────────────────────
+
+ "getResultUriByLogicalPortId" should "match by logical operator id, port id,
and resource type" in {
+ val execution = insertExecution()
+ val eid = ExecutionIdentity(execution.getEid.longValue())
+ val wfId = WorkflowIdentity(testWorkflowWid.longValue())
+
+ // Build a real VFS result URI that decodeURI can parse.
+ val targetOpId = OperatorIdentity("target-op")
+ val targetPortId = PortIdentity()
+ val targetGlobalPort = GlobalPortIdentity(
+ PhysicalOpIdentity(targetOpId, "main"),
+ targetPortId,
+ input = false
+ )
+ val targetUri = VFSURIFactory.resultURI(
+ VFSURIFactory.createPortBaseURI(wfId, eid, targetGlobalPort)
+ )
+ WorkflowExecutionsResource.insertOperatorPortResultUri(eid,
targetGlobalPort, targetUri)
+
+ // Distractor: same workflow, different op id.
+ val otherGlobalPort = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("other-op"), "main"),
+ PortIdentity(),
+ input = false
+ )
+ val otherUri = VFSURIFactory.resultURI(
+ VFSURIFactory.createPortBaseURI(wfId, eid, otherGlobalPort)
+ )
+ WorkflowExecutionsResource.insertOperatorPortResultUri(eid,
otherGlobalPort, otherUri)
+
+ val found =
+ WorkflowExecutionsResource.getResultUriByLogicalPortId(eid, targetOpId,
targetPortId)
+ assert(found.contains(targetUri))
+
+ // Sanity-check: the decoded URI is RESULT-typed and matches the target ids.
+ val (_, _, gpOpt, resType) = VFSURIFactory.decodeURI(found.get)
+ assert(resType == VFSResourceType.RESULT)
+ assert(gpOpt.exists(gp => gp.opId.logicalOpId == targetOpId && gp.portId
== targetPortId))
+ }
+
+ it should "return None when no URI matches the requested op/port" in {
+ val execution = insertExecution()
+ val eid = ExecutionIdentity(execution.getEid.longValue())
+ val found =
+ WorkflowExecutionsResource.getResultUriByLogicalPortId(
+ eid,
+ OperatorIdentity("nope"),
+ PortIdentity()
+ )
+ assert(found.isEmpty)
+ }
+
+ // ─── new: getNonDownloadableOperatorMap (private — via PrivateMethodTester) ─
+
+ "getNonDownloadableOperatorMap" should "flag operators reading
non-downloadable datasets they don't own" in {
+ // Owner of the non-downloadable dataset is a *different* user than testUser.
+ val otherUser = new User
+ val otherUid = testUserId + 1
+ otherUser.setUid(otherUid)
+ otherUser.setName("dataset-owner")
+ otherUser.setEmail("[email protected]")
+ otherUser.setPassword("password")
+ userDao.insert(otherUser)
+
+ val dataset = new Dataset
+ dataset.setOwnerUid(otherUid)
+ dataset.setName("LockedDS")
+ dataset.setRepositoryName("repo-locked")
+ dataset.setIsPublic(false)
+ dataset.setIsDownloadable(false)
+ dataset.setDescription("")
+ dataset.setCreationTime(new Timestamp(System.currentTimeMillis()))
+ datasetDao.insert(dataset)
+
+ // Workflow content: scan op A reading the locked dataset, then a downstream op B.
+ val content =
+ """{
+ | "operators": [
+ | {"operatorID": "scanA", "operatorProperties": {"fileName":
"/[email protected]/LockedDS/v1/data.csv"}},
+ | {"operatorID": "downstreamB", "operatorProperties": {}}
+ | ],
+ | "links": [
+ | {"source": {"operatorID": "scanA"}, "target": {"operatorID":
"downstreamB"}}
+ | ]
+ |}""".stripMargin
+ testWorkflow.setContent(content)
+ workflowDao.update(testWorkflow)
+
+ val privateMethod =
+ PrivateMethod[Map[String, Set[(String,
String)]]](Symbol("getNonDownloadableOperatorMap"))
+ val result = WorkflowExecutionsResource invokePrivate
privateMethod(testWorkflowWid, testUser)
+
+ assert(result.contains("scanA"))
+ assert(result("scanA").contains(("[email protected]", "LockedDS")))
+ // BFS propagates the restriction to the downstream operator.
+ assert(result.contains("downstreamB"))
+ }
+
+ it should "return an empty map when the workflow content is unparseable" in {
+ testWorkflow.setContent("not-json")
+ workflowDao.update(testWorkflow)
+
+ val privateMethod =
+ PrivateMethod[Map[String, Set[(String,
String)]]](Symbol("getNonDownloadableOperatorMap"))
+ val result = WorkflowExecutionsResource invokePrivate
privateMethod(testWorkflowWid, testUser)
+ assert(result.isEmpty)
+ }
+
+ it should "return an empty map when the workflow has no operators
referencing datasets" in {
+ val content =
+ """{"operators": [{"operatorID": "x", "operatorProperties": {}}],
"links": []}"""
+ testWorkflow.setContent(content)
+ workflowDao.update(testWorkflow)
+
+ val privateMethod =
+ PrivateMethod[Map[String, Set[(String,
String)]]](Symbol("getNonDownloadableOperatorMap"))
+ val result = WorkflowExecutionsResource invokePrivate
privateMethod(testWorkflowWid, testUser)
+ assert(result.isEmpty)
+ }
+
+ it should "skip restriction when the current user is the dataset owner" in {
+ // The dataset is owned by testUser ([email protected]), and the operator points
+ // to /[email protected]/MyDS/v1/file.csv → no restriction even though
+ // is_downloadable=false.
+ val dataset = new Dataset
+ dataset.setOwnerUid(testUserId)
+ dataset.setName("MyDS")
+ dataset.setRepositoryName("repo-my")
+ dataset.setIsPublic(false)
+ dataset.setIsDownloadable(false)
+ dataset.setDescription("")
+ dataset.setCreationTime(new Timestamp(System.currentTimeMillis()))
+ datasetDao.insert(dataset)
+
+ val content =
+ """{
+ | "operators": [
+ | {"operatorID": "scan", "operatorProperties": {"fileName":
"/[email protected]/MyDS/v1/data.csv"}}
+ | ],
+ | "links": []
+ |}""".stripMargin
+ testWorkflow.setContent(content)
+ workflowDao.update(testWorkflow)
+
+ val privateMethod =
+ PrivateMethod[Map[String, Set[(String,
String)]]](Symbol("getNonDownloadableOperatorMap"))
+ val result = WorkflowExecutionsResource invokePrivate
privateMethod(testWorkflowWid, testUser)
+ assert(result.isEmpty)
+ }
+
}