This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-source-port by this push:
     new 8cf2f37184 update
8cf2f37184 is described below

commit 8cf2f37184840dab7d9016205f76c016baec8d06
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 7 01:11:47 2026 -0700

    update
---
 .../org/apache/texera/workflow/LogicalPlan.scala   |   6 ---
 .../apache/texera/amber/operator/LogicalOp.scala   |   2 +-
 .../source/scan/InputFileScanSourceOpDesc.scala    |  60 ++++++++++++++++++---
 .../src/assets/operator_images/InputFileScan.png   | Bin 0 -> 22499 bytes
 .../texera/amber/compiler/model/LogicalPlan.scala  |   6 ---
 5 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
index d04a9efda3..974d17f40a 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
@@ -97,12 +97,6 @@ case class LogicalPlan(
   ): Unit = {
     operators.foreach {
       case operator @ (scanOp: ScanSourceOpDesc) =>
-        val hasFilenameInputLink = links.exists(link =>
-          link.toOpId == operator.operatorIdentifier && link.toPortId == scanOp.operatorInfo.inputPorts.headOption.map(_.id).getOrElse(null)
-        )
-        if (hasFilenameInputLink && scanOp.fileName.isEmpty) {
-          ()
-        } else
         Try {
           // Resolve file path for ScanSourceOpDesc
           val fileName = scanOp.fileName.getOrElse(throw new RuntimeException("no input file name"))
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 851e0c3d57..0cd7c0c143 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -164,7 +164,7 @@ trait StateTransferFunc
     // new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = "ParallelCSVFileScan"),
     new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
     new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
-    new Type(value = classOf[InputFileScanSourceOpDesc], name = "FileScan"),
+    new Type(value = classOf[InputFileScanSourceOpDesc], name = "InputFileScan"),
     new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
     new Type(
       value = classOf[TwitterFullArchiveSearchSourceOpDesc],
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
index 70fb5f8a47..4e7c88b000 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
@@ -19,28 +19,74 @@
 
 package org.apache.texera.amber.operator.source.scan
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.{
+  JsonSchemaInject,
+  JsonSchemaString,
+  JsonSchemaTitle
+}
 import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
 import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
-import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import org.apache.texera.amber.core.workflow.{
+  InputPort,
+  OutputPort,
+  PhysicalOp,
+  SchemaPropagationFunc
+}
+import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
 import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
-@JsonIgnoreProperties(value = Array("fileName"))
-class InputFileScanSourceOpDesc extends FileScanSourceOpDesc {
+class InputFileScanSourceOpDesc extends SourceOperatorDescriptor with TextSourceOpDesc {
+  @JsonProperty(defaultValue = "UTF_8", required = true)
+  @JsonSchemaTitle("Encoding")
+  var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
+
+  @JsonProperty(defaultValue = "false")
+  @JsonSchemaTitle("Extract")
+  val extract: Boolean = false
+
+  @JsonProperty(defaultValue = "false")
+  @JsonSchemaTitle("Include Filename")
+  @JsonSchemaInject(
+    strings = Array(
+      new JsonSchemaString(path = HideAnnotation.hideTarget, value = "extract"),
+      new JsonSchemaString(path = HideAnnotation.hideType, value = HideAnnotation.Type.equals),
+      new JsonSchemaString(path = HideAnnotation.hideExpectedValue, value = "false")
+    )
+  )
+  val outputFileName: Boolean = false
 
   override def getPhysicalOp(
       workflowId: WorkflowIdentity,
       executionId: ExecutionIdentity
   ): PhysicalOp = {
-    super
-      .getPhysicalOp(workflowId, executionId)
-      .copy(opExecInitInfo =
+    PhysicalOp
+      .sourcePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
         OpExecWithClassName(
           "org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpExec",
           objectMapper.writeValueAsString(this)
         )
       )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withPropagateSchema(
+        SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema()))
+      )
+  }
+
+  override def sourceSchema(): Schema = {
+    var schema = Schema()
+    if (outputFileName) {
+      schema = schema.add("filename", AttributeType.STRING)
+    }
+    schema.add(attributeName, attributeType.getType)
   }
 
   override def operatorInfo: OperatorInfo =
diff --git a/frontend/src/assets/operator_images/InputFileScan.png b/frontend/src/assets/operator_images/InputFileScan.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and b/frontend/src/assets/operator_images/InputFileScan.png differ
diff --git a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
index 70827d1343..eecb435cc8 100644
--- a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
+++ b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
@@ -115,12 +115,6 @@ case class LogicalPlan(
   ): Unit = {
     operators.foreach {
       case operator @ (scanOp: ScanSourceOpDesc) =>
-        val hasFilenameInputLink = links.exists(link =>
-          link.toOpId == operator.operatorIdentifier && link.toPortId == scanOp.operatorInfo.inputPorts.headOption.map(_.id).getOrElse(null)
-        )
-        if (hasFilenameInputLink && scanOp.fileName.isEmpty) {
-          ()
-        } else
         Try {
           // Resolve file path for ScanSourceOpDesc
           val fileName = scanOp.fileName.getOrElse(throw new RuntimeException("no input file name"))

Reply via email to