aglinxinyuan opened a new pull request, #5738:
URL: https://github.com/apache/texera/pull/5738
### What changes were proposed in this PR?
Pin the behavior of three previously uncovered `LogicalOp` descriptors in the
SET / cleaning operator family. Each descriptor wires a physical-op class name,
a port shape and, where applicable, a partitioning and schema-propagation
contract through `getPhysicalOp`. No production-code changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `UnionOpDescSpec` | `UnionOpDesc` | 5 |
| `DistinctOpDescSpec` | `DistinctOpDesc` | 7 |
| `DifferenceOpDescSpec` | `DifferenceOpDesc` | 9 |
All three spec files follow the one-to-one `<srcClassName>Spec.scala` naming
convention. The existing `IntersectOpDescSpec` served as the template for the
spec shape.
**Behavior pinned — `UnionOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Union"`, group `SET_GROUP`, description mentions "Union" |
| Ports | one input, one non-blocking output |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.union.UnionOpExec")` |
| Partition requirement | empty (no hash-alignment forced; unlike Distinct / Difference / Intersect, Union preserves whatever the upstream produced) |
| Independent instances | separate `new UnionOpDesc` instances share no static state |
**Behavior pinned — `DistinctOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Distinct"`, group `CLEANING_GROUP`, description mentions "duplicate" |
| Ports | one input, one **blocking** output |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.distinct.DistinctOpExec")`; `partitionRequirement` is `List(Option(HashPartition()))`; `derivePartition` always returns `HashPartition` regardless of input partition kind |
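
The partitioning contract above can be sketched in isolation. This is a minimal illustration, not the Texera implementation: `Partition`, `Hash`, `RoundRobin`, `Single`, and `PartitionContractSketch` are hypothetical stand-ins introduced here, and the real `HashPartition` type and `derivePartition` signature may differ.

```scala
// Hypothetical stand-ins for illustration only; not the Texera partition types.
sealed trait Partition
case object Hash extends Partition
case object RoundRobin extends Partition
case object Single extends Partition

object PartitionContractSketch {
  // One entry per input port: the single input is required to be hash-partitioned.
  val partitionRequirement: List[Option[Partition]] = List(Some(Hash))

  // The input partition kinds are deliberately ignored; the result is always Hash.
  def derivePartition(inputs: List[Partition]): Partition = Hash
}
```

A spec for this shape only needs to feed several input kinds to `derivePartition` and check that the result never varies.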
**Behavior pinned — `DifferenceOpDesc`**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Difference"`, group `SET_GROUP`, description mentions "difference"; two input ports with `displayName` `"left"` (PortIdentity 0) and `"right"` (PortIdentity 1); one **blocking** output |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.difference.DifferenceOpExec")`; `partitionRequirement` is `List(Option(HashPartition()), Option(HashPartition()))` (both inputs); `derivePartition` always returns `HashPartition` |
| Schema propagation | accepts a single shared input schema and produces that schema on every output port; throws `IllegalArgumentException` when the two inputs do not share one schema |
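
The schema-propagation row can be illustrated with a self-contained sketch. It assumes nothing about the real Texera types: `Schema`, `SchemaPropagationSketch`, and `propagate` are hypothetical names, and integer keys stand in for port identities. It relies on the standard-library `require`, which throws `IllegalArgumentException` when its condition is false.

```scala
// Hypothetical stand-in for a schema: an ordered list of (column name, type name).
final case class Schema(columns: List[(String, String)])

object SchemaPropagationSketch {
  // Every input must carry the same schema; that schema is then emitted
  // on each output port. Integer keys stand in for port identities.
  def propagate(inputs: Map[Int, Schema], outputPorts: List[Int]): Map[Int, Schema] = {
    val distinctSchemas = inputs.values.toSet
    // require throws IllegalArgumentException when the condition is false.
    require(distinctSchemas.size == 1, "all inputs must share one schema")
    val shared = distinctSchemas.head
    outputPorts.map(port => port -> shared).toMap
  }
}
```

Calling `propagate` with identical schemas on ports 0 and 1 returns that schema for each requested output port, while mismatched schemas fail with `IllegalArgumentException`.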
### Any related issues, documentation, discussions?
Closes #5734.
### How was this PR tested?
Pure unit-test additions; verified locally with:
- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.union.UnionOpDescSpec org.apache.texera.amber.operator.distinct.DistinctOpDescSpec org.apache.texera.amber.operator.difference.DifferenceOpDescSpec"`: 21 tests, all green
- `sbt scalafmtCheckAll`: clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7 [1M context])