This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 72071fffc4 test(workflow-operator): add unit test coverage for SplitOp
and UrlVizOp (descriptor + executor pairs) (#5769)
72071fffc4 is described below
commit 72071fffc4222d4d7e55b0dbdf7903f3a3b18649
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 19 16:11:05 2026 -0700
test(workflow-operator): add unit test coverage for SplitOp and UrlVizOp
(descriptor + executor pairs) (#5769)
### What changes were proposed in this PR?
Pin behavior of two previously-uncovered standalone operators
(descriptor + executor pairs). No production-code changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `SplitOpDescSpec` | `SplitOpDesc` | 9 |
| `SplitOpExecSpec` | `SplitOpExec` | 7 |
| `UrlVizOpDescSpec` | `UrlVizOpDesc` | 8 |
| `UrlVizOpExecSpec` | `UrlVizOpExec` | 6 |
All four spec files follow the `<srcClassName>Spec.scala` one-to-one
convention.
**Behavior pinned — `SplitOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Split"`, group `UTILITY_GROUP`, one input, two
outputs (PortIdentity 0 = training, PortIdentity 1 = testing) |
| Field defaults | `k = 80`, `random = true`, `seed = 1` |
| `getPhysicalOp` | wires
`OpExecWithClassName("…operator.split.SplitOpExec", <json>)`;
non-parallelizable; payload includes the `k` / `random` / `seed`
wire-keys |
| Schema propagation | propagates the single input schema to every
output port; throws `IllegalArgumentException` unless exactly one input
is supplied |
| Independent instances | `operatorIdentifier` (UUID-seeded) differs
across `new` |
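The schema-propagation contract in the table above can be sketched without any Texera dependency. This is a minimal illustration only: `PortId`, `SchemaStub`, and `SplitSchemaSketch` are hypothetical stand-ins for the real `PortIdentity` and `Schema` types, and using `require` for the precondition is an assumption about one way to produce the `IllegalArgumentException`, not a claim about how `SplitOpDesc` does it.

```scala
// Hypothetical stand-ins for Texera's PortIdentity and Schema.
final case class PortId(id: Int)
final case class SchemaStub(attributeNames: List[String])

object SplitSchemaSketch {
  // Two output ports: 0 = training, 1 = testing.
  val outputPorts: List[PortId] = List(PortId(0), PortId(1))

  // Copies the single input schema to every output port.
  def propagate(inputs: Map[PortId, SchemaStub]): Map[PortId, SchemaStub] = {
    // `require` throws IllegalArgumentException when the predicate is false.
    require(inputs.size == 1, s"expected exactly one input schema, got ${inputs.size}")
    val only = inputs.values.head
    outputPorts.map(port => port -> only).toMap
  }
}
```

Calling `propagate` with an empty map or with two entries fails the precondition, which mirrors the two `intercept[IllegalArgumentException]` cases in `SplitOpDescSpec`.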
**Behavior pinned — `SplitOpExec`**
| Surface | Contract |
| --- | --- |
| `k = 100` | every tuple emitted on PortIdentity 0 (training) |
| `k = 0` | every tuple emitted on PortIdentity 1 (testing) |
| Deterministic seed | two fresh instances with the same `(k, seed)`
produce identical port sequences over 200 tuples |
| `k = 50` (deterministic seed) | ~50% ratio over 2000 tuples (±150 band
— safely outside binomial 3σ ≈ 67) |
| `close()` | clears the `random` reference to `null` |
| `processTuple` (single-port overload) | throws `NotImplementedError` |
| Malformed descriptor JSON | construction throws
`JsonProcessingException` |
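The determinism and tolerance-band rows above can be reproduced with a small standalone sketch. It assumes a routing rule of `rng.nextInt(100) < k` on a seeded `scala.util.Random`; that rule and the names `SplitSketch`, `route`, and `threeSigma` are illustrative assumptions and are not taken from `SplitOpExec`.

```scala
import scala.util.Random

// Hypothetical sketch: the routing rule below is an assumption for
// illustration and is not copied from SplitOpExec.
object SplitSketch {
  // Returns the output port index per tuple: 0 = training, 1 = testing.
  def route(k: Int, seed: Int, count: Int): Vector[Int] = {
    val rng = new Random(seed)
    Vector.fill(count)(if (rng.nextInt(100) < k) 0 else 1)
  }

  // Three standard deviations of Binomial(n, p): 3 * sqrt(n * p * (1 - p)).
  def threeSigma(n: Int, p: Double): Double = 3 * math.sqrt(n * p * (1 - p))

  def main(args: Array[String]): Unit = {
    println(f"3 sigma for n = 2000, p = 0.5: ${threeSigma(2000, 0.5)}%.1f")
    println(s"training count: ${route(50, 1, 2000).count(_ == 0)}")
  }
}
```

For n = 2000 and p = 0.5 the standard deviation is sqrt(500) ≈ 22.36, so 3σ ≈ 67 and the ±150 band is roughly 6.7σ wide on each side. Under the assumed rule, `k = 100` always routes to port 0 and `k = 0` always routes to port 1, because `nextInt(100)` returns values in 0 to 99.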
**Behavior pinned — `UrlVizOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"URL Visualizer"`, group
`VISUALIZATION_MEDIA_GROUP` |
| `getPhysicalOp` | wires
`OpExecWithClassName("…operator.visualization.urlviz.UrlVizOpExec",
<json>)` |
| Output schema | propagation function ignores input and emits a single
`html-content` STRING attribute |
| `urlContentAttrName` annotations | `@JsonProperty(required = true)` +
`@AutofillAttributeName` + `@NotNull` (verified via reflection) |
| Class-level `@JsonSchemaInject` | restricts `urlContentAttrName` to
STRING attributes |
| Independent instances | `operatorIdentifier` (UUID-seeded) differs
across `new` |
**Behavior pinned — `UrlVizOpExec`**
| Surface | Contract |
| --- | --- |
| `processTuple` | emits a single `TupleLike` whose only value contains
the generated HTML |
| Generated HTML | `<!DOCTYPE html>` preamble; `<iframe src="…">`
interpolates the input URL; `frameborder="0"` and the `height:100vh;
width:100%; border:none` sizing style |
| Per-tuple cardinality | exactly one emission per `processTuple` call |
| Distinct URLs | interpolated into distinct outputs |
| Malformed descriptor JSON | construction throws
`JsonProcessingException` |
**Test-harness note**
`UrlVizOpDesc` declares `urlContentAttrName` as a `val` initialized to
`""`; production code populates it via `objectMapper.readValue`, relying on
the `jackson-module-no-ctor-deser` module to write past the immutable val. To
test the executor without touching production code, `UrlVizOpExecSpec`
builds the descriptor JSON via Jackson's tree API and injects both the
`operatorType` discriminator (`"URLVisualizer"`, per `LogicalOp`'s
`@JsonSubTypes` table) and the `urlContentAttrName` field.
### Any related issues, documentation, discussions?
Closes #5766.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowOperator/testOnly
org.apache.texera.amber.operator.split.SplitOpDescSpec
org.apache.texera.amber.operator.split.SplitOpExecSpec
org.apache.texera.amber.operator.visualization.urlviz.UrlVizOpDescSpec
org.apache.texera.amber.operator.visualization.urlviz.UrlVizOpExecSpec"`
— 30 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7 [1M context])
---
.../amber/operator/split/SplitOpDescSpec.scala | 139 +++++++++++++++++++
.../amber/operator/split/SplitOpExecSpec.scala | 152 +++++++++++++++++++++
.../visualization/urlviz/UrlVizOpDescSpec.scala | 128 +++++++++++++++++
.../visualization/urlviz/UrlVizOpExecSpec.scala | 130 ++++++++++++++++++
4 files changed, 549 insertions(+)
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/split/SplitOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/split/SplitOpDescSpec.scala
new file mode 100644
index 0000000000..d1fecdf783
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/split/SplitOpDescSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.split
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class SplitOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private val workflowId = WorkflowIdentity(1L)
+ private val executionId = ExecutionIdentity(1L)
+
+ private val schema: Schema =
+ Schema().add(new Attribute("v", AttributeType.INTEGER))
+
+ // ---------------------------------------------------------------------------
+ // operatorInfo
+ // ---------------------------------------------------------------------------
+
+ "SplitOpDesc.operatorInfo" should
+ "advertise the Split user-friendly name and Utility group" in {
+ val info = (new SplitOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "Split"
+ info.operatorGroupName shouldBe OperatorGroupConstants.UTILITY_GROUP
+ info.operatorDescription.toLowerCase should include("split")
+ }
+
+ it should "expose one input port and two output ports (PortIdentity 0 and 1)" in {
+ val info = (new SplitOpDesc).operatorInfo
+ info.inputPorts should have length 1
+ info.outputPorts should have length 2
+ info.outputPorts.map(_.id) shouldBe List(PortIdentity(), PortIdentity(1))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Field defaults
+ // ---------------------------------------------------------------------------
+
+ "SplitOpDesc fields" should "default k to 80, random to true, seed to 1" in {
+ val d = new SplitOpDesc
+ d.k shouldBe 80
+ d.random shouldBe true
+ d.seed shouldBe 1
+ }
+
+ // ---------------------------------------------------------------------------
+ // getPhysicalOp — wiring to SplitOpExec + non-parallelizable
+ // ---------------------------------------------------------------------------
+
+ "SplitOpDesc.getPhysicalOp" should
+ "wire the SplitOpExec class name into the OpExecInitInfo" in {
+ val physical = (new SplitOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match {
+ case OpExecWithClassName(className, _) =>
+ className shouldBe "org.apache.texera.amber.operator.split.SplitOpExec"
+ case other => fail(s"expected OpExecWithClassName, got $other")
+ }
+ }
+
+ it should "carry a serialized descriptor JSON in the OpExecInitInfo payload" in {
+ // The descriptor's `k` / `random` / `seed` must be reachable at the
+ // executor via the serialized payload — pin that the JSON includes
+ // the canonical wire keys.
+ val physical = (new SplitOpDesc).getPhysicalOp(workflowId, executionId)
+ val payload = physical.opExecInitInfo match {
+ case OpExecWithClassName(_, p) => p
+ case other => fail(s"expected OpExecWithClassName, got $other")
+ }
+ payload should include("\"k\"")
+ payload should include("\"random\"")
+ payload should include("\"seed\"")
+ }
+
+ it should "be non-parallelizable (single worker)" in {
+ // Split's deterministic-seed contract relies on a single
+ // worker — `withParallelizable(false)` is the wiring under test.
+ val physical = (new SplitOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.parallelizable shouldBe false
+ }
+
+ // ---------------------------------------------------------------------------
+ // Schema propagation
+ // ---------------------------------------------------------------------------
+
+ "SplitOpDesc schema propagation" should
+ "propagate the single input schema to every output port" in {
+ val physical = (new SplitOpDesc).getPhysicalOp(workflowId, executionId)
+ val out = physical.propagateSchema.func(Map(PortIdentity() -> schema))
+ val descInfo = (new SplitOpDesc).operatorInfo
+ out.keySet shouldBe descInfo.outputPorts.map(_.id).toSet
+ out.values.toSet shouldBe Set(schema)
+ }
+
+ it should
+ "throw IllegalArgumentException when the input map does not have exactly one entry" in {
+ val physical = (new SplitOpDesc).getPhysicalOp(workflowId, executionId)
+ intercept[IllegalArgumentException] {
+ physical.propagateSchema.func(Map.empty)
+ }
+ intercept[IllegalArgumentException] {
+ physical.propagateSchema.func(
+ Map(PortIdentity() -> schema, PortIdentity(1) -> schema)
+ )
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Independent instances
+ // ---------------------------------------------------------------------------
+
+ "SplitOpDesc" should
+ "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+ val a = new SplitOpDesc
+ val b = new SplitOpDesc
+ a.operatorIdentifier should not equal b.operatorIdentifier
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/split/SplitOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/split/SplitOpExecSpec.scala
new file mode 100644
index 0000000000..b14021b237
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/split/SplitOpExecSpec.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.split
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class SplitOpExecSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Fixture
+ // ---------------------------------------------------------------------------
+
+ private val attr = new Attribute("v", AttributeType.INTEGER)
+ private val schema: Schema = Schema().add(attr)
+ private def tuple(v: Int): Tuple =
+ Tuple.builder(schema).add(attr, Integer.valueOf(v)).build()
+
+ private def descJson(k: Int, random: Boolean = false, seed: Int = 1): String = {
+ val d = new SplitOpDesc
+ d.k = k
+ d.random = random
+ d.seed = seed
+ objectMapper.writeValueAsString(d)
+ }
+
+ private def emittedPorts(
+ exec: SplitOpExec,
+ count: Int
+ ): IndexedSeq[Option[PortIdentity]] = {
+ (1 to count).map { i =>
+ val out = exec.processTupleMultiPort(tuple(i), port = 0).toList
+ assert(out.size == 1, s"expected exactly one emission per tuple, got: $out")
+ out.head._2
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // k = 100 — every tuple to the training port (0)
+ // ---------------------------------------------------------------------------
+
+ "SplitOpExec (k = 100)" should "emit every tuple on the training port (PortIdentity 0)" in {
+ val exec = new SplitOpExec(descJson(k = 100, seed = 1))
+ exec.open()
+ try {
+ val ports = emittedPorts(exec, 200)
+ assert(ports.forall(_ == Some(PortIdentity(0))), s"expected all 0, got: ${ports.distinct}")
+ } finally exec.close()
+ }
+
+ // ---------------------------------------------------------------------------
+ // k = 0 — every tuple to the testing port (1)
+ // ---------------------------------------------------------------------------
+
+ "SplitOpExec (k = 0)" should "emit every tuple on the testing port (PortIdentity 1)" in {
+ val exec = new SplitOpExec(descJson(k = 0, seed = 1))
+ exec.open()
+ try {
+ val ports = emittedPorts(exec, 200)
+ assert(ports.forall(_ == Some(PortIdentity(1))), s"expected all 1, got: ${ports.distinct}")
+ } finally exec.close()
+ }
+
+ // ---------------------------------------------------------------------------
+ // Deterministic seed
+ // ---------------------------------------------------------------------------
+
+ "SplitOpExec (deterministic seed)" should
+ "produce identical port sequences across two fresh instances when seed + k match" in {
+ val a = new SplitOpExec(descJson(k = 50, seed = 7))
+ a.open()
+ val b = new SplitOpExec(descJson(k = 50, seed = 7))
+ b.open()
+ try {
+ val seqA = emittedPorts(a, 200)
+ val seqB = emittedPorts(b, 200)
+ assert(seqA == seqB)
+ } finally {
+ a.close()
+ b.close()
+ }
+ }
+
+ it should "approximate the requested ratio over a large sample (k = 50)" in {
+ // Binomial(2000, 0.5) — 3σ ≈ 67; allow ±150 so the case is not flaky
+ // while still catching gross deviations (e.g. seed being ignored).
+ val exec = new SplitOpExec(descJson(k = 50, seed = 1))
+ exec.open()
+ try {
+ val ports = emittedPorts(exec, 2000)
+ val toTraining = ports.count(_ == Some(PortIdentity(0)))
+ assert(toTraining >= 850 && toTraining <= 1150, s"expected ~1000, got $toTraining")
+ } finally exec.close()
+ }
+
+ // ---------------------------------------------------------------------------
+ // open / close lifecycle
+ // ---------------------------------------------------------------------------
+
+ "SplitOpExec.close" should "clear the random reference (null-out)" in {
+ val exec = new SplitOpExec(descJson(k = 50, seed = 1))
+ exec.open()
+ assert(exec.random != null)
+ exec.close()
+ assert(exec.random == null)
+ }
+
+ // ---------------------------------------------------------------------------
+ // processTuple (single-port overload) — unsupported
+ // ---------------------------------------------------------------------------
+
+ "SplitOpExec.processTuple" should
+ "throw NotImplementedError (single-port overload is intentionally unsupported)" in {
+ val exec = new SplitOpExec(descJson(k = 100))
+ exec.open()
+ try {
+ intercept[NotImplementedError] {
+ exec.processTuple(tuple(1), port = 0)
+ }
+ } finally exec.close()
+ }
+
+ // ---------------------------------------------------------------------------
+ // Descriptor parse failure
+ // ---------------------------------------------------------------------------
+
+ "SplitOpExec construction" should
+ "throw on malformed descriptor JSON" in {
+ intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+ new SplitOpExec("{not valid")
+ }
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/visualization/urlviz/UrlVizOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/visualization/urlviz/UrlVizOpDescSpec.scala
new file mode 100644
index 0000000000..2dbbb886d1
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/visualization/urlviz/UrlVizOpDescSpec.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.visualization.urlviz
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.operator.metadata.annotations.AutofillAttributeName
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import javax.validation.constraints.NotNull
+
+class UrlVizOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private val workflowId = WorkflowIdentity(1L)
+ private val executionId = ExecutionIdentity(1L)
+
+ // ---------------------------------------------------------------------------
+ // operatorInfo
+ // ---------------------------------------------------------------------------
+
+ "UrlVizOpDesc.operatorInfo" should
+ "advertise the URL Visualizer name and Visualization-Media group" in {
+ val info = (new UrlVizOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "URL Visualizer"
+ info.operatorGroupName shouldBe OperatorGroupConstants.VISUALIZATION_MEDIA_GROUP
+ info.operatorDescription.toLowerCase should include("url")
+ }
+
+ // ---------------------------------------------------------------------------
+ // getPhysicalOp — wiring + output schema
+ // ---------------------------------------------------------------------------
+
+ "UrlVizOpDesc.getPhysicalOp" should
+ "wire the UrlVizOpExec class name into the OpExecInitInfo" in {
+ val physical = (new UrlVizOpDesc).getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match {
+ case OpExecWithClassName(className, _) =>
+ className shouldBe "org.apache.texera.amber.operator.visualization.urlviz.UrlVizOpExec"
+ case other => fail(s"expected OpExecWithClassName, got $other")
+ }
+ }
+
+ it should "produce an output schema with a single `html-content` STRING attribute" in {
+ val op = new UrlVizOpDesc
+ val physical = op.getPhysicalOp(workflowId, executionId)
+ // The propagation function ignores its input schemas — it always
+ // emits the fixed `html-content: STRING` schema on the (single)
+ // output port.
+ val out = physical.propagateSchema.func(Map.empty)
+ val outputId = op.operatorInfo.outputPorts.head.id
+ out.keySet shouldBe Set(outputId)
+ val schema: Schema = out(outputId)
+ schema.getAttributes should have size 1
+ val attr: Attribute = schema.getAttributes.head
+ attr.getName shouldBe "html-content"
+ attr.getType shouldBe AttributeType.STRING
+ }
+
+ // ---------------------------------------------------------------------------
+ // Field annotations
+ // ---------------------------------------------------------------------------
+
+ "UrlVizOpDesc#urlContentAttrName" should
+ "carry @JsonProperty(required = true)" in {
+ val jp = classOf[UrlVizOpDesc]
+ .getDeclaredField("urlContentAttrName")
+ .getAnnotation(classOf[JsonProperty])
+ jp should not be null
+ jp.required shouldBe true
+ }
+
+ it should "carry @AutofillAttributeName (UI populates the attribute dropdown)" in {
+ val ann = classOf[UrlVizOpDesc]
+ .getDeclaredField("urlContentAttrName")
+ .getAnnotation(classOf[AutofillAttributeName])
+ ann should not be null
+ }
+
+ it should "carry @NotNull (javax.validation contract)" in {
+ val notNull = classOf[UrlVizOpDesc]
+ .getDeclaredField("urlContentAttrName")
+ .getAnnotation(classOf[NotNull])
+ notNull should not be null
+ }
+
+ "UrlVizOpDesc (class-level)" should
+ "carry @JsonSchemaInject restricting `urlContentAttrName` to STRING attributes" in {
+ val ann = classOf[UrlVizOpDesc].getAnnotation(classOf[JsonSchemaInject])
+ ann should not be null
+ val payload = ann.json
+ payload should include("attributeTypeRules")
+ payload should include("urlContentAttrName")
+ payload should include("string")
+ }
+
+ // ---------------------------------------------------------------------------
+ // Independent instances
+ // ---------------------------------------------------------------------------
+
+ "UrlVizOpDesc" should
+ "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+ val a = new UrlVizOpDesc
+ val b = new UrlVizOpDesc
+ a.operatorIdentifier should not equal b.operatorIdentifier
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/visualization/urlviz/UrlVizOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/visualization/urlviz/UrlVizOpExecSpec.scala
new file mode 100644
index 0000000000..7563020abe
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/visualization/urlviz/UrlVizOpExecSpec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.visualization.urlviz
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple, TupleLike}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class UrlVizOpExecSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Fixtures
+ // ---------------------------------------------------------------------------
+
+ private val attr = new Attribute("url", AttributeType.STRING)
+ private val schema: Schema = Schema().add(attr)
+ private def tuple(url: String): Tuple =
+ Tuple.builder(schema).add(attr, url).build()
+
+ /**
+ * Build a descriptor JSON for `UrlVizOpExec`. The production class
+ * declares `urlContentAttrName` as a `val` initialized to `""`; the
+ * Jackson-`module-no-ctor-deser` module bypasses the val and writes
+ * to the underlying field via reflection, so the spec can supply
+ * any attribute name through the JSON wire-key.
+ *
+ * LogicalOp carries `@JsonTypeInfo(property = "operatorType")`, so
+ * the JSON must include the `operatorType` discriminator (value
+ * `"URLVisualizer"` per LogicalOp's `@JsonSubTypes` table).
+ */
+ private def descJson(urlAttr: String): String = {
+ val node = objectMapper.createObjectNode()
+ node.put("operatorType", "URLVisualizer")
+ node.put("urlContentAttrName", urlAttr)
+ node.toString
+ }
+
+ private def runSingle(exec: UrlVizOpExec, t: Tuple): String = {
+ val out = exec.processTuple(t, port = 0).toList
+ assert(out.size == 1, s"expected one emission, got: $out")
+ out.head match {
+ case tupleLike: TupleLike =>
+ val fields = tupleLike.getFields
+ assert(
+ fields.size == 1,
+ s"expected exactly one field on the emitted TupleLike, got: $fields"
+ )
+ val field = fields.head
+ assert(
+ field.isInstanceOf[String],
+ s"expected the field to be a String, got ${field.getClass.getName}: $field"
+ )
+ field.asInstanceOf[String]
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Generated HTML contents — pinned via canonical substrings
+ // ---------------------------------------------------------------------------
+
+ "UrlVizOpExec.processTuple" should "emit an HTML iframe referencing the input URL" in {
+ val exec = new UrlVizOpExec(descJson("url"))
+ val html = runSingle(exec, tuple("https://example.invalid/page"))
+ assert(html.contains("<!DOCTYPE html>"))
+ assert(
+ html.contains("<iframe src=\"https://example.invalid/page\""),
+ s"expected iframe src to embed the input URL, got: $html"
+ )
+ }
+
+ it should "include `frameborder=\"0\"` so the iframe renders without a border" in {
+ val exec = new UrlVizOpExec(descJson("url"))
+ val html = runSingle(exec, tuple("about:blank"))
+ assert(html.contains("frameborder=\"0\""))
+ }
+
+ it should
+ "include the full-viewport sizing style `height:100vh; width:100%; border:none`" in {
+ val exec = new UrlVizOpExec(descJson("url"))
+ val html = runSingle(exec, tuple("about:blank"))
+ assert(html.contains("height:100vh"))
+ assert(html.contains("width:100%"))
+ assert(html.contains("border:none"))
+ }
+
+ it should "interpolate distinct URLs into distinct outputs" in {
+ val exec = new UrlVizOpExec(descJson("url"))
+ val first = runSingle(exec, tuple("https://example.com/a"))
+ val second = runSingle(exec, tuple("https://example.com/b"))
+ assert(first.contains("https://example.com/a"))
+ assert(second.contains("https://example.com/b"))
+ assert(first != second)
+ }
+
+ it should "produce exactly one emission per input tuple" in {
+ val exec = new UrlVizOpExec(descJson("url"))
+ val iter = exec.processTuple(tuple("https://x.invalid/"), port = 0)
+ assert(iter.hasNext)
+ iter.next()
+ assert(!iter.hasNext)
+ }
+
+ // ---------------------------------------------------------------------------
+ // Descriptor parse failure surfaces during construction
+ // ---------------------------------------------------------------------------
+
+ "UrlVizOpExec construction" should
+ "throw on malformed descriptor JSON" in {
+ intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+ new UrlVizOpExec("{not valid")
+ }
+ }
+}