This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-source-port by this
push:
new 8cf2f37184 update
8cf2f37184 is described below
commit 8cf2f37184840dab7d9016205f76c016baec8d06
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 7 01:11:47 2026 -0700
update
---
.../org/apache/texera/workflow/LogicalPlan.scala | 6 ---
.../apache/texera/amber/operator/LogicalOp.scala | 2 +-
.../source/scan/InputFileScanSourceOpDesc.scala | 60 ++++++++++++++++++---
.../src/assets/operator_images/InputFileScan.png | Bin 0 -> 22499 bytes
.../texera/amber/compiler/model/LogicalPlan.scala | 6 ---
5 files changed, 54 insertions(+), 20 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
index d04a9efda3..974d17f40a 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
@@ -97,12 +97,6 @@ case class LogicalPlan(
): Unit = {
operators.foreach {
case operator @ (scanOp: ScanSourceOpDesc) =>
- val hasFilenameInputLink = links.exists(link =>
- link.toOpId == operator.operatorIdentifier && link.toPortId ==
scanOp.operatorInfo.inputPorts.headOption.map(_.id).getOrElse(null)
- )
- if (hasFilenameInputLink && scanOp.fileName.isEmpty) {
- ()
- } else
Try {
// Resolve file path for ScanSourceOpDesc
val fileName = scanOp.fileName.getOrElse(throw new
RuntimeException("no input file name"))
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 851e0c3d57..0cd7c0c143 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -164,7 +164,7 @@ trait StateTransferFunc
// new Type(value = classOf[ParallelCSVScanSourceOpDesc], name =
"ParallelCSVFileScan"),
new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
- new Type(value = classOf[InputFileScanSourceOpDesc], name = "FileScan"),
+ new Type(value = classOf[InputFileScanSourceOpDesc], name =
"InputFileScan"),
new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
new Type(
value = classOf[TwitterFullArchiveSearchSourceOpDesc],
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
index 70fb5f8a47..4e7c88b000 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
@@ -19,28 +19,74 @@
package org.apache.texera.amber.operator.source.scan
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.{
+ JsonSchemaInject,
+ JsonSchemaString,
+ JsonSchemaTitle
+}
import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
-import org.apache.texera.amber.core.workflow.{InputPort, OutputPort,
PhysicalOp}
+import org.apache.texera.amber.core.workflow.{
+ InputPort,
+ OutputPort,
+ PhysicalOp,
+ SchemaPropagationFunc
+}
+import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
import org.apache.texera.amber.util.JSONUtils.objectMapper
-@JsonIgnoreProperties(value = Array("fileName"))
-class InputFileScanSourceOpDesc extends FileScanSourceOpDesc {
+class InputFileScanSourceOpDesc extends SourceOperatorDescriptor with
TextSourceOpDesc {
+ @JsonProperty(defaultValue = "UTF_8", required = true)
+ @JsonSchemaTitle("Encoding")
+ var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
+
+ @JsonProperty(defaultValue = "false")
+ @JsonSchemaTitle("Extract")
+ val extract: Boolean = false
+
+ @JsonProperty(defaultValue = "false")
+ @JsonSchemaTitle("Include Filename")
+ @JsonSchemaInject(
+ strings = Array(
+ new JsonSchemaString(path = HideAnnotation.hideTarget, value =
"extract"),
+ new JsonSchemaString(path = HideAnnotation.hideType, value =
HideAnnotation.Type.equals),
+ new JsonSchemaString(path = HideAnnotation.hideExpectedValue, value =
"false")
+ )
+ )
+ val outputFileName: Boolean = false
override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
- super
- .getPhysicalOp(workflowId, executionId)
- .copy(opExecInitInfo =
+ PhysicalOp
+ .sourcePhysicalOp(
+ workflowId,
+ executionId,
+ operatorIdentifier,
OpExecWithClassName(
"org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpExec",
objectMapper.writeValueAsString(this)
)
)
+ .withInputPorts(operatorInfo.inputPorts)
+ .withOutputPorts(operatorInfo.outputPorts)
+ .withPropagateSchema(
+ SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id ->
sourceSchema()))
+ )
+ }
+
+ override def sourceSchema(): Schema = {
+ var schema = Schema()
+ if (outputFileName) {
+ schema = schema.add("filename", AttributeType.STRING)
+ }
+ schema.add(attributeName, attributeType.getType)
}
override def operatorInfo: OperatorInfo =
diff --git a/frontend/src/assets/operator_images/InputFileScan.png b/frontend/src/assets/operator_images/InputFileScan.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and b/frontend/src/assets/operator_images/InputFileScan.png differ
diff --git a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
index 70827d1343..eecb435cc8 100644
--- a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
+++ b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
@@ -115,12 +115,6 @@ case class LogicalPlan(
): Unit = {
operators.foreach {
case operator @ (scanOp: ScanSourceOpDesc) =>
- val hasFilenameInputLink = links.exists(link =>
- link.toOpId == operator.operatorIdentifier && link.toPortId ==
scanOp.operatorInfo.inputPorts.headOption.map(_.id).getOrElse(null)
- )
- if (hasFilenameInputLink && scanOp.fileName.isEmpty) {
- ()
- } else
Try {
// Resolve file path for ScanSourceOpDesc
val fileName = scanOp.fileName.getOrElse(throw new
RuntimeException("no input file name"))