aglinxinyuan commented on code in PR #5712:
URL: https://github.com/apache/texera/pull/5712#discussion_r3409145491
##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -78,6 +79,66 @@ object TestUtils {
)
}
+ /**
+ * Resolve and read each operator's external RESULT document at `executionId`,
+ * applying `extract` to the opened document. Operators with no external
+ * RESULT uri (e.g. one whose output wasn't materialized) are omitted. Shared
+ * by the e2e specs so the lookup-open-extract block doesn't drift between
+ * copies.
+ */
+ def readMaterializedResults[T](
+ executionId: ExecutionIdentity,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T
+ ): Map[OperatorIdentity, T] =
+ operatorIds.flatMap { opId =>
+ getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri =>
+ opId -> extract(
+ DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ )
+ }
+ }.toMap
+
+ /**
+ * Run `workflow` to COMPLETED, then read the requested operators' materialized
+ * results via `readMaterializedResults`. A FatalError aborts the run and is
+ * surfaced as the exception from the completion await. Shared by the simple
+ * "run and read" e2e specs (e.g. DataProcessingSpec, LoopIntegrationSpec);
+ * specs that drive the run differently (e.g. reconfiguration's pause/resume)
+ * call `readMaterializedResults` directly inside their own completion callback.
+ */
+ def runWorkflowAndReadResults[T](
+ system: ActorSystem,
+ workflow: Workflow,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T,
+ completionTimeout: Duration = Duration.fromMinutes(1)
+ ): Map[OperatorIdentity, T] = {
+ val client = new AmberClient(
+ system,
+ workflow.context,
+ workflow.physicalPlan,
+ ControllerConfig.default,
+ _ => {}
+ )
+ val completion = Promise[Unit]()
+ var results: Map[OperatorIdentity, T] = Map.empty
+ client.registerCallback[FatalError](evt => {
+ completion.setException(evt.e)
+ client.shutdown()
+ })
+ client.registerCallback[ExecutionStateUpdate](evt => {
+ if (evt.state == COMPLETED) {
+ results = readMaterializedResults(workflow.context.executionId, operatorIds, extract)
+ completion.setDone()
+ }
+ })
+ Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+ Await.result(completion, completionTimeout)
+ client.shutdown()
+ results
+ }
Review Comment:
Done in `14bf08a2`. These issues were all inherited verbatim from the original
`DataProcessingSpec.executeWorkflow` that this PR extracted (so they are
long-standing rather than new here), but since this is now the shared harness,
they are fixed once for every caller:
- The result is now carried by a `Promise[Map[OperatorIdentity, T]]` (dropped the
mutable `var`).
- `AmberClient`'s `errorHandler` now fails the promise: `e => completion.updateIfEmpty(Throw(e))`.
- Every terminal path uses `updateIfEmpty`, so a second or late event can't
throw inside a callback and get swallowed. A read failure inside the COMPLETED
callback now fails the promise via `Try(...)` instead of hanging until the timeout.
- `client.shutdown()` runs in a `finally`, so a timeout or error can't leak
the client's actors.
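
For reference, the shape of the fix described above can be sketched roughly as follows. This is not the code from `14bf08a2`: the harness appears to use Twitter util's `Promise` (`updateIfEmpty`, `Throw`), while this sketch uses the standard library's `scala.concurrent.Promise`, where `tryFailure` and `tryComplete` play the analogous "first writer wins" role. `FakeClient`, `HarnessSketch`, and `runAndRead` are hypothetical names used only for illustration.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for AmberClient: just enough surface for the sketch.
final class FakeClient {
  private var onCompleted: () => Unit = () => ()
  private var onFatal: Throwable => Unit = _ => ()
  var shutdownCalls = 0

  def registerCompleted(cb: () => Unit): Unit = onCompleted = cb
  def registerFatal(cb: Throwable => Unit): Unit = onFatal = cb
  def fireCompleted(): Unit = onCompleted()
  def fireFatal(e: Throwable): Unit = onFatal(e)
  def shutdown(): Unit = shutdownCalls += 1
}

object HarnessSketch {
  def runAndRead[T](client: FakeClient, timeout: FiniteDuration)(
      start: FakeClient => Unit,
      read: () => T
  ): T = {
    // The promise carries the result itself, so no mutable `var` is needed.
    val completion = Promise[T]()

    // try* variants never throw on an already-completed promise,
    // so a second or late event is ignored instead of blowing up in a callback.
    client.registerFatal { e => completion.tryFailure(e); () }

    // Try(...) turns a read failure into a failed promise rather than a hang.
    client.registerCompleted { () => completion.tryComplete(Try(read())); () }

    try {
      start(client)
      Await.result(completion.future, timeout)
    } finally client.shutdown() // runs on success, error, and timeout alike
  }
}
```

With this shape, a caller either gets the value or the first recorded failure, and `shutdown()` is called exactly once per run regardless of which path ended it.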