This is an automated email from the ASF dual-hosted git repository.
mengw15 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 8560cca80b fix(operator): guard against NoSuchElementException in
ParallelCSVScanSourceOpDesc.getPhysicalOp (#4375)
8560cca80b is described below
commit 8560cca80bfebe0b334b64e4d65193767b07c516
Author: Asish Kumar <[email protected]>
AuthorDate: Thu Apr 16 10:24:08 2026 +0530
fix(operator): guard against NoSuchElementException in
ParallelCSVScanSourceOpDesc.getPhysicalOp (#4375)
### What changes were proposed in this PR?
`ParallelCSVScanSourceOpDesc.getPhysicalOp` called `customDelimiter.get`
without first checking whether the `Option` is defined. When
`customDelimiter` is `None` (the field's default), this throws a
`NoSuchElementException` before the fallback comma delimiter can be
applied.
**Before:**
```scala
if (customDelimiter.get.isEmpty) { // throws NoSuchElementException when None
customDelimiter = Option(",")
}
```
**After:**
```scala
if (customDelimiter.forall(_.isEmpty)) { // true for None and for Some("")
customDelimiter = Option(",")
}
```
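`Option.forall` returns `true` for `None` and otherwise tests the contained
value. A single condition therefore covers both the unset case and the
empty-string case.

The same guard is now also used in `CSVScanSourceOpDesc`. There it replaces
the equivalent two-part check `customDelimiter.isEmpty ||
customDelimiter.get.isEmpty`, so both operators apply the default delimiter
in exactly the same way.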
### Any related issues, documentation, discussions?
Closes #4374
### How was this PR tested?
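Four new test cases were added to `CSVScanSourceOpDescSpec`, two for each
operator:
1. `"use comma as the default delimiter when customDelimiter is not set for parallel CSV"`
   verifies that `getPhysicalOp` does not throw when `customDelimiter` is
   `None` and that the default `,` is applied.
2. `"use comma as the default delimiter when customDelimiter is empty string for parallel CSV"`
   performs the same verification for `Some("")`.
3. `"use comma as the default delimiter when customDelimiter is not set for CSV"`
   repeats test 1 for `CSVScanSourceOpDesc`.
4. `"use comma as the default delimiter when customDelimiter is empty string for CSV"`
   repeats test 2 for `CSVScanSourceOpDesc`.

The existing parallel-CSV schema-inference tests continue to pass
unchanged.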
### Was this PR authored or co-authored using generative AI tooling?
No.
---------
Signed-off-by: Asish Kumar <[email protected]>
---
.../source/scan/csv/CSVScanSourceOpDesc.scala | 2 +-
.../scan/csv/ParallelCSVScanSourceOpDesc.scala | 2 +-
.../source/scan/csv/CSVScanSourceOpDescSpec.scala | 32 ++++++++++++++++++++++
3 files changed, 34 insertions(+), 2 deletions(-)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
index 9879835e76..a44e2765d5 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
@@ -55,7 +55,7 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc {
executionId: ExecutionIdentity
): PhysicalOp = {
// fill in default values
- if (customDelimiter.isEmpty || customDelimiter.get.isEmpty) {
+ if (customDelimiter.forall(_.isEmpty)) {
customDelimiter = Option(",")
}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
index b159e59c3e..6ac2d20af4 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
@@ -56,7 +56,7 @@ class ParallelCSVScanSourceOpDesc extends ScanSourceOpDesc {
executionId: ExecutionIdentity
): PhysicalOp = {
// fill in default values
- if (customDelimiter.get.isEmpty) {
+ if (customDelimiter.forall(_.isEmpty)) {
customDelimiter = Option(",")
}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
index 96161672ba..eb174e0691 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
@@ -128,4 +128,36 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with
BeforeAndAfter {
)
}
+ it should "use comma as the default delimiter when customDelimiter is not
set for parallel CSV" in {
+ parallelCsvScanSourceOpDesc.customDelimiter = None
+
+ parallelCsvScanSourceOpDesc.getPhysicalOp(DEFAULT_WORKFLOW_ID,
DEFAULT_EXECUTION_ID)
+
+ assert(parallelCsvScanSourceOpDesc.customDelimiter.contains(","))
+ }
+
+ it should "use comma as the default delimiter when customDelimiter is empty
string for parallel CSV" in {
+ parallelCsvScanSourceOpDesc.customDelimiter = Some("")
+
+ parallelCsvScanSourceOpDesc.getPhysicalOp(DEFAULT_WORKFLOW_ID,
DEFAULT_EXECUTION_ID)
+
+ assert(parallelCsvScanSourceOpDesc.customDelimiter.contains(","))
+ }
+
+ it should "use comma as the default delimiter when customDelimiter is not
set for CSV" in {
+ csvScanSourceOpDesc.customDelimiter = None
+
+ csvScanSourceOpDesc.getPhysicalOp(DEFAULT_WORKFLOW_ID,
DEFAULT_EXECUTION_ID)
+
+ assert(csvScanSourceOpDesc.customDelimiter.contains(","))
+ }
+
+ it should "use comma as the default delimiter when customDelimiter is empty
string for CSV" in {
+ csvScanSourceOpDesc.customDelimiter = Some("")
+
+ csvScanSourceOpDesc.getPhysicalOp(DEFAULT_WORKFLOW_ID,
DEFAULT_EXECUTION_ID)
+
+ assert(csvScanSourceOpDesc.customDelimiter.contains(","))
+ }
+
}