This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-source-port by this 
push:
     new 9fb5f06be4 update
9fb5f06be4 is described below

commit 9fb5f06be4ce0f6de5940addad39c63b79a4671e
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 7 16:22:45 2026 -0700

    update
---
 .../org/apache/texera/workflow/LogicalPlan.scala   | 41 ++++++++++++++++++++++
 .../apache/texera/workflow/WorkflowCompiler.scala  |  1 +
 .../texera/amber/compiler/WorkflowCompiler.scala   |  1 +
 .../texera/amber/compiler/model/LogicalPlan.scala  | 34 ++++++++++++++++++
 4 files changed, 77 insertions(+)

diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
index 974d17f40a..58a0d0e793 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
@@ -24,6 +24,8 @@ import org.apache.texera.amber.core.storage.FileResolver
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.operator.LogicalOp
 import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
 import org.apache.texera.web.model.websocket.request.LogicalPlanPojo
 import org.jgrapht.graph.DirectedAcyclicGraph
 import org.jgrapht.util.SupplierUtil
@@ -121,4 +123,43 @@ case class LogicalPlan(
       case _ => // Skip non-ScanSourceOpDesc operators
     }
   }
+
+  def inferInputCSVScanSourceColumns( // Calls inferColumnsFromFileName on each InputCSVScanSourceOpDesc whose columns are null or empty.
+      errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] // Some: receives (operator id, error) per failure; None: the failure is rethrown
+  ): Unit = {
+    operators.foreach {
+      case operator @ (csvOp: InputCSVScanSourceOpDesc)
+          if csvOp.columns == null || csvOp.columns.isEmpty => // only operators that have no columns yet
+        Try {
+          val upstreamTextInput = getUpstreamLinks(operator.operatorIdentifier) // links returned for this operator
+            .flatMap(link => operators.find(_.operatorIdentifier == link.fromOpId)) // operator at the fromOpId end of each link
+            .collectFirst { case textInput: TextInputSourceOpDesc => textInput } // first one that is a TextInputSourceOpDesc
+            .getOrElse(
+              throw new RuntimeException(
+                "CSV File Scan From Input requires a literal Text Input filename to infer columns."
+              )
+            )
+
+          val fileName = upstreamTextInput.textInput // the file name is the first non-blank line, trimmed
+            .linesIterator
+            .find(_.trim.nonEmpty)
+            .map(_.trim)
+            .getOrElse(throw new RuntimeException("No input file name"))
+          csvOp.inferColumnsFromFileName(fileName)
+        } match {
+          case Success(_) => // nothing further to do
+
+          case Failure(err) => // any exception thrown inside the Try block above
+            logger.error("Error inferring columns for InputCSVScanSourceOpDesc", err)
+            errorList match {
+              case Some(errList) =>
+                errList.append((operator.operatorIdentifier, err)) // record the failure and continue with the next operator
+              case None =>
+                throw err // no collector supplied: abort the whole foreach
+            }
+        }
+
+      case _ => // Skip operators that do not need CSV column inference
+    }
+  }
 }
diff --git a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
index b93aa3e4db..4bccd467b4 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/WorkflowCompiler.scala
@@ -146,6 +146,7 @@ class WorkflowCompiler(
 
     // 2. resolve the file name in each scan source operator
     logicalPlan.resolveScanSourceOpFileName(None)
+    logicalPlan.inferInputCSVScanSourceColumns(None)
 
     // 3. expand the logical plan to the physical plan, and get a set of output ports that need storage
     val (physicalPlan, outputPortsNeedingStorage) =
diff --git a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/WorkflowCompiler.scala b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/WorkflowCompiler.scala
index 25166e7ac5..93ea1da79e 100644
--- a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/WorkflowCompiler.scala
+++ b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/WorkflowCompiler.scala
@@ -209,6 +209,7 @@ class WorkflowCompiler(
 
     // 2. resolve the file name in each scan source operator
     logicalPlan.resolveScanSourceOpFileName(Some(errorList))
+    logicalPlan.inferInputCSVScanSourceColumns(Some(errorList))
 
     // 3. expand the logical plan to the physical plan
     val physicalPlan = expandLogicalPlan(logicalPlan, Some(errorList))
diff --git a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
index eecb435cc8..ca3522dd05 100644
--- a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
+++ b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
@@ -25,6 +25,8 @@ import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.PortIdentity
 import org.apache.texera.amber.operator.LogicalOp
 import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
 import org.jgrapht.graph.DirectedAcyclicGraph
 import org.jgrapht.util.SupplierUtil
 
@@ -131,4 +133,36 @@ case class LogicalPlan(
       case _ => // Skip non-ScanSourceOpDesc operators
     }
   }
+
+  def inferInputCSVScanSourceColumns( // Calls inferColumnsFromFileName on each InputCSVScanSourceOpDesc whose columns are null or empty.
+      errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]] // Some: receives (operator id, error) per failure; None: the failure is only logged
+  ): Unit = {
+    operators.foreach {
+      case operator @ (csvOp: InputCSVScanSourceOpDesc)
+          if csvOp.columns == null || csvOp.columns.isEmpty => // only operators that have no columns yet
+        Try {
+          val upstreamTextInput = getUpstreamLinks(operator.operatorIdentifier) // links returned for this operator
+            .flatMap(link => operators.find(_.operatorIdentifier == link.fromOpId)) // operator at the fromOpId end of each link
+            .collectFirst { case textInput: TextInputSourceOpDesc => textInput } // first one that is a TextInputSourceOpDesc
+            .getOrElse(
+              throw new RuntimeException(
+                "CSV File Scan From Input requires a literal Text Input filename to infer columns."
+              )
+            )
+
+          val fileName = upstreamTextInput.textInput // the file name is the first non-blank line, trimmed
+            .linesIterator
+            .find(_.trim.nonEmpty)
+            .map(_.trim)
+            .getOrElse(throw new RuntimeException("No input file name"))
+          csvOp.inferColumnsFromFileName(fileName)
+        } match {
+          case Success(_) => // nothing further to do
+          case Failure(err) => // any exception thrown inside the Try block above
+            logger.error("Error inferring columns for InputCSVScanSourceOpDesc", err)
+            errorList.foreach(_.append((operator.operatorIdentifier, err))) // does nothing when errorList is None, so the error is not rethrown
+        }
+      case _ => // Skip operators that do not need CSV column inference
+    }
+  }
 }

Reply via email to