This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cf0e7b751 test(amber): add unit tests for SleepOpDesc, 
IntersectOpDesc, URLFetcherOpDesc (#4816)
5cf0e7b751 is described below

commit 5cf0e7b751353ac66905f6af9147cdfd368e021f
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 12:00:51 2026 -0700

    test(amber): add unit tests for SleepOpDesc, IntersectOpDesc, 
URLFetcherOpDesc (#4816)
    
    ### What changes were proposed in this PR?
    
    Adds scalatest coverage for three logical-operator descriptors that had
    no dedicated specs:
    
    - `sleep/SleepOpDesc` — operatorInfo (Sleep, Control group, 1 input + 1
    output) and `getPhysicalOp` (non-parallelizable, single-worker,
    SleepOpExec class wired in).
    - `intersect/IntersectOpDesc` — operatorInfo (Intersect, Set group, 2
    input ports with PortIdentity 0/1, blocking output) and `getPhysicalOp`
    (HashPartition required on both inputs, derived HashPartition for any
    input combination).
    - `source/fetcher/URLFetcherOpDesc` — operatorInfo (URL Fetcher, API
    group, source-shaped 0-in/1-out), `sourceSchema` (UTF-8 → STRING,
    RAW_BYTES → ANY, null fallthrough → ANY), and `getPhysicalOp`
    (URLFetcherOpExec class wired in).
    
    ### Any related issues, documentation, discussions?
    
    Closes #4814.
    
    Bug filed separately: `URLFetcherOpDesc.sourceSchema` silently falls
    back to `ANY` when `decodingMethod` is left at its `null` default, instead
    of rejecting the missing value. The spec pins this as the current behavior.
    
    Drive-by note (already filed as #4813):
    `source/scan/json/JSONUtil.scala` is a byte-for-byte duplicate of
    `workflow-core/util/JSONUtils.scala::JSONToMap`. Not adding a redundant
    spec for it; its tests live with `JSONUtilsSpec` from #4716.
    
    ### How was this PR tested?
    
    ```
    sbt scalafmtCheckAll
    sbt "WorkflowOperator/testOnly 
org.apache.texera.amber.operator.sleep.SleepOpDescSpec 
org.apache.texera.amber.operator.intersect.IntersectOpDescSpec 
org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDescSpec"
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../operator/intersect/IntersectOpDescSpec.scala   |  82 ++++++++++++++++
 .../amber/operator/sleep/SleepOpDescSpec.scala     |  81 ++++++++++++++++
 .../source/fetcher/URLFetcherOpDescSpec.scala      | 107 +++++++++++++++++++++
 3 files changed, 270 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intersect/IntersectOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intersect/IntersectOpDescSpec.scala
new file mode 100644
index 0000000000..d9be82e0cd
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/intersect/IntersectOpDescSpec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.intersect
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, SinglePartition, 
UnknownPartition}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
/**
 * Unit tests for [[IntersectOpDesc]]: the operator metadata it advertises and
 * the shape of the PhysicalOp it lowers to (partitioning and executor wiring).
 */
class IntersectOpDescSpec extends AnyFlatSpec with Matchers {

  private val workflowId = WorkflowIdentity(1L)
  private val executionId = ExecutionIdentity(1L)

  // Fully-qualified executor class the descriptor is expected to reference.
  private val expectedExecClass =
    "org.apache.texera.amber.operator.intersect.IntersectOpExec"

  /** Lowers a freshly constructed descriptor; every call yields a new PhysicalOp. */
  private def lowered =
    (new IntersectOpDesc).getPhysicalOp(workflowId, executionId)

  "IntersectOpDesc.operatorInfo" should "advertise the user-friendly name and Set group" in {
    val metadata = (new IntersectOpDesc).operatorInfo
    metadata.userFriendlyName shouldBe "Intersect"
    metadata.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
    metadata.operatorDescription should include("intersect")
  }

  it should "expose two input ports (PortIdentity 0 and 1) and one blocking output" in {
    val metadata = (new IntersectOpDesc).operatorInfo
    val inputIds = metadata.inputPorts.map(_.id.id)
    inputIds should have length 2
    inputIds shouldBe List(0, 1)
    // Exactly one output port, and it is blocking.
    metadata.outputPorts.map(_.blocking) shouldBe List(true)
  }

  "IntersectOpDesc.getPhysicalOp" should "require HashPartition on both input ports" in {
    lowered.partitionRequirement shouldBe List.fill(2)(Option(HashPartition()))
  }

  it should "always derive HashPartition for the output regardless of input partitions" in {
    // Both inputs are required to be hash-partitioned (previous test); the
    // derived output partition must stay HashPartition() whatever the
    // upstream inputs report.
    val physical = lowered
    Seq(
      List(SinglePartition(), UnknownPartition()),
      List(HashPartition(List("a")), HashPartition(List("b")))
    ).foreach(inputs => physical.derivePartition(inputs) shouldBe HashPartition())
  }

  it should "wire the IntersectOpExec class name into the OpExecInitInfo" in {
    // Destructure OpExecWithClassName rather than substring-matching its
    // toString, which would break on mere formatting changes.
    val className = lowered.opExecInitInfo match {
      case OpExecWithClassName(name, _) => name
      case other                        => fail(s"expected OpExecWithClassName, got $other")
    }
    className shouldBe expectedExecClass
  }
}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sleep/SleepOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sleep/SleepOpDescSpec.scala
new file mode 100644
index 0000000000..002cf45bca
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sleep/SleepOpDescSpec.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.sleep
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
/**
 * Unit tests for [[SleepOpDesc]]: the operator metadata it advertises and the
 * PhysicalOp produced by getPhysicalOp (parallelism, suggested worker count,
 * executor wiring, and port propagation).
 */
class SleepOpDescSpec extends AnyFlatSpec with Matchers {

  private val workflowId = WorkflowIdentity(1L)
  private val executionId = ExecutionIdentity(1L)

  /** Builds a descriptor with `sleepTime` already assigned, ready to lower. */
  private def sleepOpWith(sleepTime: Int): SleepOpDesc = {
    val op = new SleepOpDesc
    op.sleepTime = sleepTime
    op
  }

  "SleepOpDesc.operatorInfo" should "advertise the user-friendly name and Control group" in {
    val info = (new SleepOpDesc).operatorInfo
    info.userFriendlyName shouldBe "Sleep"
    info.operatorGroupName shouldBe OperatorGroupConstants.CONTROL_GROUP
    info.operatorDescription should include("Sleep")
  }

  it should "expose exactly one input port and one output port" in {
    val info = (new SleepOpDesc).operatorInfo
    info.inputPorts should have length 1
    info.outputPorts should have length 1
  }

  "SleepOpDesc.getPhysicalOp" should "produce a non-parallelizable PhysicalOp pinned to a single worker" in {
    // The descriptor explicitly disables parallelism and suggests one worker.
    // Pin both, so relaxing either one has to be a deliberate, visible change.
    // NOTE(review): the rationale (presumably so the configured delay applies
    // to one serial stream of tuples) is not stated in the descriptor itself.
    // Confirm against SleepOpDesc / SleepOpExec before relying on it.
    val physical = sleepOpWith(5).getPhysicalOp(workflowId, executionId)
    physical.parallelizable shouldBe false
    physical.suggestedWorkerNum shouldBe Some(1)
  }

  it should "wire the SleepOpExec class name into the OpExecInitInfo" in {
    // The descriptor's getPhysicalOp encodes a fully-qualified Exec class
    // name. Pin it so that renaming SleepOpExec breaks this spec deliberately.
    // Destructure OpExecWithClassName instead of substring-matching the
    // toString output, which would break on mere formatting changes.
    val physical = sleepOpWith(1).getPhysicalOp(workflowId, executionId)
    physical.opExecInitInfo match {
      case OpExecWithClassName(className, descString) =>
        className shouldBe "org.apache.texera.amber.operator.sleep.SleepOpExec"
        descString should not be empty
      case other =>
        fail(s"expected OpExecWithClassName, got $other")
    }
  }

  it should "carry forward the operatorInfo input/output ports onto the PhysicalOp" in {
    // Compare port identities, not just counts. A count-only check would still
    // pass if getPhysicalOp registered ports under the wrong ids.
    val op = sleepOpWith(1)
    val physical = op.getPhysicalOp(workflowId, executionId)
    val declaredInputs = op.operatorInfo.inputPorts
    val declaredOutputs = op.operatorInfo.outputPorts
    physical.inputPorts.size shouldBe declaredInputs.size
    physical.outputPorts.size shouldBe declaredOutputs.size
    physical.inputPorts.keySet shouldBe declaredInputs.map(_.id).toSet
    physical.outputPorts.keySet shouldBe declaredOutputs.map(_.id).toSet
  }
}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetcherOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetcherOpDescSpec.scala
new file mode 100644
index 0000000000..3b8c306b73
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetcherOpDescSpec.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.fetcher
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.AttributeType
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
/**
 * Unit tests for [[URLFetcherOpDesc]], covering:
 *  - the advertised operator metadata;
 *  - the single-column schema from sourceSchema for each decoding method,
 *    including the unset (null) case;
 *  - the PhysicalOp produced by getPhysicalOp.
 */
class URLFetcherOpDescSpec extends AnyFlatSpec with Matchers {

  private val workflowId = WorkflowIdentity(1L)
  private val executionId = ExecutionIdentity(1L)

  // `.test` is a reserved TLD (RFC 6761), so this URL can never name a real host.
  private val testUrl = "https://example.test/data"

  // The single column sourceSchema emits; only its type varies with decoding.
  private val contentColumn = "URL content"

  /** A descriptor pointing at `testUrl` with the given decoding method. */
  private def configured(decoding: DecodingMethod): URLFetcherOpDesc = {
    val op = new URLFetcherOpDesc
    op.url = testUrl
    op.decodingMethod = decoding
    op
  }

  /** Asserts that `op.sourceSchema()` is exactly one `contentColumn` of type `expected`. */
  private def assertSingleContentColumn(op: URLFetcherOpDesc, expected: AttributeType) = {
    val attributes = op.sourceSchema().getAttributes
    attributes should have length 1
    attributes.head.getName shouldBe contentColumn
    attributes.head.getType shouldBe expected
  }

  "URLFetcherOpDesc.operatorInfo" should "advertise the user-friendly name and API group" in {
    val info = (new URLFetcherOpDesc).operatorInfo
    info.userFriendlyName shouldBe "URL Fetcher"
    info.operatorGroupName shouldBe OperatorGroupConstants.API_GROUP
    info.operatorDescription should include("URL")
  }

  it should "expose no input ports and one output port (source-shaped)" in {
    val info = (new URLFetcherOpDesc).operatorInfo
    info.inputPorts shouldBe empty
    info.outputPorts should have length 1
  }

  "URLFetcherOpDesc.sourceSchema" should "produce a single STRING column when decoding is UTF-8" in {
    assertSingleContentColumn(configured(DecodingMethod.UTF_8), AttributeType.STRING)
  }

  it should "produce an ANY column for raw-bytes decoding" in {
    assertSingleContentColumn(configured(DecodingMethod.RAW_BYTES), AttributeType.ANY)
  }

  it should "default to ANY when decodingMethod is left unset (current behavior)" in {
    // Pin: `var decodingMethod: DecodingMethod = _` defaults to null.
    // sourceSchema's branch is `if (decodingMethod == DecodingMethod.UTF_8)
    // STRING else ANY`, so a null value falls through to ANY without
    // raising. This documents the current behavior, so a future explicit
    // null check breaks this spec deliberately. The column name must still
    // match the configured cases.
    val op = new URLFetcherOpDesc
    op.url = testUrl
    assertSingleContentColumn(op, AttributeType.ANY)
  }

  "URLFetcherOpDesc.getPhysicalOp" should "wire the URLFetcherOpExec class name into the OpExecInitInfo" in {
    // Destructure OpExecWithClassName instead of substring-matching the
    // toString output, which would break on mere formatting changes.
    val physical = configured(DecodingMethod.UTF_8).getPhysicalOp(workflowId, executionId)
    physical.opExecInitInfo match {
      case OpExecWithClassName(className, _) =>
        className shouldBe "org.apache.texera.amber.operator.source.fetcher.URLFetcherOpExec"
      case other =>
        fail(s"expected OpExecWithClassName, got $other")
    }
  }

  it should "propagate sourceSchema onto the single output port" in {
    // Call propagateSchema.func directly, so the test proves that sourceSchema
    // is routed to the output port id, not merely that an output port exists.
    // The input map is empty because this is a source operator.
    val op = configured(DecodingMethod.UTF_8)
    val physical = op.getPhysicalOp(workflowId, executionId)
    val outputPortId = op.operatorInfo.outputPorts.head.id
    val propagated = physical.propagateSchema.func(Map.empty)
    propagated should contain key outputPortId
    propagated(outputPortId) shouldBe op.sourceSchema()
  }
}

Reply via email to