This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new a8afdc487a test(compiling-service): add direct unit tests for
WorkflowCompiler (#5022)
a8afdc487a is described below
commit a8afdc487aaa8a1befca3cc4db61584b21d5a848
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 31 16:59:08 2026 -0700
test(compiling-service): add direct unit tests for WorkflowCompiler (#5022)
### What changes were proposed in this PR?
Add direct unit coverage for `workflow-compiling-service`'s
`WorkflowCompiler` lenient-mode contract: `compile` never throws,
per-operator failures accumulate into `operatorIdToError`, and
`physicalPlan` is `None` whenever any error occurred. Before this PR, only
the REST happy path was covered.
The spec invokes the compiler directly rather than the REST resource,
sidestepping a separate response-serialization NPE surfaced while
writing these tests (apache/texera#5021). Because the tests never touch the
resource layer, they pass today and stay green once that bug is fixed.
### Any related issues, documentation, discussions?
Closes #5020. References #5021.
### How was this PR tested?
`sbt "WorkflowCompilingService/testOnly
org.apache.texera.amber.compiler.WorkflowCompilerSpec"` — all green.
`scalafmtCheck` and `compile` clean.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7 (1M context)
---------
Signed-off-by: Yicong Huang <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.../amber/compiler/WorkflowCompilerSpec.scala | 317 +++++++++++++++++++++
.../resource/WorkflowCompilationResourceSpec.scala | 260 ++++++-----------
2 files changed, 410 insertions(+), 167 deletions(-)
diff --git
a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala
b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala
new file mode 100644
index 0000000000..ee221728d3
--- /dev/null
+++
b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.compiler
+
+import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import org.apache.texera.amber.core.virtualidentity.WorkflowIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import
org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.COMPILATION_ERROR
+import org.apache.texera.amber.operator.filter.{
+ ComparisonType,
+ FilterPredicate,
+ SpecializedFilterOpDesc
+}
+import org.apache.texera.amber.operator.limit.LimitOpDesc
+import org.apache.texera.amber.operator.projection.{AttributeUnit,
ProjectionOpDesc}
+import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
+import org.scalatest.flatspec.AnyFlatSpec
+
+/**
+ * Direct unit coverage for the editing-time [[WorkflowCompiler]].
+ *
+ * Owns *compiler-behavior* tests — schema propagation through multi-op
+ * chains, lenient-mode error accumulation, terminal-storage selection.
+ * `WorkflowCompilationResourceSpec` owns *resource-layer* tests — HTTP
+ * status, response type discriminator, JSON envelope. Drawing the line
+ * here keeps each spec focused on a single failure axis.
+ *
+ * Bypassing the resource layer also sidesteps a separate NPE in response
+ * serialization (apache/texera#5021); these compiler-level tests stay
+ * green once that bug is fixed.
+ */
+class WorkflowCompilerSpec extends AnyFlatSpec {
+
+ private def newContext(): WorkflowContext =
+ new WorkflowContext(workflowId = WorkflowIdentity(0))
+
+ private def csvOp(fileName: String): CSVScanSourceOpDesc = {
+ val op = new CSVScanSourceOpDesc()
+ op.fileName = Some(fileName)
+ op.customDelimiter = Some(",")
+ op.hasHeader = true
+ op
+ }
+
+ private def csvOpNoFile(): CSVScanSourceOpDesc = {
+ val op = new CSVScanSourceOpDesc()
+ op.customDelimiter = Some(",")
+ op.hasHeader = true
+ op
+ }
+
+ private def projectOp(columns: List[String]): ProjectionOpDesc = {
+ val op = new ProjectionOpDesc()
+ op.attributes = columns.map(name => new AttributeUnit(name, ""))
+ op.isDrop = false
+ op
+ }
+
+ private def filterOp(predicates: FilterPredicate*): SpecializedFilterOpDesc
= {
+ val op = new SpecializedFilterOpDesc
+ op.predicates = predicates.toList
+ op
+ }
+
+ private def limitOp(limit: Int): LimitOpDesc = {
+ val op = new LimitOpDesc
+ op.limit = limit
+ op
+ }
+
+ private val realCsvPath =
+ "workflow-compiling-service/src/test/resources/country_sales_small.csv"
+
+ // -------------------- happy path --------------------
+
+ "WorkflowCompiler" should "produce a populated physicalPlan and no errors
for a well-formed plan" in {
+ val csv = csvOp(realCsvPath)
+ val proj = projectOp(List("Region", "Total Profit"))
+ val result = new WorkflowCompiler(newContext()).compile(
+ LogicalPlanPojo(
+ operators = List(csv, proj),
+ links = List(
+ LogicalLink(
+ csv.operatorIdentifier,
+ PortIdentity(0),
+ proj.operatorIdentifier,
+ PortIdentity(0)
+ )
+ ),
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
+ )
+
+ assert(result.physicalPlan.isDefined, "happy path should yield a physical
plan")
+ assert(result.operatorIdToError.isEmpty, s"unexpected errors:
${result.operatorIdToError}")
+ // Schema for both operators' output ports should be populated and
non-null —
+ // this is the property whose violation triggers the resource-level NPE.
+ val projSchemas = result.operatorIdToOutputSchemas(proj.operatorIdentifier)
+ assert(projSchemas.values.forall(s => s.isDefined && s.get != null))
+ }
+
+ it should "propagate schemas through a csv -> projection -> limit -> filter
-> filter -> limit chain" in {
+ // Real-world editing-shape: source then filter/limit/project ops. Asserts
+ // the compiler threads schema through every link so the frontend sees the
+ // projected columns at every downstream port. Previously this lived in
+ // WorkflowCompilationResourceSpec as an HTTP test, but the property being
+ // pinned is compiler-level (schema propagation) — the REST envelope adds
+ // no signal.
+ val csv = csvOp(realCsvPath)
+ val proj = projectOp(List("Region", "Total Profit"))
+ val limit1 = limitOp(10)
+ val filter1 =
+ filterOp(new FilterPredicate("Total Profit",
ComparisonType.GREATER_THAN, "10000"))
+ val filter2 = filterOp(new FilterPredicate("Region",
ComparisonType.NOT_EQUAL_TO, "JPN"))
+ val limit2 = limitOp(5)
+
+ val result = new WorkflowCompiler(newContext()).compile(
+ LogicalPlanPojo(
+ operators = List(csv, proj, limit1, filter1, filter2, limit2),
+ links = List(
+ LogicalLink(
+ csv.operatorIdentifier,
+ PortIdentity(0),
+ proj.operatorIdentifier,
+ PortIdentity(0)
+ ),
+ LogicalLink(
+ proj.operatorIdentifier,
+ PortIdentity(0),
+ limit1.operatorIdentifier,
+ PortIdentity(0)
+ ),
+ LogicalLink(
+ limit1.operatorIdentifier,
+ PortIdentity(0),
+ filter1.operatorIdentifier,
+ PortIdentity(0)
+ ),
+ LogicalLink(
+ filter1.operatorIdentifier,
+ PortIdentity(0),
+ filter2.operatorIdentifier,
+ PortIdentity(0)
+ ),
+ LogicalLink(
+ filter2.operatorIdentifier,
+ PortIdentity(0),
+ limit2.operatorIdentifier,
+ PortIdentity(0)
+ )
+ ),
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
+ )
+
+ assert(result.physicalPlan.isDefined)
+ assert(result.operatorIdToError.isEmpty, s"unexpected errors:
${result.operatorIdToError}")
+ // Projection narrowed [Region, Country, ..., Total Profit] down to two
+ // columns; every downstream op should see exactly those two attributes.
+ val filter2Schemas =
result.operatorIdToOutputSchemas(filter2.operatorIdentifier)
+ val outputAttrs = filter2Schemas(PortIdentity(0)).get.attributes
+ assert(
+ outputAttrs == List(
+ new Attribute("Region", AttributeType.STRING),
+ new Attribute("Total Profit", AttributeType.DOUBLE)
+ ),
+ s"projected schema should reach filter2 unchanged, got $outputAttrs"
+ )
+ }
+
+ // -------------------- lenient-mode error accumulation --------------------
+
+ // The frontend relies on `compile` *never throwing*: a user mid-edit
+ // routinely produces an inconsistent plan and the editing UI must render
+ // structured per-operator errors. These tests pin the contract.
+
+ "WorkflowCompiler" should "accumulate, not throw, when a scan source has no
fileName" in {
+ val orphan = csvOpNoFile()
+
+ val result = new WorkflowCompiler(newContext()).compile(
+ LogicalPlanPojo(
+ operators = List(orphan),
+ links = List.empty,
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
+ )
+
+ assert(result.physicalPlan.isEmpty, "any error must clear the physical
plan")
+ val err = result.operatorIdToError(orphan.operatorIdentifier)
+ assert(err.`type` == COMPILATION_ERROR)
+ assert(err.operatorId == orphan.operatorIdentifier.id)
+ assert(err.message.contains("no input file name"), s"unexpected message:
${err.message}")
+ assert(err.details.nonEmpty, "stack-trace details should be populated for
UI display")
+ }
+
+ it should "accumulate when a scan source's fileName points to a non-existent
path" in {
+ val broken = csvOp("/does/not/exist/missing.csv")
+
+ val result = new WorkflowCompiler(newContext()).compile(
+ LogicalPlanPojo(
+ operators = List(broken),
+ links = List.empty,
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
+ )
+
+ assert(result.physicalPlan.isEmpty)
+ assert(result.operatorIdToError.contains(broken.operatorIdentifier))
+ // FileResolver.resolve falls through both resolvers and rethrows
+ // org.apache.commons.vfs2.FileNotFoundException(fileName); its message
bundle
+ // renders as `Could not read from "<path>" because it is not a file.`, so
the
+ // only stable substring across that wording and any
java.io.FileNotFoundException
+ // fallback is the bad path itself.
+ assert(
+
result.operatorIdToError(broken.operatorIdentifier).message.contains("missing.csv"),
+ s"unexpected message:
${result.operatorIdToError(broken.operatorIdentifier).message}"
+ )
+ }
+
+ it should "accumulate a per-operator error when projection references a
non-existent attribute" in {
+ val csv = csvOp(realCsvPath)
+ val badProjection = projectOp(List("DoesNotExist"))
+
+ val result = new WorkflowCompiler(newContext()).compile(
+ LogicalPlanPojo(
+ operators = List(csv, badProjection),
+ links = List(
+ LogicalLink(
+ csv.operatorIdentifier,
+ PortIdentity(0),
+ badProjection.operatorIdentifier,
+ PortIdentity(0)
+ )
+ ),
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
+ )
+
+ assert(result.physicalPlan.isEmpty)
+ assert(
+ result.operatorIdToError.contains(badProjection.operatorIdentifier),
+ s"projection should be in errors, got ${result.operatorIdToError.keySet}"
+ )
+ // The upstream csv ran fine, so its output schema should still be present
+ // — partial progress is the whole point of lenient mode.
+ assert(
+ result.operatorIdToOutputSchemas.contains(csv.operatorIdentifier),
+ "upstream csv's schemas should be retained even when downstream fails"
+ )
+ }
+
+ it should "not throw when given an empty plan" in {
+ val result = new WorkflowCompiler(newContext()).compile(
+ LogicalPlanPojo(
+ operators = List.empty,
+ links = List.empty,
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
+ )
+ assert(result.operatorIdToError.isEmpty)
+ assert(result.operatorIdToOutputSchemas.isEmpty)
+ assert(result.physicalPlan.isDefined, "an empty plan compiles to an empty
physical plan")
+ assert(result.physicalPlan.get.operators.isEmpty)
+ assert(result.physicalPlan.get.links.isEmpty)
+ }
+
+ // -------------------- multi-error accumulation --------------------
+
+ // Re-anchor the subject after the sub-section.
+ "WorkflowCompiler" should "accumulate errors for multiple unrelated failing
ops in one compile" in {
+ val orphan1 = csvOpNoFile()
+ val orphan2 = csvOpNoFile()
+
+ val result = new WorkflowCompiler(newContext()).compile(
+ LogicalPlanPojo(
+ operators = List(orphan1, orphan2),
+ links = List.empty,
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
+ )
+
+ assert(result.physicalPlan.isEmpty)
+ // Both ops must appear in the error map — the frontend renders per-op
+ // diagnostics in parallel, so swallowing all-but-one would silently break
+ // multi-error workflows.
+ assert(
+ result.operatorIdToError.contains(orphan1.operatorIdentifier) &&
+ result.operatorIdToError.contains(orphan2.operatorIdentifier),
+ s"expected both csvs in errors, got ${result.operatorIdToError.keySet}"
+ )
+ }
+}
diff --git
a/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala
b/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala
index 87246fd7f3..fd99c3d27d 100644
---
a/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala
+++
b/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala
@@ -24,23 +24,25 @@ import io.dropwizard.testing.junit5.ResourceExtension
import jakarta.ws.rs.client.Entity
import jakarta.ws.rs.core.{MediaType, Response}
import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo}
-import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
import org.apache.texera.amber.core.workflow.PortIdentity
-import org.apache.texera.amber.operator.filter.{
- ComparisonType,
- FilterOpDesc,
- FilterPredicate,
- SpecializedFilterOpDesc
-}
-import org.apache.texera.amber.operator.limit.LimitOpDesc
import org.apache.texera.amber.operator.projection.{AttributeUnit,
ProjectionOpDesc}
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
import org.apache.texera.amber.util.JSONUtils.objectMapper
-import org.apache.texera.amber.util.serde.PortIdentityKeySerializer
import org.assertj.core.api.Assertions.assertThat
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
+/**
+ * Resource-layer tests for `/compile`. Owns only what the REST envelope
+ * adds on top of compilation itself: HTTP status, the
+ * `@JsonTypeInfo` discriminator that lets the frontend route success vs
+ * failure, and the JSON shape the resource expects on the wire.
+ *
+ * All compiler-behavior assertions (schema propagation, lenient-mode
+ * error accumulation, multi-op chains) live in
+ * `org.apache.texera.amber.compiler.WorkflowCompilerSpec` so a regression
+ * lands on the right spec.
+ */
class WorkflowCompilationResourceSpec extends AnyFlatSpec with
BeforeAndAfterAll {
private val resources: ResourceExtension = ResourceExtension
@@ -48,193 +50,117 @@ class WorkflowCompilationResourceSpec extends AnyFlatSpec
with BeforeAndAfterAll
.addResource(new WorkflowCompilationResource())
.setMapper(objectMapper)
.build()
- override protected def beforeAll(): Unit = {
- resources.before()
- }
- override protected def afterAll(): Unit = {
- resources.after()
- }
+ override protected def beforeAll(): Unit = resources.before()
+ override protected def afterAll(): Unit = resources.after()
- // utility function to create a csv scan op
- private def getCsvScanOpDesc(
- fileName: String,
- header: Boolean
- ): CSVScanSourceOpDesc = {
- val csvOp = new CSVScanSourceOpDesc()
- csvOp.fileName = Some(fileName)
- csvOp.customDelimiter = Some(",")
- csvOp.hasHeader = header
- csvOp
- }
+ private val realCsvPath =
+ "workflow-compiling-service/src/test/resources/country_sales_small.csv"
- // utility function to create a projection op
- private def getProjectionOpDesc(
- attributeNames: List[String],
- isDrop: Boolean = false
- ): ProjectionOpDesc = {
- val projectionOpDesc = new ProjectionOpDesc()
- projectionOpDesc.attributes = attributeNames.map(name => new
AttributeUnit(name, ""))
- projectionOpDesc.isDrop = isDrop
- projectionOpDesc
+ private def csvOp(fileName: String): CSVScanSourceOpDesc = {
+ val op = new CSVScanSourceOpDesc()
+ op.fileName = Some(fileName)
+ op.customDelimiter = Some(",")
+ op.hasHeader = true
+ op
}
- // utility function to create a limit op
- private def getLimitOpDesc(
- limit: Int
- ): LimitOpDesc = {
- val limitOpDesc = new LimitOpDesc
- limitOpDesc.limit = limit
- limitOpDesc
+ private def projectOp(columns: List[String]): ProjectionOpDesc = {
+ val op = new ProjectionOpDesc()
+ op.attributes = columns.map(name => new AttributeUnit(name, ""))
+ op.isDrop = false
+ op
}
- // utility function to create a filter op
- private def getFilterOpDesc(
- filterPredicates: List[FilterPredicate]
- ): FilterOpDesc = {
- val filterOpDesc = new SpecializedFilterOpDesc
- filterOpDesc.predicates = filterPredicates
- filterOpDesc
- }
-
- // utility function to transform a logical plan pojo to json that can be
deserialized correctly by the compile endpoint
- private def transformLogicalPlanPojoToJsonString(logicalPlanPojo:
LogicalPlanPojo): String = {
- val jsonNode = objectMapper.valueToTree[ObjectNode](logicalPlanPojo)
-
- // iterate over the "links" array and replace nested "id" fields
+ // The frontend serializes LogicalLink with `fromOpId` / `toOpId` as flat
+ // strings, but the Scala case class stores them as nested `OperatorIdentity`
+ // records. This helper mirrors the wire shape so the test exercises the
+ // resource's actual JSON contract instead of a Scala-only round trip.
+ private def encodePojoAsFrontendJson(pojo: LogicalPlanPojo): String = {
+ val jsonNode = objectMapper.valueToTree[ObjectNode](pojo)
val linksArray = jsonNode.withArray("links")
linksArray.forEach { linkNode =>
- // replace "fromOpId" with its "id" field value
val fromOpIdNode = linkNode.get("fromOpId")
linkNode.asInstanceOf[ObjectNode].put("fromOpId",
fromOpIdNode.get("id").asText())
-
- // replace "toOpId" with its "id" field value if it exists
val toOpIdNode = linkNode.get("toOpId")
linkNode.asInstanceOf[ObjectNode].put("toOpId",
toOpIdNode.get("id").asText())
}
-
- // convert the modified JSON node back to a string
objectMapper.writeValueAsString(jsonNode)
}
- // utility function for asserting the successful compilation
- private def assertSuccessfulCompilation(response: Response):
WorkflowCompilationSuccess = {
- val responseBody = response.readEntity(classOf[String])
- val compilationResponse =
- objectMapper.readValue(responseBody,
classOf[WorkflowCompilationResponse])
-
- assertThat(compilationResponse.asInstanceOf[WorkflowCompilationSuccess])
- compilationResponse.asInstanceOf[WorkflowCompilationSuccess]
+ private def postCompile(pojo: LogicalPlanPojo): Response =
+ resources
+ .target("/compile")
+ .request(MediaType.APPLICATION_JSON)
+ .post(Entity.json(encodePojoAsFrontendJson(pojo)))
+
+ "POST /compile" should "return HTTP 200 for a well-formed plan" in {
+ val csv = csvOp(realCsvPath)
+ val proj = projectOp(List("Region", "Total Profit"))
+ val response = postCompile(
+ LogicalPlanPojo(
+ operators = List(csv, proj),
+ links = List(
+ LogicalLink(
+ csv.operatorIdentifier,
+ PortIdentity(0),
+ proj.operatorIdentifier,
+ PortIdentity(0)
+ )
+ ),
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
+ )
+ assertThat(response.getStatus).isEqualTo(200)
}
- it should "compile workflow successfully with multiple filter and limit
operations" in {
- // construct the LogicalPlan: CSVScan --> Projection --> Limit --> Filter
(TotalProfit > 10000) --> Filter (Region != "JPN") --> Limit
- val localCsvFilePath =
- "workflow-compiling-service/src/test/resources/country_sales_small.csv"
- val csvSourceOp = getCsvScanOpDesc(localCsvFilePath, header = true)
- val projectionOpDesc = getProjectionOpDesc(List("Region", "Total Profit"))
- val limitOpDesc1 = getLimitOpDesc(10)
-
- // Create the filter predicate for TotalProfit > 10000
- val filterPredicate1 = new FilterPredicate("Total Profit",
ComparisonType.GREATER_THAN, "10000")
- val filterOpDesc1 = getFilterOpDesc(List(filterPredicate1))
-
- // Create the filter predicate for Region != "JPN"
- val filterPredicate2 = new FilterPredicate("Region",
ComparisonType.NOT_EQUAL_TO, "JPN")
- val filterOpDesc2 = getFilterOpDesc(List(filterPredicate2))
-
- // Add a second limit operation
- val limitOpDesc2 = getLimitOpDesc(5)
-
- val logicalPlanPojo = LogicalPlanPojo(
- operators = List(
- csvSourceOp,
- projectionOpDesc,
- limitOpDesc1,
- filterOpDesc1,
- filterOpDesc2,
- limitOpDesc2
- ),
- links = List(
- LogicalLink(
- csvSourceOp.operatorIdentifier,
- PortIdentity(0),
- projectionOpDesc.operatorIdentifier,
- PortIdentity(0)
+ it should "tag the response body with type=success so the frontend can route
on the discriminator" in {
+ // The @JsonTypeInfo on WorkflowCompilationResponse writes a `type` field.
+ // Both polymorphic deserialization (round-tripping through the trait) and
+ // a raw-JSON `type == "success"` check need to hold so the Angular client
+ // can branch without depending on Scala class names.
+ val csv = csvOp(realCsvPath)
+ val proj = projectOp(List("Region", "Total Profit"))
+ val response = postCompile(
+ LogicalPlanPojo(
+ operators = List(csv, proj),
+ links = List(
+ LogicalLink(
+ csv.operatorIdentifier,
+ PortIdentity(0),
+ proj.operatorIdentifier,
+ PortIdentity(0)
+ )
),
- LogicalLink(
- projectionOpDesc.operatorIdentifier,
- PortIdentity(0),
- limitOpDesc1.operatorIdentifier,
- PortIdentity(0)
- ),
- LogicalLink(
- limitOpDesc1.operatorIdentifier,
- PortIdentity(0),
- filterOpDesc1.operatorIdentifier,
- PortIdentity(0)
- ),
- LogicalLink(
- filterOpDesc1.operatorIdentifier,
- PortIdentity(0),
- filterOpDesc2.operatorIdentifier,
- PortIdentity(0)
- ),
- LogicalLink(
- filterOpDesc2.operatorIdentifier,
- PortIdentity(0),
- limitOpDesc2.operatorIdentifier,
- PortIdentity(0)
- )
- ),
- opsToViewResult = List(),
- opsToReuseResult = List()
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
)
- // transform the LogicalPlanPojo to a modified JSON string
- val modifiedLogicalPlanJsonString =
transformLogicalPlanPojoToJsonString(logicalPlanPojo)
-
- // send the request to compile endpoint
- val response = resources
- .target("/compile")
- .request(MediaType.APPLICATION_JSON)
- .post(Entity.json(modifiedLogicalPlanJsonString))
-
- assertThat(response.getStatus).isEqualTo(200)
-
- // verify the schema is correctly propagated for the final limit operator
- val compilationResult = assertSuccessfulCompilation(response)
- val finalLimitInputSchema =
-
compilationResult.operatorOutputSchemas(filterOpDesc2.operatorIdentifier.id)
+ val body = response.readEntity(classOf[String])
+ val node = objectMapper.readTree(body)
assert(
- finalLimitInputSchema(
- PortIdentityKeySerializer.portIdToString(PortIdentity(id = 0, internal
= false))
- ).get.equals(
- List(
- new Attribute("Region", AttributeType.STRING),
- new Attribute("Total Profit", AttributeType.DOUBLE)
- )
- )
+ node.has("type") && node.get("type").asText() == "success",
+ s"expected type:success discriminator, got $body"
)
+ val parsed = objectMapper.readValue(body,
classOf[WorkflowCompilationResponse])
+ assert(parsed.isInstanceOf[WorkflowCompilationSuccess])
+ val success = parsed.asInstanceOf[WorkflowCompilationSuccess]
+ assert(success.physicalPlan != null)
+ assert(success.operatorOutputSchemas.nonEmpty)
}
it should "return WorkflowCompilationFailure (not HTTP 500) when a scan
source file cannot be resolved" in {
- val brokenCsv = getCsvScanOpDesc("/does/not/exist/missing.csv", header =
true)
-
- val logicalPlanPojo = LogicalPlanPojo(
- operators = List(brokenCsv),
- links = List(),
- opsToViewResult = List(),
- opsToReuseResult = List()
+ val response = postCompile(
+ LogicalPlanPojo(
+ operators = List(csvOp("/does/not/exist/missing.csv")),
+ links = List.empty,
+ opsToViewResult = List.empty,
+ opsToReuseResult = List.empty
+ )
)
- val modifiedLogicalPlanJsonString =
transformLogicalPlanPojoToJsonString(logicalPlanPojo)
-
- val response = resources
- .target("/compile")
- .request(MediaType.APPLICATION_JSON)
- .post(Entity.json(modifiedLogicalPlanJsonString))
-
// Must not surface as HTTP 500 — the error must come back as a structured
failure.
assertThat(response.getStatus).isEqualTo(200)