This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-source-port by this
push:
new 9fb5f06be4 update
9fb5f06be4 is described below
commit 9fb5f06be4ce0f6de5940addad39c63b79a4671e
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 7 16:22:45 2026 -0700
update
---
.../org/apache/texera/workflow/LogicalPlan.scala | 41 ++++++++++++++++++++++
.../apache/texera/workflow/WorkflowCompiler.scala | 1 +
.../texera/amber/compiler/WorkflowCompiler.scala | 1 +
.../texera/amber/compiler/model/LogicalPlan.scala | 34 ++++++++++++++++++
4 files changed, 77 insertions(+)
diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
index 974d17f40a..58a0d0e793 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
@@ -24,6 +24,8 @@ import org.apache.texera.amber.core.storage.FileResolver
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
+import
org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
import org.apache.texera.web.model.websocket.request.LogicalPlanPojo
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.util.SupplierUtil
@@ -121,4 +123,43 @@ case class LogicalPlan(
case _ => // Skip non-ScanSourceOpDesc operators
}
}
+
+  /**
+    * Infers the column schema of every [[InputCSVScanSourceOpDesc]] whose `columns`
+    * are not yet set, using the file name supplied by an upstream
+    * [[TextInputSourceOpDesc]].
+    *
+    * The file name is taken to be the first non-blank line of the upstream text
+    * input, trimmed; the inference itself is delegated to
+    * `InputCSVScanSourceOpDesc.inferColumnsFromFileName`.
+    *
+    * @param errorList when defined, each failure is logged and appended to it as an
+    *                  `(operatorId, error)` pair, and processing continues with the
+    *                  next operator; when `None`, the first failure is logged and
+    *                  rethrown.
+    */
+  def inferInputCSVScanSourceColumns(
+      errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]]
+  ): Unit = {
+    operators.foreach {
+      case csvOp: InputCSVScanSourceOpDesc
+          if csvOp.columns == null || csvOp.columns.isEmpty =>
+        Try {
+          val upstreamTextInput = getUpstreamLinks(csvOp.operatorIdentifier)
+            .flatMap(link => operators.find(_.operatorIdentifier == link.fromOpId))
+            .collectFirst { case textInput: TextInputSourceOpDesc => textInput }
+            .getOrElse(
+              throw new RuntimeException(
+                "CSV File Scan From Input requires a literal Text Input filename to infer columns."
+              )
+            )
+
+          // NOTE(review): `textInput` is presumably a deserialized field that can be
+          // null when left unset; treat null like blank input so the user sees the
+          // explicit "No input file name" error rather than a NullPointerException.
+          val fileName = Option(upstreamTextInput.textInput)
+            .getOrElse("")
+            .linesIterator
+            .map(_.trim)
+            .find(_.nonEmpty)
+            .getOrElse(throw new RuntimeException("No input file name"))
+          csvOp.inferColumnsFromFileName(fileName)
+        }.failed.foreach { err =>
+          logger.error("Error inferring columns for InputCSVScanSourceOpDesc", err)
+          errorList match {
+            case Some(errList) => errList.append((csvOp.operatorIdentifier, err))
+            case None          => throw err // no collector supplied: fail immediately
+          }
+        }
+
+      case _ => // Skip operators that do not need CSV column inference
+    }
+  }
}
diff --git
a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
index b93aa3e4db..4bccd467b4 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
@@ -146,6 +146,7 @@ class WorkflowCompiler(
// 2. resolve the file name in each scan source operator
logicalPlan.resolveScanSourceOpFileName(None)
+ logicalPlan.inferInputCSVScanSourceColumns(None)
// 3. expand the logical plan to the physical plan, and get a set of
output ports that need storage
val (physicalPlan, outputPortsNeedingStorage) =
diff --git
a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/WorkflowCompiler.scala
b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/WorkflowCompiler.scala
index 25166e7ac5..93ea1da79e 100644
---
a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/WorkflowCompiler.scala
+++
b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/WorkflowCompiler.scala
@@ -209,6 +209,7 @@ class WorkflowCompiler(
// 2. resolve the file name in each scan source operator
logicalPlan.resolveScanSourceOpFileName(Some(errorList))
+ logicalPlan.inferInputCSVScanSourceColumns(Some(errorList))
// 3. expand the logical plan to the physical plan
val physicalPlan = expandLogicalPlan(logicalPlan, Some(errorList))
diff --git
a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
index eecb435cc8..ca3522dd05 100644
---
a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
+++
b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
@@ -25,6 +25,8 @@ import
org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
+import
org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.util.SupplierUtil
@@ -131,4 +133,36 @@ case class LogicalPlan(
case _ => // Skip non-ScanSourceOpDesc operators
}
}
+
+  /**
+    * Infers the column schema of every [[InputCSVScanSourceOpDesc]] whose `columns`
+    * are not yet set, using the file name supplied by an upstream
+    * [[TextInputSourceOpDesc]].
+    *
+    * The file name is taken to be the first non-blank line of the upstream text
+    * input, trimmed; the inference itself is delegated to
+    * `InputCSVScanSourceOpDesc.inferColumnsFromFileName`.
+    *
+    * Failures never abort the loop: each one is logged and, when `errorList` is
+    * defined, appended to it as an `(operatorId, error)` pair. With `None`, a
+    * failure is only logged.
+    *
+    * @param errorList optional collector for per-operator inference failures.
+    */
+  def inferInputCSVScanSourceColumns(
+      errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]]
+  ): Unit = {
+    operators.foreach {
+      case csvOp: InputCSVScanSourceOpDesc
+          if csvOp.columns == null || csvOp.columns.isEmpty =>
+        Try {
+          val upstreamTextInput = getUpstreamLinks(csvOp.operatorIdentifier)
+            .flatMap(link => operators.find(_.operatorIdentifier == link.fromOpId))
+            .collectFirst { case textInput: TextInputSourceOpDesc => textInput }
+            .getOrElse(
+              throw new RuntimeException(
+                "CSV File Scan From Input requires a literal Text Input filename to infer columns."
+              )
+            )
+
+          // NOTE(review): `textInput` is presumably a deserialized field that can be
+          // null when left unset; treat null like blank input so the collected error
+          // is the explicit "No input file name" rather than a NullPointerException.
+          val fileName = Option(upstreamTextInput.textInput)
+            .getOrElse("")
+            .linesIterator
+            .map(_.trim)
+            .find(_.nonEmpty)
+            .getOrElse(throw new RuntimeException("No input file name"))
+          csvOp.inferColumnsFromFileName(fileName)
+        }.failed.foreach { err =>
+          logger.error("Error inferring columns for InputCSVScanSourceOpDesc", err)
+          errorList.foreach(_.append((csvOp.operatorIdentifier, err)))
+        }
+
+      case _ => // Skip operators that do not need CSV column inference
+    }
+  }
}