This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5022-e25c28225f25990a4c149b85fa48f8090900cf85
in repository https://gitbox.apache.org/repos/asf/texera.git

commit a8afdc487aaa8a1befca3cc4db61584b21d5a848
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 31 16:59:08 2026 -0700

    test(compiling-service): add direct unit tests for WorkflowCompiler (#5022)
    
    ### What changes were proposed in this PR?
    
    Add direct unit coverage for `workflow-compiling-service`'s
    `WorkflowCompiler` lenient-mode contract: `compile` never throws,
    per-operator failures accumulate into `operatorIdToError`, and
    `physicalPlan` is `None` whenever any error occurred. Only the REST
    happy path is covered today.
    
    The spec invokes the compiler directly rather than the REST resource,
    sidestepping a separate response-serialization NPE surfaced while
    writing these tests (apache/texera#5021). The contract tests stay green
    once that bug is fixed.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5020. References #5021.
    
    ### How was this PR tested?
    
    `sbt "WorkflowCompilingService/testOnly
    org.apache.texera.amber.compiler.WorkflowCompilerSpec"` — all green.
    `scalafmtCheck` and `compile` clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7 (1M context)
    
    ---------
    
    Signed-off-by: Yicong Huang <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI 
<[email protected]>
---
 .../amber/compiler/WorkflowCompilerSpec.scala      | 317 +++++++++++++++++++++
 .../resource/WorkflowCompilationResourceSpec.scala | 260 ++++++-----------
 2 files changed, 410 insertions(+), 167 deletions(-)

diff --git 
a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala
 
b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala
new file mode 100644
index 0000000000..ee221728d3
--- /dev/null
+++ 
b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/WorkflowCompilerSpec.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.compiler
+
+import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import org.apache.texera.amber.core.virtualidentity.WorkflowIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import 
org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.COMPILATION_ERROR
+import org.apache.texera.amber.operator.filter.{
+  ComparisonType,
+  FilterPredicate,
+  SpecializedFilterOpDesc
+}
+import org.apache.texera.amber.operator.limit.LimitOpDesc
+import org.apache.texera.amber.operator.projection.{AttributeUnit, 
ProjectionOpDesc}
+import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
+import org.scalatest.flatspec.AnyFlatSpec
+
+/**
+  * Direct unit coverage for the editing-time [[WorkflowCompiler]].
+  *
+  * Owns *compiler-behavior* tests — schema propagation through multi-op
+  * chains, lenient-mode error accumulation, terminal-storage selection.
+  * `WorkflowCompilationResourceSpec` owns *resource-layer* tests — HTTP
+  * status, response type discriminator, JSON envelope. Drawing the line
+  * here keeps each spec focused on a single failure axis.
+  *
+  * Bypassing the resource layer also sidesteps a separate NPE in response
+  * serialization (apache/texera#5021); these compiler-level tests stay
+  * green once that bug is fixed.
+  */
+class WorkflowCompilerSpec extends AnyFlatSpec {
+
+  private def newContext(): WorkflowContext =
+    new WorkflowContext(workflowId = WorkflowIdentity(0))
+
+  private def csvOp(fileName: String): CSVScanSourceOpDesc = {
+    val op = new CSVScanSourceOpDesc()
+    op.fileName = Some(fileName)
+    op.customDelimiter = Some(",")
+    op.hasHeader = true
+    op
+  }
+
+  private def csvOpNoFile(): CSVScanSourceOpDesc = {
+    val op = new CSVScanSourceOpDesc()
+    op.customDelimiter = Some(",")
+    op.hasHeader = true
+    op
+  }
+
+  private def projectOp(columns: List[String]): ProjectionOpDesc = {
+    val op = new ProjectionOpDesc()
+    op.attributes = columns.map(name => new AttributeUnit(name, ""))
+    op.isDrop = false
+    op
+  }
+
+  private def filterOp(predicates: FilterPredicate*): SpecializedFilterOpDesc 
= {
+    val op = new SpecializedFilterOpDesc
+    op.predicates = predicates.toList
+    op
+  }
+
+  private def limitOp(limit: Int): LimitOpDesc = {
+    val op = new LimitOpDesc
+    op.limit = limit
+    op
+  }
+
+  private val realCsvPath =
+    "workflow-compiling-service/src/test/resources/country_sales_small.csv"
+
+  // -------------------- happy path --------------------
+
+  "WorkflowCompiler" should "produce a populated physicalPlan and no errors 
for a well-formed plan" in {
+    val csv = csvOp(realCsvPath)
+    val proj = projectOp(List("Region", "Total Profit"))
+    val result = new WorkflowCompiler(newContext()).compile(
+      LogicalPlanPojo(
+        operators = List(csv, proj),
+        links = List(
+          LogicalLink(
+            csv.operatorIdentifier,
+            PortIdentity(0),
+            proj.operatorIdentifier,
+            PortIdentity(0)
+          )
+        ),
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
+    )
+
+    assert(result.physicalPlan.isDefined, "happy path should yield a physical 
plan")
+    assert(result.operatorIdToError.isEmpty, s"unexpected errors: 
${result.operatorIdToError}")
+    // Schema for both operators' output ports should be populated and 
non-null —
+    // this is the property whose violation triggers the resource-level NPE.
+    val projSchemas = result.operatorIdToOutputSchemas(proj.operatorIdentifier)
+    assert(projSchemas.values.forall(s => s.isDefined && s.get != null))
+  }
+
+  it should "propagate schemas through a csv -> projection -> limit -> filter 
-> filter -> limit chain" in {
+    // Real-world editing-shape: source then filter/limit/project ops. Asserts
+    // the compiler threads schema through every link so the frontend sees the
+    // projected columns at every downstream port. Previously this lived in
+    // WorkflowCompilationResourceSpec as an HTTP test, but the property being
+    // pinned is compiler-level (schema propagation) — the REST envelope adds
+    // no signal.
+    val csv = csvOp(realCsvPath)
+    val proj = projectOp(List("Region", "Total Profit"))
+    val limit1 = limitOp(10)
+    val filter1 =
+      filterOp(new FilterPredicate("Total Profit", 
ComparisonType.GREATER_THAN, "10000"))
+    val filter2 = filterOp(new FilterPredicate("Region", 
ComparisonType.NOT_EQUAL_TO, "JPN"))
+    val limit2 = limitOp(5)
+
+    val result = new WorkflowCompiler(newContext()).compile(
+      LogicalPlanPojo(
+        operators = List(csv, proj, limit1, filter1, filter2, limit2),
+        links = List(
+          LogicalLink(
+            csv.operatorIdentifier,
+            PortIdentity(0),
+            proj.operatorIdentifier,
+            PortIdentity(0)
+          ),
+          LogicalLink(
+            proj.operatorIdentifier,
+            PortIdentity(0),
+            limit1.operatorIdentifier,
+            PortIdentity(0)
+          ),
+          LogicalLink(
+            limit1.operatorIdentifier,
+            PortIdentity(0),
+            filter1.operatorIdentifier,
+            PortIdentity(0)
+          ),
+          LogicalLink(
+            filter1.operatorIdentifier,
+            PortIdentity(0),
+            filter2.operatorIdentifier,
+            PortIdentity(0)
+          ),
+          LogicalLink(
+            filter2.operatorIdentifier,
+            PortIdentity(0),
+            limit2.operatorIdentifier,
+            PortIdentity(0)
+          )
+        ),
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
+    )
+
+    assert(result.physicalPlan.isDefined)
+    assert(result.operatorIdToError.isEmpty, s"unexpected errors: 
${result.operatorIdToError}")
+    // Projection narrowed [Region, Country, ..., Total Profit] down to two
+    // columns; every downstream op should see exactly those two attributes.
+    val filter2Schemas = 
result.operatorIdToOutputSchemas(filter2.operatorIdentifier)
+    val outputAttrs = filter2Schemas(PortIdentity(0)).get.attributes
+    assert(
+      outputAttrs == List(
+        new Attribute("Region", AttributeType.STRING),
+        new Attribute("Total Profit", AttributeType.DOUBLE)
+      ),
+      s"projected schema should reach filter2 unchanged, got $outputAttrs"
+    )
+  }
+
+  // -------------------- lenient-mode error accumulation --------------------
+
+  // The frontend relies on `compile` *never throwing*: a user mid-edit
+  // routinely produces an inconsistent plan and the editing UI must render
+  // structured per-operator errors. These tests pin the contract.
+
+  "WorkflowCompiler" should "accumulate, not throw, when a scan source has no 
fileName" in {
+    val orphan = csvOpNoFile()
+
+    val result = new WorkflowCompiler(newContext()).compile(
+      LogicalPlanPojo(
+        operators = List(orphan),
+        links = List.empty,
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
+    )
+
+    assert(result.physicalPlan.isEmpty, "any error must clear the physical 
plan")
+    val err = result.operatorIdToError(orphan.operatorIdentifier)
+    assert(err.`type` == COMPILATION_ERROR)
+    assert(err.operatorId == orphan.operatorIdentifier.id)
+    assert(err.message.contains("no input file name"), s"unexpected message: 
${err.message}")
+    assert(err.details.nonEmpty, "stack-trace details should be populated for 
UI display")
+  }
+
+  it should "accumulate when a scan source's fileName points to a non-existent 
path" in {
+    val broken = csvOp("/does/not/exist/missing.csv")
+
+    val result = new WorkflowCompiler(newContext()).compile(
+      LogicalPlanPojo(
+        operators = List(broken),
+        links = List.empty,
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
+    )
+
+    assert(result.physicalPlan.isEmpty)
+    assert(result.operatorIdToError.contains(broken.operatorIdentifier))
+    // FileResolver.resolve falls through both resolvers and rethrows
+    // org.apache.commons.vfs2.FileNotFoundException(fileName); its message 
bundle
+    // renders as `Could not read from "<path>" because it is not a file.`, so 
the
+    // only stable substring across that wording and any 
java.io.FileNotFoundException
+    // fallback is the bad path itself.
+    assert(
+      
result.operatorIdToError(broken.operatorIdentifier).message.contains("missing.csv"),
+      s"unexpected message: 
${result.operatorIdToError(broken.operatorIdentifier).message}"
+    )
+  }
+
+  it should "accumulate a per-operator error when projection references a 
non-existent attribute" in {
+    val csv = csvOp(realCsvPath)
+    val badProjection = projectOp(List("DoesNotExist"))
+
+    val result = new WorkflowCompiler(newContext()).compile(
+      LogicalPlanPojo(
+        operators = List(csv, badProjection),
+        links = List(
+          LogicalLink(
+            csv.operatorIdentifier,
+            PortIdentity(0),
+            badProjection.operatorIdentifier,
+            PortIdentity(0)
+          )
+        ),
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
+    )
+
+    assert(result.physicalPlan.isEmpty)
+    assert(
+      result.operatorIdToError.contains(badProjection.operatorIdentifier),
+      s"projection should be in errors, got ${result.operatorIdToError.keySet}"
+    )
+    // The upstream csv ran fine, so its output schema should still be present
+    // — partial progress is the whole point of lenient mode.
+    assert(
+      result.operatorIdToOutputSchemas.contains(csv.operatorIdentifier),
+      "upstream csv's schemas should be retained even when downstream fails"
+    )
+  }
+
+  it should "not throw when given an empty plan" in {
+    val result = new WorkflowCompiler(newContext()).compile(
+      LogicalPlanPojo(
+        operators = List.empty,
+        links = List.empty,
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
+    )
+    assert(result.operatorIdToError.isEmpty)
+    assert(result.operatorIdToOutputSchemas.isEmpty)
+    assert(result.physicalPlan.isDefined, "an empty plan compiles to an empty 
physical plan")
+    assert(result.physicalPlan.get.operators.isEmpty)
+    assert(result.physicalPlan.get.links.isEmpty)
+  }
+
+  // -------------------- multi-error accumulation --------------------
+
+  // Re-anchor the subject after the sub-section.
+  "WorkflowCompiler" should "accumulate errors for multiple unrelated failing 
ops in one compile" in {
+    val orphan1 = csvOpNoFile()
+    val orphan2 = csvOpNoFile()
+
+    val result = new WorkflowCompiler(newContext()).compile(
+      LogicalPlanPojo(
+        operators = List(orphan1, orphan2),
+        links = List.empty,
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
+    )
+
+    assert(result.physicalPlan.isEmpty)
+    // Both ops must appear in the error map — the frontend renders per-op
+    // diagnostics in parallel, so swallowing all-but-one would silently break
+    // multi-error workflows.
+    assert(
+      result.operatorIdToError.contains(orphan1.operatorIdentifier) &&
+        result.operatorIdToError.contains(orphan2.operatorIdentifier),
+      s"expected both csvs in errors, got ${result.operatorIdToError.keySet}"
+    )
+  }
+}
diff --git 
a/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala
 
b/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala
index 87246fd7f3..fd99c3d27d 100644
--- 
a/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala
+++ 
b/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala
@@ -24,23 +24,25 @@ import io.dropwizard.testing.junit5.ResourceExtension
 import jakarta.ws.rs.client.Entity
 import jakarta.ws.rs.core.{MediaType, Response}
 import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo}
-import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
 import org.apache.texera.amber.core.workflow.PortIdentity
-import org.apache.texera.amber.operator.filter.{
-  ComparisonType,
-  FilterOpDesc,
-  FilterPredicate,
-  SpecializedFilterOpDesc
-}
-import org.apache.texera.amber.operator.limit.LimitOpDesc
 import org.apache.texera.amber.operator.projection.{AttributeUnit, 
ProjectionOpDesc}
 import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
 import org.apache.texera.amber.util.JSONUtils.objectMapper
-import org.apache.texera.amber.util.serde.PortIdentityKeySerializer
 import org.assertj.core.api.Assertions.assertThat
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpec
 
+/**
+  * Resource-layer tests for `/compile`. Owns only what the REST envelope
+  * adds on top of compilation itself: HTTP status, the
+  * `@JsonTypeInfo` discriminator that lets the frontend route success vs
+  * failure, and the JSON shape the resource expects on the wire.
+  *
+  * All compiler-behavior assertions (schema propagation, lenient-mode
+  * error accumulation, multi-op chains) live in
+  * `org.apache.texera.amber.compiler.WorkflowCompilerSpec` so a regression
+  * lands on the right spec.
+  */
 class WorkflowCompilationResourceSpec extends AnyFlatSpec with 
BeforeAndAfterAll {
 
   private val resources: ResourceExtension = ResourceExtension
@@ -48,193 +50,117 @@ class WorkflowCompilationResourceSpec extends AnyFlatSpec 
with BeforeAndAfterAll
     .addResource(new WorkflowCompilationResource())
     .setMapper(objectMapper)
     .build()
-  override protected def beforeAll(): Unit = {
-    resources.before()
-  }
 
-  override protected def afterAll(): Unit = {
-    resources.after()
-  }
+  override protected def beforeAll(): Unit = resources.before()
+  override protected def afterAll(): Unit = resources.after()
 
-  // utility function to create a csv scan op
-  private def getCsvScanOpDesc(
-      fileName: String,
-      header: Boolean
-  ): CSVScanSourceOpDesc = {
-    val csvOp = new CSVScanSourceOpDesc()
-    csvOp.fileName = Some(fileName)
-    csvOp.customDelimiter = Some(",")
-    csvOp.hasHeader = header
-    csvOp
-  }
+  private val realCsvPath =
+    "workflow-compiling-service/src/test/resources/country_sales_small.csv"
 
-  // utility function to create a projection op
-  private def getProjectionOpDesc(
-      attributeNames: List[String],
-      isDrop: Boolean = false
-  ): ProjectionOpDesc = {
-    val projectionOpDesc = new ProjectionOpDesc()
-    projectionOpDesc.attributes = attributeNames.map(name => new 
AttributeUnit(name, ""))
-    projectionOpDesc.isDrop = isDrop
-    projectionOpDesc
+  private def csvOp(fileName: String): CSVScanSourceOpDesc = {
+    val op = new CSVScanSourceOpDesc()
+    op.fileName = Some(fileName)
+    op.customDelimiter = Some(",")
+    op.hasHeader = true
+    op
   }
 
-  // utility function to create a limit op
-  private def getLimitOpDesc(
-      limit: Int
-  ): LimitOpDesc = {
-    val limitOpDesc = new LimitOpDesc
-    limitOpDesc.limit = limit
-    limitOpDesc
+  private def projectOp(columns: List[String]): ProjectionOpDesc = {
+    val op = new ProjectionOpDesc()
+    op.attributes = columns.map(name => new AttributeUnit(name, ""))
+    op.isDrop = false
+    op
   }
 
-  // utility function to create a filter op
-  private def getFilterOpDesc(
-      filterPredicates: List[FilterPredicate]
-  ): FilterOpDesc = {
-    val filterOpDesc = new SpecializedFilterOpDesc
-    filterOpDesc.predicates = filterPredicates
-    filterOpDesc
-  }
-
-  // utility function to transform a logical plan pojo to json that can be 
deserialized correctly by the compile endpoint
-  private def transformLogicalPlanPojoToJsonString(logicalPlanPojo: 
LogicalPlanPojo): String = {
-    val jsonNode = objectMapper.valueToTree[ObjectNode](logicalPlanPojo)
-
-    // iterate over the "links" array and replace nested "id" fields
+  // The frontend serializes LogicalLink with `fromOpId` / `toOpId` as flat
+  // strings, but the Scala case class stores them as nested `OperatorIdentity`
+  // records. This helper mirrors the wire shape so the test exercises the
+  // resource's actual JSON contract instead of a Scala-only round trip.
+  private def encodePojoAsFrontendJson(pojo: LogicalPlanPojo): String = {
+    val jsonNode = objectMapper.valueToTree[ObjectNode](pojo)
     val linksArray = jsonNode.withArray("links")
     linksArray.forEach { linkNode =>
-      // replace "fromOpId" with its "id" field value
       val fromOpIdNode = linkNode.get("fromOpId")
       linkNode.asInstanceOf[ObjectNode].put("fromOpId", 
fromOpIdNode.get("id").asText())
-
-      // replace "toOpId" with its "id" field value if it exists
       val toOpIdNode = linkNode.get("toOpId")
       linkNode.asInstanceOf[ObjectNode].put("toOpId", 
toOpIdNode.get("id").asText())
     }
-
-    // convert the modified JSON node back to a string
     objectMapper.writeValueAsString(jsonNode)
   }
 
-  // utility function for asserting the successful compilation
-  private def assertSuccessfulCompilation(response: Response): 
WorkflowCompilationSuccess = {
-    val responseBody = response.readEntity(classOf[String])
-    val compilationResponse =
-      objectMapper.readValue(responseBody, 
classOf[WorkflowCompilationResponse])
-
-    assertThat(compilationResponse.asInstanceOf[WorkflowCompilationSuccess])
-    compilationResponse.asInstanceOf[WorkflowCompilationSuccess]
+  private def postCompile(pojo: LogicalPlanPojo): Response =
+    resources
+      .target("/compile")
+      .request(MediaType.APPLICATION_JSON)
+      .post(Entity.json(encodePojoAsFrontendJson(pojo)))
+
+  "POST /compile" should "return HTTP 200 for a well-formed plan" in {
+    val csv = csvOp(realCsvPath)
+    val proj = projectOp(List("Region", "Total Profit"))
+    val response = postCompile(
+      LogicalPlanPojo(
+        operators = List(csv, proj),
+        links = List(
+          LogicalLink(
+            csv.operatorIdentifier,
+            PortIdentity(0),
+            proj.operatorIdentifier,
+            PortIdentity(0)
+          )
+        ),
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
+    )
+    assertThat(response.getStatus).isEqualTo(200)
   }
 
-  it should "compile workflow successfully with multiple filter and limit 
operations" in {
-    // construct the LogicalPlan: CSVScan --> Projection --> Limit --> Filter 
(TotalProfit > 10000) --> Filter (Region != "JPN") --> Limit
-    val localCsvFilePath =
-      "workflow-compiling-service/src/test/resources/country_sales_small.csv"
-    val csvSourceOp = getCsvScanOpDesc(localCsvFilePath, header = true)
-    val projectionOpDesc = getProjectionOpDesc(List("Region", "Total Profit"))
-    val limitOpDesc1 = getLimitOpDesc(10)
-
-    // Create the filter predicate for TotalProfit > 10000
-    val filterPredicate1 = new FilterPredicate("Total Profit", 
ComparisonType.GREATER_THAN, "10000")
-    val filterOpDesc1 = getFilterOpDesc(List(filterPredicate1))
-
-    // Create the filter predicate for Region != "JPN"
-    val filterPredicate2 = new FilterPredicate("Region", 
ComparisonType.NOT_EQUAL_TO, "JPN")
-    val filterOpDesc2 = getFilterOpDesc(List(filterPredicate2))
-
-    // Add a second limit operation
-    val limitOpDesc2 = getLimitOpDesc(5)
-
-    val logicalPlanPojo = LogicalPlanPojo(
-      operators = List(
-        csvSourceOp,
-        projectionOpDesc,
-        limitOpDesc1,
-        filterOpDesc1,
-        filterOpDesc2,
-        limitOpDesc2
-      ),
-      links = List(
-        LogicalLink(
-          csvSourceOp.operatorIdentifier,
-          PortIdentity(0),
-          projectionOpDesc.operatorIdentifier,
-          PortIdentity(0)
+  it should "tag the response body with type=success so the frontend can route 
on the discriminator" in {
+    // The @JsonTypeInfo on WorkflowCompilationResponse writes a `type` field.
+    // Both polymorphic deserialization (round-tripping through the trait) and
+    // a raw-JSON `type == "success"` check need to hold so the Angular client
+    // can branch without depending on Scala class names.
+    val csv = csvOp(realCsvPath)
+    val proj = projectOp(List("Region", "Total Profit"))
+    val response = postCompile(
+      LogicalPlanPojo(
+        operators = List(csv, proj),
+        links = List(
+          LogicalLink(
+            csv.operatorIdentifier,
+            PortIdentity(0),
+            proj.operatorIdentifier,
+            PortIdentity(0)
+          )
         ),
-        LogicalLink(
-          projectionOpDesc.operatorIdentifier,
-          PortIdentity(0),
-          limitOpDesc1.operatorIdentifier,
-          PortIdentity(0)
-        ),
-        LogicalLink(
-          limitOpDesc1.operatorIdentifier,
-          PortIdentity(0),
-          filterOpDesc1.operatorIdentifier,
-          PortIdentity(0)
-        ),
-        LogicalLink(
-          filterOpDesc1.operatorIdentifier,
-          PortIdentity(0),
-          filterOpDesc2.operatorIdentifier,
-          PortIdentity(0)
-        ),
-        LogicalLink(
-          filterOpDesc2.operatorIdentifier,
-          PortIdentity(0),
-          limitOpDesc2.operatorIdentifier,
-          PortIdentity(0)
-        )
-      ),
-      opsToViewResult = List(),
-      opsToReuseResult = List()
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
     )
 
-    // transform the LogicalPlanPojo to a modified JSON string
-    val modifiedLogicalPlanJsonString = 
transformLogicalPlanPojoToJsonString(logicalPlanPojo)
-
-    // send the request to compile endpoint
-    val response = resources
-      .target("/compile")
-      .request(MediaType.APPLICATION_JSON)
-      .post(Entity.json(modifiedLogicalPlanJsonString))
-
-    assertThat(response.getStatus).isEqualTo(200)
-
-    // verify the schema is correctly propagated for the final limit operator
-    val compilationResult = assertSuccessfulCompilation(response)
-    val finalLimitInputSchema =
-      
compilationResult.operatorOutputSchemas(filterOpDesc2.operatorIdentifier.id)
+    val body = response.readEntity(classOf[String])
+    val node = objectMapper.readTree(body)
     assert(
-      finalLimitInputSchema(
-        PortIdentityKeySerializer.portIdToString(PortIdentity(id = 0, internal 
= false))
-      ).get.equals(
-        List(
-          new Attribute("Region", AttributeType.STRING),
-          new Attribute("Total Profit", AttributeType.DOUBLE)
-        )
-      )
+      node.has("type") && node.get("type").asText() == "success",
+      s"expected type:success discriminator, got $body"
     )
+    val parsed = objectMapper.readValue(body, 
classOf[WorkflowCompilationResponse])
+    assert(parsed.isInstanceOf[WorkflowCompilationSuccess])
+    val success = parsed.asInstanceOf[WorkflowCompilationSuccess]
+    assert(success.physicalPlan != null)
+    assert(success.operatorOutputSchemas.nonEmpty)
   }
 
   it should "return WorkflowCompilationFailure (not HTTP 500) when a scan 
source file cannot be resolved" in {
-    val brokenCsv = getCsvScanOpDesc("/does/not/exist/missing.csv", header = 
true)
-
-    val logicalPlanPojo = LogicalPlanPojo(
-      operators = List(brokenCsv),
-      links = List(),
-      opsToViewResult = List(),
-      opsToReuseResult = List()
+    val response = postCompile(
+      LogicalPlanPojo(
+        operators = List(csvOp("/does/not/exist/missing.csv")),
+        links = List.empty,
+        opsToViewResult = List.empty,
+        opsToReuseResult = List.empty
+      )
     )
 
-    val modifiedLogicalPlanJsonString = 
transformLogicalPlanPojoToJsonString(logicalPlanPojo)
-
-    val response = resources
-      .target("/compile")
-      .request(MediaType.APPLICATION_JSON)
-      .post(Entity.json(modifiedLogicalPlanJsonString))
-
     // Must not surface as HTTP 500 — the error must come back as a structured 
failure.
     assertThat(response.getStatus).isEqualTo(200)
 

Reply via email to