This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-source-port by this push:
     new e9aeb64fbd update
e9aeb64fbd is described below

commit e9aeb64fbd4fb1be062d83df53d3524ae3f1130d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 6 00:07:06 2026 -0700

    update
---
 .../texera/amber/core/workflow/PhysicalOp.scala    | 12 ++---
 .../amber/core/workflow/PhysicalOpSpec.scala       | 49 --------------------
 .../source/scan/FileScanSourceOpExec.scala         | 12 ++---
 .../source/scan/InputFileScanSourceOpDesc.scala    | 17 +++++++
 ...pExec.scala => InputFileScanSourceOpExec.scala} | 12 ++---
 .../source/scan/InputFileSourceOpExec.scala        | 14 +-----
 .../source/scan/arrow/ArrowSourceOpExec.scala      | 19 ++------
 .../source/scan/csv/CSVScanSourceOpExec.scala      | 23 ++--------
 .../scan/csv/ParallelCSVScanSourceOpExec.scala     | 29 ++++--------
 .../scan/csvOld/CSVOldScanSourceOpExec.scala       | 22 ++-------
 .../source/scan/json/JSONLScanSourceOpExec.scala   | 24 +++-------
 .../source/scan/FileScanSourceOpExecSpec.scala     | 42 +----------------
 .../scan/InputFileScanSourceOpDescSpec.scala       | 12 ++++-
 .../scan/text/FileScanSourceOpDescSpec.scala       |  4 --
 frontend/src/app/app-routing.constant.ts           |  1 +
 frontend/src/app/app-routing.module.ts             |  5 ++
 frontend/src/app/app.module.ts                     |  2 +
 .../dashboard/component/dashboard.component.html   | 11 +++++
 .../app/dashboard/component/dashboard.component.ts |  2 +
 .../user-computing-unit.component.html             | 29 ++++++++++++
 .../user-computing-unit.component.scss}            | 42 ++++-------------
 .../user-computing-unit.component.spec.ts}         | 53 +++++++++-------------
 .../user-computing-unit.component.ts}              | 42 +++--------------
 .../input-autocomplete.component.html              | 45 ++++++------------
 .../input-autocomplete.component.scss              |  5 --
 .../input-autocomplete.component.ts                | 36 +--------------
 .../operator-property-edit-frame.component.ts      |  4 --
 .../validation/validation-workflow.service.spec.ts | 21 ---------
 .../validation/validation-workflow.service.ts      | 18 --------
 29 files changed, 176 insertions(+), 431 deletions(-)

diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
index d28d2a460d..494fd6f76d 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
@@ -431,19 +431,13 @@ case class PhysicalOp(
       }
     }
 
-    // Extract the schemas that are currently available.
+    // Extract input schemas, checking if all are defined
     val inputSchemas = updatedOp.inputPorts.collect {
       case (portId, (_, _, Right(schema))) => portId -> schema
     }
 
-    val readyForPropagation = updatedOp.inputPorts.forall {
-      case (_, (_, links, Right(_))) => true
-      case (_, (_, links, Left(_)))  => links.isEmpty
-    }
-
-    if (readyForPropagation) {
-      // All linked input schemas are available, propagate to output schema.
-      // Unlinked input ports are ignored so optional ports do not block output inference.
+    if (updatedOp.inputPorts.size == inputSchemas.size) {
+      // All input schemas are available, propagate to output schema
       val schemaPropagationResult = Try(propagateSchema.func(inputSchemas))
       schemaPropagationResult match {
         case Success(schemaMapping) =>
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PhysicalOpSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PhysicalOpSpec.scala
deleted file mode 100644
index b0d0e074cf..0000000000
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PhysicalOpSpec.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.core.workflow
-
-import org.apache.texera.amber.core.executor.OpExecWithClassName
-import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
-import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity}
-import org.scalatest.flatspec.AnyFlatSpec
-
-class PhysicalOpSpec extends AnyFlatSpec {
-
-  it should "propagate output schema when an input port is unlinked" in {
-    val expectedSchema = Schema().add("filename", AttributeType.STRING)
-    val physicalOp = PhysicalOp
-      .sourcePhysicalOp(
-        WorkflowIdentity(1),
-        ExecutionIdentity(1),
-        OperatorIdentity("scan-source"),
-        OpExecWithClassName("test.OpExec", "{}")
-      )
-      .withInputPorts(List(InputPort(displayName = "Filename")))
-      .withOutputPorts(List(OutputPort()))
-      .withPropagateSchema(
-        SchemaPropagationFunc(_ => Map(PortIdentity(0) -> expectedSchema))
-      )
-
-    val propagated = physicalOp.propagateSchema()
-    val outputSchema = propagated.outputPorts(PortIdentity(0))._3.toOption
-
-    assert(outputSchema.contains(expectedSchema))
-  }
-}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
index 3690b473c7..91c817240c 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -19,6 +19,7 @@
 
 package org.apache.texera.amber.operator.source.scan
 
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
 import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
@@ -35,18 +36,15 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
 
 class FileScanSourceOpExec private[scan] (
     descString: String
-) extends InputFileSourceOpExec {
-  override protected val desc: FileScanSourceOpDesc =
+) extends SourceOperatorExecutor {
+  private val desc: FileScanSourceOpDesc =
     objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
 
   @throws[IOException]
   override def produceTuple(): Iterator[TupleLike] = {
-    resolvedInputFileNames.iterator.flatMap(produceTuplesForFile)
-  }
-
-  private def produceTuplesForFile(resolvedFileName: String): Iterator[TupleLike] = {
     val is: InputStream =
-      DocumentFactory.openReadonlyDocument(new URI(resolvedFileName)).asInputStream()
+      DocumentFactory.openReadonlyDocument(new URI(desc.fileName.get)).asInputStream()
+
     val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
     var zipIn: ZipArchiveInputStream = null
     var archiveStream: InputStream = null
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
index f94253d0b2..0ec39ce200 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
@@ -20,12 +20,29 @@
 package org.apache.texera.amber.operator.source.scan
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
 import org.apache.texera.amber.core.workflow.{InputPort, OutputPort}
 import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
 
 @JsonIgnoreProperties(value = Array("fileName"))
 class InputFileScanSourceOpDesc extends FileScanSourceOpDesc {
 
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ) = {
+    super
+      .getPhysicalOp(workflowId, executionId)
+      .copy(opExecInitInfo =
+        OpExecWithClassName(
+          "org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpExec",
+          objectMapper.writeValueAsString(this)
+        )
+      )
+  }
+
   override def operatorInfo: OperatorInfo =
     OperatorInfo(
       userFriendlyName = "File Scan From Input",
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
similarity index 94%
copy from 
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
copy to 
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
index 3690b473c7..4a1514c7f6 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
@@ -33,11 +33,11 @@ import java.net.URI
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.IteratorHasAsScala
 
-class FileScanSourceOpExec private[scan] (
+class InputFileScanSourceOpExec private[scan] (
     descString: String
 ) extends InputFileSourceOpExec {
-  override protected val desc: FileScanSourceOpDesc =
-    objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
+  private val desc: InputFileScanSourceOpDesc =
+    objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
 
   @throws[IOException]
   override def produceTuple(): Iterator[TupleLike] = {
@@ -47,6 +47,7 @@ class FileScanSourceOpExec private[scan] (
   private def produceTuplesForFile(resolvedFileName: String): 
Iterator[TupleLike] = {
     val is: InputStream =
       DocumentFactory.openReadonlyDocument(new 
URI(resolvedFileName)).asInputStream()
+
     val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
     var zipIn: ZipArchiveInputStream = null
     var archiveStream: InputStream = null
@@ -88,7 +89,6 @@ class FileScanSourceOpExec private[scan] (
               case FileAttributeType.SINGLE_STRING =>
                 new String(toByteArray(entry), desc.fileEncoding.getCharset)
               case FileAttributeType.LARGE_BINARY =>
-                // For large binaries, create reference and upload via 
streaming
                 val largeBinary = new LargeBinary()
                 val out = new LargeBinaryOutputStream(largeBinary)
                 try {
@@ -116,12 +116,12 @@ class FileScanSourceOpExec private[scan] (
               desc.fileScanOffset.getOrElse(0),
               desc.fileScanOffset.getOrElse(0) + 
desc.fileScanLimit.getOrElse(Int.MaxValue)
             )
-            .map(line => {
+            .map(line =>
               TupleLike(desc.attributeType match {
                 case FileAttributeType.SINGLE_STRING => line
                 case _                               => parseField(line, 
desc.attributeType.getType)
               })
-            })
+            )
         )
       }
 
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
index c4ce21b47e..cd14cc4235 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
@@ -27,9 +27,6 @@ import org.apache.texera.amber.core.workflow.PortIdentity
 import scala.collection.mutable.ArrayBuffer
 
 trait InputFileSourceOpExec extends SourceOperatorExecutor {
-
-  protected def desc: ScanSourceOpDesc
-
   private val inputFileNames = ArrayBuffer.empty[String]
 
   override def processTupleMultiPort(
@@ -45,18 +42,11 @@ trait InputFileSourceOpExec extends SourceOperatorExecutor {
   }
 
   protected def resolvedInputFileNames: Seq[String] = {
-    val fileNames =
-      if (inputFileNames.nonEmpty) {
-        inputFileNames.toSeq
-      } else {
-        desc.fileName.toSeq
-      }
-
-    if (fileNames.isEmpty) {
+    if (inputFileNames.isEmpty) {
       throw new IllegalStateException("No input file is available for this 
source operator.")
     }
 
-    fileNames.map(fileName =>
+    inputFileNames.toSeq.map(fileName =>
       if (FileResolver.isFileResolved(fileName)) {
         fileName
       } else {
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
index d851b5b1c6..f23833206f 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
@@ -19,9 +19,9 @@
 
 package org.apache.texera.amber.operator.source.scan.arrow
 
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.tuple.TupleLike
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
 import org.apache.texera.amber.util.ArrowUtils
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.apache.arrow.memory.RootAllocator
@@ -33,26 +33,16 @@ import java.nio.file.{Files, StandardOpenOption}
 
 class ArrowSourceOpExec(
     descString: String
-) extends InputFileSourceOpExec {
-  override protected val desc: ArrowSourceOpDesc =
+) extends SourceOperatorExecutor {
+  private val desc: ArrowSourceOpDesc =
     objectMapper.readValue(descString, classOf[ArrowSourceOpDesc])
   private var reader: Option[ArrowFileReader] = None
   private var root: Option[VectorSchemaRoot] = None
   private var allocator: Option[RootAllocator] = None
 
   override def open(): Unit = {
-    if (desc.fileName.isDefined) {
-      initializeIfNeeded()
-    }
-  }
-
-  private def initializeIfNeeded(): Unit = {
-    if (reader.isDefined) {
-      return
-    }
     try {
-      desc.fileName = Some(resolvedInputFileName)
-      val file = DocumentFactory.openReadonlyDocument(new 
URI(resolvedInputFileName)).asFile()
+      val file = DocumentFactory.openReadonlyDocument(new 
URI(desc.fileName.get)).asFile()
       val alloc = new RootAllocator()
       allocator = Some(alloc)
       val channel = Files.newByteChannel(file.toPath, StandardOpenOption.READ)
@@ -68,7 +58,6 @@ class ArrowSourceOpExec(
   }
 
   override def produceTuple(): Iterator[TupleLike] = {
-    initializeIfNeeded()
     val rowIterator = new Iterator[TupleLike] {
       private var currentIndex = 0
       private var currentBatchIndex = 0
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
index 5707c064b0..c3fbbe9bb5 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
@@ -20,26 +20,24 @@
 package org.apache.texera.amber.operator.source.scan.csv
 
 import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.tuple.{AttributeTypeUtils, Schema, 
TupleLike}
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
 import java.io.InputStreamReader
 import java.net.URI
 import scala.collection.immutable.ArraySeq
 
-class CSVScanSourceOpExec private[csv] (descString: String) extends 
InputFileSourceOpExec {
-  override protected val desc: CSVScanSourceOpDesc =
-    objectMapper.readValue(descString, classOf[CSVScanSourceOpDesc])
+class CSVScanSourceOpExec private[csv] (descString: String) extends 
SourceOperatorExecutor {
+  val desc: CSVScanSourceOpDesc = objectMapper.readValue(descString, 
classOf[CSVScanSourceOpDesc])
   var inputReader: InputStreamReader = _
   var parser: CsvParser = _
   var nextRow: Array[String] = _
   var numRowGenerated = 0
-  private var schema: Schema = _
+  private val schema: Schema = desc.sourceSchema()
 
   override def produceTuple(): Iterator[TupleLike] = {
-    initializeIfNeeded()
 
     val rowIterator = new Iterator[Array[String]] {
       override def hasNext: Boolean = {
@@ -79,19 +77,8 @@ class CSVScanSourceOpExec private[csv] (descString: String) 
extends InputFileSou
   }
 
   override def open(): Unit = {
-    if (desc.fileName.isDefined) {
-      initializeIfNeeded()
-    }
-  }
-
-  private def initializeIfNeeded(): Unit = {
-    if (parser != null) {
-      return
-    }
-    desc.fileName = Some(resolvedInputFileName)
-    schema = desc.sourceSchema()
     inputReader = new InputStreamReader(
-      DocumentFactory.openReadonlyDocument(new 
URI(resolvedInputFileName)).asInputStream(),
+      DocumentFactory.openReadonlyDocument(new 
URI(desc.fileName.get)).asInputStream(),
       desc.fileEncoding.getCharset
     )
 
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
index 217cb606ed..1c9377e564 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
@@ -19,10 +19,10 @@
 
 package org.apache.texera.amber.operator.source.scan.csv
 
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
 import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.tuple.{Attribute, AttributeTypeUtils, 
Schema, TupleLike}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeTypeUtils, 
TupleLike}
 import org.apache.texera.amber.operator.source.BufferedBlockReader
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.tukaani.xz.SeekableFileInputStream
 
@@ -35,14 +35,13 @@ class ParallelCSVScanSourceOpExec private[csv] (
     descString: String,
     idx: Int = 0,
     workerCount: Int = 1
-) extends InputFileSourceOpExec {
-  override protected val desc: ParallelCSVScanSourceOpDesc =
+) extends SourceOperatorExecutor {
+  val desc: ParallelCSVScanSourceOpDesc =
     objectMapper.readValue(descString, classOf[ParallelCSVScanSourceOpDesc])
   private var reader: BufferedBlockReader = _
-  private var schema: Schema = desc.sourceSchema()
+  private val schema = desc.sourceSchema()
 
-  override def produceTuple(): Iterator[TupleLike] = {
-    initializeIfNeeded()
+  override def produceTuple(): Iterator[TupleLike] =
     new Iterator[TupleLike]() {
       override def hasNext: Boolean = reader.hasNext
 
@@ -87,23 +86,11 @@ class ParallelCSVScanSourceOpExec private[csv] (
       }
 
     }.filter(tuple => tuple != null)
-  }
 
   override def open(): Unit = {
-    if (desc.fileName.isDefined) {
-      initializeIfNeeded()
-    }
-  }
-
-  private def initializeIfNeeded(): Unit = {
-    if (reader != null) {
-      return
-    }
-    desc.fileName = Some(resolvedInputFileName)
-    schema = desc.sourceSchema()
     // here, the stream requires to be seekable, so datasetFileDesc creates a 
temp file here
     // TODO: consider a better way
-    val file = DocumentFactory.openReadonlyDocument(new 
URI(resolvedInputFileName)).asFile()
+    val file = DocumentFactory.openReadonlyDocument(new 
URI(desc.fileName.get)).asFile()
     val totalBytes: Long = file.length()
     // TODO: add support for limit
     // TODO: add support for offset
@@ -126,6 +113,6 @@ class ParallelCSVScanSourceOpExec private[csv] (
     if (startOffset == 0 && desc.hasHeader) reader.readLine
   }
 
-  override def close(): Unit = if (reader != null) reader.close()
+  override def close(): Unit = reader.close()
 
 }
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
index 26df840824..e7b1e938c3 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
@@ -20,9 +20,9 @@
 package org.apache.texera.amber.operator.source.scan.csvOld
 
 import com.github.tototoshi.csv.{CSVReader, DefaultCSVFormat}
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeTypeUtils, 
Schema, TupleLike}
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
 import java.net.URI
@@ -30,14 +30,13 @@ import scala.collection.compat.immutable.ArraySeq
 
 class CSVOldScanSourceOpExec private[csvOld] (
     descString: String
-) extends InputFileSourceOpExec {
-  override protected val desc: CSVOldScanSourceOpDesc =
+) extends SourceOperatorExecutor {
+  val desc: CSVOldScanSourceOpDesc =
     objectMapper.readValue(descString, classOf[CSVOldScanSourceOpDesc])
   var reader: CSVReader = _
   var rows: Iterator[Seq[String]] = _
-  var schema: Schema = _
+  val schema: Schema = desc.sourceSchema()
   override def produceTuple(): Iterator[TupleLike] = {
-    initializeIfNeeded()
 
     val tuples = rows
       .map(fields =>
@@ -63,21 +62,10 @@ class CSVOldScanSourceOpExec private[csvOld] (
   }
 
   override def open(): Unit = {
-    if (desc.fileName.isDefined) {
-      initializeIfNeeded()
-    }
-  }
-
-  private def initializeIfNeeded(): Unit = {
-    if (reader != null) {
-      return
-    }
-    desc.fileName = Some(resolvedInputFileName)
-    schema = desc.sourceSchema()
     implicit object CustomFormat extends DefaultCSVFormat {
       override val delimiter: Char = desc.customDelimiter.get.charAt(0)
     }
-    val filePath = DocumentFactory.openReadonlyDocument(new 
URI(resolvedInputFileName)).asFile().toPath
+    val filePath = DocumentFactory.openReadonlyDocument(new 
URI(desc.fileName.get)).asFile().toPath
     reader = CSVReader.open(filePath.toString, 
desc.fileEncoding.getCharset.name())(CustomFormat)
     // skip line if this worker reads the start of a file, and the file has a 
header line
     val startOffset = desc.offset.getOrElse(0) + (if (desc.hasHeader) 1 else 0)
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
index 850a12ebf6..c96278c3e3 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
@@ -19,10 +19,10 @@
 
 package org.apache.texera.amber.operator.source.scan.json
 
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
 import org.apache.texera.amber.core.tuple.TupleLike
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
 import org.apache.texera.amber.operator.source.scan.json.JSONUtil.JSONToMap
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
@@ -35,15 +35,14 @@ class JSONLScanSourceOpExec private[json] (
     descString: String,
     idx: Int = 0,
     workerCount: Int = 1
-) extends InputFileSourceOpExec {
-  override protected val desc: JSONLScanSourceOpDesc =
+) extends SourceOperatorExecutor {
+  private val desc: JSONLScanSourceOpDesc =
     objectMapper.readValue(descString, classOf[JSONLScanSourceOpDesc])
   private var rows: Iterator[String] = _
   private var reader: BufferedReader = _
-  private var schema = desc.sourceSchema()
+  private val schema = desc.sourceSchema()
 
   override def produceTuple(): Iterator[TupleLike] = {
-    initializeIfNeeded()
     rows.flatMap { line =>
       Try {
         val data = JSONToMap(objectMapper.readTree(line), 
desc.flatten).withDefaultValue(null)
@@ -59,18 +58,7 @@ class JSONLScanSourceOpExec private[json] (
   }
 
   override def open(): Unit = {
-    if (desc.fileName.isDefined) {
-      initializeIfNeeded()
-    }
-  }
-
-  private def initializeIfNeeded(): Unit = {
-    if (reader != null) {
-      return
-    }
-    desc.fileName = Some(resolvedInputFileName)
-    schema = desc.sourceSchema()
-    val stream = DocumentFactory.openReadonlyDocument(new 
URI(resolvedInputFileName)).asInputStream()
+    val stream = DocumentFactory.openReadonlyDocument(new 
URI(desc.fileName.get)).asInputStream()
     // count lines and partition the task to each worker
     reader = new BufferedReader(
       new InputStreamReader(stream, desc.fileEncoding.getCharset)
@@ -89,6 +77,6 @@ class JSONLScanSourceOpExec private[json] (
     rows = it2.iterator.slice(startOffset, endOffset)
   }
 
-  override def close(): Unit = if (reader != null) reader.close()
+  override def close(): Unit = reader.close()
 
 }
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
index 6aeea6f798..ab3383ca0a 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
@@ -19,7 +19,7 @@
 
 package org.apache.texera.amber.operator.source.scan
 
-import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema, 
SchemaEnforceable, Tuple}
+import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema, 
SchemaEnforceable}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpec
@@ -145,46 +145,6 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with 
BeforeAndAfterAll {
     }
   }
 
-  it should "read filename from connected input when provided" in {
-    val desc = createDescriptor()
-    desc.fileName = None
-    desc.attributeType = FileAttributeType.SINGLE_STRING
-
-    val executor = new 
FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
-    val inputTuple = Tuple(Schema().add("filename", AttributeType.STRING), 
Array(testFile.toString))
-
-    executor.open()
-    executor.processTuple(inputTuple, 0)
-    val tuples = executor
-      .produceTuple()
-      .map(tupleLike => 
tupleLike.asInstanceOf[SchemaEnforceable].enforceSchema(desc.sourceSchema()))
-      .toSeq
-    executor.close()
-
-    assert(tuples.size == 1)
-    assert(tuples.head.getField[String]("line") == "Test content\nLine 2\nLine 
3")
-  }
-
-  it should "read filename from connected input through processTupleMultiPort" 
in {
-    val desc = createDescriptor()
-    desc.fileName = None
-    desc.attributeType = FileAttributeType.SINGLE_STRING
-
-    val executor = new 
FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
-    val inputTuple = Tuple(Schema().add("filename", AttributeType.STRING), 
Array(testFile.toString))
-
-    executor.open()
-    executor.processTupleMultiPort(inputTuple, 0)
-    val tuples = executor
-      .produceTuple()
-      .map(tupleLike => 
tupleLike.asInstanceOf[SchemaEnforceable].enforceSchema(desc.sourceSchema()))
-      .toSeq
-    executor.close()
-
-    assert(tuples.size == 1)
-    assert(tuples.head.getField[String]("line") == "Test content\nLine 2\nLine 
3")
-  }
-
   // LargeBinary Tests
   it should "create valid LargeBinary with correct URI parsing" in {
     val pointer = new LargeBinary("s3://bucket/path/to/object")
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
index a3b59723cb..e21cc58b36 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
@@ -43,7 +43,7 @@ class InputFileScanSourceOpDescSpec extends AnyFlatSpec {
     val desc = new InputFileScanSourceOpDesc()
     desc.attributeType = FileAttributeType.SINGLE_STRING
 
-    val executor = new 
FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
+    val executor = new 
InputFileScanSourceOpExec(objectMapper.writeValueAsString(desc))
     val inputTuple = Tuple(
       Schema().add("filename", AttributeType.STRING),
       Array(FileResolver.resolve(TestOperators.TestTextFilePath).toASCIIString)
@@ -64,4 +64,14 @@ class InputFileScanSourceOpDescSpec extends AnyFlatSpec {
         
.equals("line1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\nline10")
     )
   }
+
+  it should "reject execution when no filename input tuple is provided" in {
+    val desc = new InputFileScanSourceOpDesc()
+    desc.attributeType = FileAttributeType.SINGLE_STRING
+
+    val executor = new 
InputFileScanSourceOpExec(objectMapper.writeValueAsString(desc))
+    executor.open()
+    assertThrows[IllegalStateException](executor.produceTuple().toList)
+    executor.close()
+  }
 }
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
index 4a270331e4..8c08686705 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
@@ -49,10 +49,6 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with 
BeforeAndAfter {
     assert(inferredSchema.getAttribute("line").getType == AttributeType.STRING)
   }
 
-  it should "not expose filename input port on file scan source" in {
-    assert(fileScanSourceOpDesc.operatorInfo.inputPorts.isEmpty)
-  }
-
   it should "infer schema with single column representing entire file in 
outputAsSingleTuple mode" in {
     fileScanSourceOpDesc.attributeType = FileAttributeType.SINGLE_STRING
     val inferredSchema: Schema = fileScanSourceOpDesc.sourceSchema()
diff --git a/frontend/src/app/app-routing.constant.ts 
b/frontend/src/app/app-routing.constant.ts
index a22f6439e2..582f6d865b 100644
--- a/frontend/src/app/app-routing.constant.ts
+++ b/frontend/src/app/app-routing.constant.ts
@@ -35,6 +35,7 @@ export const DASHBOARD_USER_WORKSPACE = 
`${DASHBOARD_USER}/workflow`;
 export const DASHBOARD_USER_WORKFLOW = `${DASHBOARD_USER}/workflow`;
 export const DASHBOARD_USER_DATASET = `${DASHBOARD_USER}/dataset`;
 export const DASHBOARD_USER_DATASET_CREATE = 
`${DASHBOARD_USER_DATASET}/create`;
+export const DASHBOARD_USER_COMPUTING_UNIT = `${DASHBOARD_USER}/unit`;
 export const DASHBOARD_USER_QUOTA = `${DASHBOARD_USER}/quota`;
 export const DASHBOARD_USER_DISCUSSION = `${DASHBOARD_USER}/discussion`;
 
diff --git a/frontend/src/app/app-routing.module.ts 
b/frontend/src/app/app-routing.module.ts
index a44c55393f..03744c1df9 100644
--- a/frontend/src/app/app-routing.module.ts
+++ b/frontend/src/app/app-routing.module.ts
@@ -24,6 +24,7 @@ import { UserWorkflowComponent } from 
"./dashboard/component/user/user-workflow/
 import { UserQuotaComponent } from 
"./dashboard/component/user/user-quota/user-quota.component";
 import { UserProjectSectionComponent } from 
"./dashboard/component/user/user-project/user-project-section/user-project-section.component";
 import { UserProjectComponent } from 
"./dashboard/component/user/user-project/user-project.component";
+import { UserComputingUnitComponent } from 
"./dashboard/component/user/user-computing-unit/user-computing-unit.component";
 import { WorkspaceComponent } from "./workspace/component/workspace.component";
 import { AboutComponent } from "./hub/component/about/about.component";
 import { AuthGuardService } from "./common/service/user/auth-guard.service";
@@ -130,6 +131,10 @@ routes.push({
           path: "dataset/create",
           component: DatasetDetailComponent,
         },
+        {
+          path: "unit",
+          component: UserComputingUnitComponent,
+        },
         {
           path: "quota",
           component: UserQuotaComponent,
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index b41b1f80b7..6a92d4be2e 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -185,6 +185,7 @@ import { NzCheckboxModule } from "ng-zorro-antd/checkbox";
 import { NzRadioModule } from "ng-zorro-antd/radio";
 import { RegistrationRequestModalComponent } from 
"./common/service/user/registration-request-modal/registration-request-modal.component";
 import { MarkdownDescriptionComponent } from 
"./dashboard/component/user/markdown-description/markdown-description.component";
+import { UserComputingUnitComponent } from 
"./dashboard/component/user/user-computing-unit/user-computing-unit.component";
 
 registerLocaleData(en);
 
@@ -283,6 +284,7 @@ registerLocaleData(en);
     AdminSettingsComponent,
     RegistrationRequestModalComponent,
     MarkdownDescriptionComponent,
+    UserComputingUnitComponent,
   ],
   imports: [
     BrowserModule,
diff --git a/frontend/src/app/dashboard/component/dashboard.component.html 
b/frontend/src/app/dashboard/component/dashboard.component.html
index b238f56b93..1b1bc910b4 100644
--- a/frontend/src/app/dashboard/component/dashboard.component.html
+++ b/frontend/src/app/dashboard/component/dashboard.component.html
@@ -97,6 +97,17 @@
               nzType="database"></span>
             <span>Datasets</span>
           </li>
+          <li
+            nz-menu-item
+            nz-tooltip="Manage computing units"
+            nzMatchRouter="true"
+            nzTooltipPlacement="right"
+            [routerLink]="DASHBOARD_USER_COMPUTING_UNIT">
+            <span
+              nz-icon
+              nzType="deployment-unit"></span>
+            <span>Computing Units</span>
+          </li>
           <li
             *ngIf="sidebarTabs.quota_enabled"
             nz-menu-item
diff --git a/frontend/src/app/dashboard/component/dashboard.component.ts 
b/frontend/src/app/dashboard/component/dashboard.component.ts
index 4f642e01d1..baf7593d27 100644
--- a/frontend/src/app/dashboard/component/dashboard.component.ts
+++ b/frontend/src/app/dashboard/component/dashboard.component.ts
@@ -34,6 +34,7 @@ import {
   DASHBOARD_ADMIN_GMAIL,
   DASHBOARD_ADMIN_SETTINGS,
   DASHBOARD_ADMIN_USER,
+  DASHBOARD_USER_COMPUTING_UNIT,
   DASHBOARD_USER_DATASET,
   DASHBOARD_USER_DISCUSSION,
   DASHBOARD_USER_PROJECT,
@@ -80,6 +81,7 @@ export class DashboardComponent implements OnInit {
   protected readonly DASHBOARD_USER_PROJECT = DASHBOARD_USER_PROJECT;
   protected readonly DASHBOARD_USER_WORKFLOW = DASHBOARD_USER_WORKFLOW;
   protected readonly DASHBOARD_USER_DATASET = DASHBOARD_USER_DATASET;
+  protected readonly DASHBOARD_USER_COMPUTING_UNIT = 
DASHBOARD_USER_COMPUTING_UNIT;
   protected readonly DASHBOARD_USER_QUOTA = DASHBOARD_USER_QUOTA;
   protected readonly DASHBOARD_USER_DISCUSSION = DASHBOARD_USER_DISCUSSION;
   protected readonly DASHBOARD_ADMIN_USER = DASHBOARD_ADMIN_USER;
diff --git 
a/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.html
 
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.html
new file mode 100644
index 0000000000..53c7cb3ded
--- /dev/null
+++ 
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.html
@@ -0,0 +1,29 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<div class="section-container subsection-grid-container">
+  <nz-card class="section-title">
+    <h2 class="page-title">Computing Units</h2>
+  </nz-card>
+
+  <nz-card
+    class="section-list-container"
+    [nzBodyStyle]="{ height: '100%'}">
+  </nz-card>
+</div>
diff --git 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
 
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.scss
similarity index 61%
copy from 
frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
copy to 
frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.scss
index 127b393cff..181db6355a 100644
--- 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
+++ 
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.scss
@@ -17,39 +17,13 @@
  * under the License.
  */
 
-mat-form-field {
-  width: 100%;
-}
-.input-autocomplete-container {
-  display: flex;
-  align-items: center;
-  width: 100%;
-
-  input {
-    flex: 1;
-    margin-right: 10px;
-  }
-
-  button {
-    white-space: nowrap;
-    margin-right: 8px;
-  }
+@import "../../dashboard.component.scss";
+@import "../../section-style";
+@import "../../button-style";
 
-  .file-select-button {
-    border: 2px solid #1890ff;
-    color: #1890ff;
-
-    &:hover {
-      background-color: #e6f7ff;
-      border-color: #1890ff;
-    }
-
-    &:focus {
-      box-shadow: 0 0 0 2px rgba(24, 144, 255, 0.2);
-    }
-  }
-
-  .file-delete-button {
-    margin-right: 0;
-  }
+.subsection-grid-container {
+  min-width: 100%;
+  width: 100%;
+  min-height: 100%;
+  height: 100%;
 }
diff --git 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
 
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.spec.ts
similarity index 51%
copy from 
frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
copy to 
frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.spec.ts
index 127b393cff..a171078e9a 100644
--- 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
+++ 
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.spec.ts
@@ -17,39 +17,28 @@
  * under the License.
  */
 
-mat-form-field {
-  width: 100%;
-}
-.input-autocomplete-container {
-  display: flex;
-  align-items: center;
-  width: 100%;
+import { ComponentFixture, TestBed } from "@angular/core/testing";
+import { UserComputingUnitComponent } from "./user-computing-unit.component";
+import { NzCardModule } from "ng-zorro-antd/card";
 
-  input {
-    flex: 1;
-    margin-right: 10px;
-  }
+describe("UserComputingUnitComponent", () => {
+  let component: UserComputingUnitComponent;
+  let fixture: ComponentFixture<UserComputingUnitComponent>;
 
-  button {
-    white-space: nowrap;
-    margin-right: 8px;
-  }
+  beforeEach(async () => {
+    await TestBed.configureTestingModule({
+      declarations: [UserComputingUnitComponent],
+      imports: [NzCardModule],
+    }).compileComponents();
+  });
 
-  .file-select-button {
-    border: 2px solid #1890ff;
-    color: #1890ff;
+  beforeEach(() => {
+    fixture = TestBed.createComponent(UserComputingUnitComponent);
+    component = fixture.componentInstance;
+    fixture.detectChanges();
+  });
 
-    &:hover {
-      background-color: #e6f7ff;
-      border-color: #1890ff;
-    }
-
-    &:focus {
-      box-shadow: 0 0 0 2px rgba(24, 144, 255, 0.2);
-    }
-  }
-
-  .file-delete-button {
-    margin-right: 0;
-  }
-}
+  it("should create", () => {
+    expect(component).toBeTruthy();
+  });
+});
diff --git 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
 
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.ts
similarity index 60%
copy from 
frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
copy to 
frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.ts
index 127b393cff..1084a19d55 100644
--- 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
+++ 
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.ts
@@ -17,39 +17,11 @@
  * under the License.
  */
 
-mat-form-field {
-  width: 100%;
-}
-.input-autocomplete-container {
-  display: flex;
-  align-items: center;
-  width: 100%;
+import { Component } from "@angular/core";
 
-  input {
-    flex: 1;
-    margin-right: 10px;
-  }
-
-  button {
-    white-space: nowrap;
-    margin-right: 8px;
-  }
-
-  .file-select-button {
-    border: 2px solid #1890ff;
-    color: #1890ff;
-
-    &:hover {
-      background-color: #e6f7ff;
-      border-color: #1890ff;
-    }
-
-    &:focus {
-      box-shadow: 0 0 0 2px rgba(24, 144, 255, 0.2);
-    }
-  }
-
-  .file-delete-button {
-    margin-right: 0;
-  }
-}
+@Component({
+  selector: "texera-computing-unit-section",
+  templateUrl: "user-computing-unit.component.html",
+  styleUrls: ["user-computing-unit.component.scss"],
+})
+export class UserComputingUnitComponent {}
diff --git 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.html
 
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.html
index 08da66ec8e..53ee531fa6 100644
--- 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.html
+++ 
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.html
@@ -18,37 +18,22 @@
 -->
 
 <div [ngClass]="{'input-autocomplete-container': isFileSelectionEnabled}">
-  <ng-container *ngIf="!isInputPortConnected; else connectedInputHint">
-    <input
-      *ngIf="selectedFilePath || !isFileSelectionEnabled"
-      nz-input
-      [readOnly]="isFileSelectionEnabled"
-      required
-      [formControl]="formControl"
-      [formlyAttributes]="field" />
-    <button
-      *ngIf="isFileSelectionEnabled"
-      nz-button
-      class="file-select-button"
-      nzSize="small"
-      (click)="isFileSelectionEnabled && onClickOpenFileSelectionModal()">
-      {{ selectedFilePath ? 'Reselect File' : 'Select File' }}
-    </button>
-    <button
-      *ngIf="isFileSelectionEnabled && selectedFilePath"
-      nz-button
-      nzDanger
-      class="file-delete-button"
-      nzSize="small"
-      nz-tooltip="Clear selected file"
-      (click)="onClickDeleteSelectedFile()">
-      X
-    </button>
-  </ng-container>
+  <input
+    *ngIf="selectedFilePath || !isFileSelectionEnabled"
+    nz-input
+    [readOnly]="isFileSelectionEnabled"
+    required
+    [formControl]="formControl"
+    [formlyAttributes]="field" />
+  <button
+    *ngIf="isFileSelectionEnabled"
+    nz-button
+    class="file-select-button"
+    nzSize="small"
+    (click)="isFileSelectionEnabled && onClickOpenFileSelectionModal()">
+    {{ selectedFilePath ? 'Reselect File' : 'Select File' }}
+  </button>
 </div>
-<ng-template #connectedInputHint>
-  <span>Filename comes from the connected input port.</span>
-</ng-template>
 <div
   class="alert alert-danger"
   role="alert"
diff --git 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
 
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
index 127b393cff..cc76590399 100644
--- 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
+++ 
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
@@ -32,7 +32,6 @@ mat-form-field {
 
   button {
     white-space: nowrap;
-    margin-right: 8px;
   }
 
   .file-select-button {
@@ -48,8 +47,4 @@ mat-form-field {
       box-shadow: 0 0 0 2px rgba(24, 144, 255, 0.2);
     }
   }
-
-  .file-delete-button {
-    margin-right: 0;
-  }
 }
diff --git 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
 
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
index b97bdaa100..38839f377e 100644
--- 
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
+++ 
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-import { Component, OnInit } from "@angular/core";
+import { Component } from "@angular/core";
 import { FieldType, FieldTypeConfig } from "@ngx-formly/core";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { WorkflowActionService } from 
"../../service/workflow-graph/model/workflow-action.service";
@@ -32,9 +32,7 @@ import { GuiConfigService } from 
"../../../common/service/gui-config.service";
   templateUrl: "./input-autocomplete.component.html",
   styleUrls: ["input-autocomplete.component.scss"],
 })
-export class InputAutoCompleteComponent extends FieldType<FieldTypeConfig> 
implements OnInit {
-  private connectedToInputPort = false;
-
+export class InputAutoCompleteComponent extends FieldType<FieldTypeConfig> {
   constructor(
     private modalService: NzModalService,
     public workflowActionService: WorkflowActionService,
@@ -43,14 +41,6 @@ export class InputAutoCompleteComponent extends 
FieldType<FieldTypeConfig> imple
     super();
   }
 
-  ngOnInit(): void {
-    this.refreshInputPortConnectionState();
-    this.workflowActionService
-      .workflowChanged()
-      .pipe(untilDestroyed(this))
-      .subscribe(() => this.refreshInputPortConnectionState());
-  }
-
   onClickOpenFileSelectionModal(): void {
     const modal = this.modalService.create({
       nzTitle: "Please select one file from datasets",
@@ -77,12 +67,6 @@ export class InputAutoCompleteComponent extends 
FieldType<FieldTypeConfig> imple
     });
   }
 
-  onClickDeleteSelectedFile(): void {
-    this.formControl.setValue(null);
-    this.formControl.markAsDirty();
-    this.formControl.markAsTouched();
-  }
-
   get enableDatasetSource(): boolean {
     return this.config.env.selectingFilesFromDatasetsEnabled;
   }
@@ -91,23 +75,7 @@ export class InputAutoCompleteComponent extends 
FieldType<FieldTypeConfig> imple
     return this.enableDatasetSource;
   }
 
-  get isInputPortConnected(): boolean {
-    return this.connectedToInputPort;
-  }
-
   get selectedFilePath(): string | null {
     return this.formControl.value;
   }
-
-  private refreshInputPortConnectionState(): void {
-    const operatorID = this.props["operatorID"] as string | undefined;
-    if (!operatorID) {
-      this.connectedToInputPort = false;
-      return;
-    }
-    this.connectedToInputPort = this.workflowActionService
-      .getTexeraGraph()
-      .getInputLinksByOperatorId(operatorID)
-      .some(link => link.target.portID === "input-0");
-  }
 }
diff --git 
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
 
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
index 57f3eb6395..5d457e9050 100644
--- 
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
+++ 
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
@@ -451,10 +451,6 @@ export class OperatorPropertyEditFrameComponent implements 
OnInit, OnChanges, On
       // if the title is fileName, then change it to custom autocomplete input 
template
       if (mappedField.key == "fileName") {
         mappedField.type = "inputautocomplete";
-        mappedField.props = {
-          ...mappedField.props,
-          operatorID: this.currentOperatorId,
-        };
       }
 
       // if the title is python script (for Python UDF), then make this field 
a custom template 'codearea'
diff --git 
a/frontend/src/app/workspace/service/validation/validation-workflow.service.spec.ts
 
b/frontend/src/app/workspace/service/validation/validation-workflow.service.spec.ts
index f94d6141f9..f01cbc59af 100644
--- 
a/frontend/src/app/workspace/service/validation/validation-workflow.service.spec.ts
+++ 
b/frontend/src/app/workspace/service/validation/validation-workflow.service.spec.ts
@@ -27,7 +27,6 @@ import {
   mockScanSentimentLink,
   mockSentimentPredicate,
 } from "../workflow-graph/model/mock-workflow-data";
-import { mockFileSourceSchema } from 
"../operator-metadata/mock-operator-metadata.data";
 import { WorkflowActionService } from 
"../workflow-graph/model/workflow-action.service";
 import { UndoRedoService } from "../undo-redo/undo-redo.service";
 import { OperatorMetadataService } from 
"../operator-metadata/operator-metadata.service";
@@ -37,7 +36,6 @@ import { marbles } from "rxjs-marbles";
 import { WorkflowUtilService } from 
"../workflow-graph/util/workflow-util.service";
 import { map } from "rxjs/operators";
 import { commonTestProviders } from "../../../common/testing/test-utils";
-import { OperatorPredicate } from "../../types/workflow-common.interface";
 
 describe("ValidationWorkflowService", () => {
   let validationWorkflowService: ValidationWorkflowService;
@@ -115,25 +113,6 @@ describe("ValidationWorkflowService", () => {
     
expect(validationWorkflowService.validateOperator(mockScanPredicate.operatorID).isValid).toBeFalsy();
   });
 
-  it("should treat file source filename input as optional when a file is 
selected", () => {
-    const mockFileSourcePredicate: OperatorPredicate = {
-      operatorID: "file-source-1",
-      operatorType: mockFileSourceSchema.operatorType,
-      operatorVersion: mockFileSourceSchema.operatorVersion,
-      operatorProperties: {
-        fileName: "/[email protected]/dataset/v1/file.csv",
-      },
-      inputPorts: [{ portID: "input-0" }],
-      outputPorts: [{ portID: "output-0" }],
-      showAdvanced: true,
-      isDisabled: false,
-    };
-
-    workflowActionservice.addOperator(mockFileSourcePredicate, mockPoint);
-
-    
expect(validationWorkflowService.validateOperator(mockFileSourcePredicate.operatorID).isValid).toBeTruthy();
-  });
-
   // TODO: this test is incompatible with shared editing.
   // it(
   //   "should subscribe the changes of validateOperatorStream when one 
operator box is deleted after valid status ",
diff --git 
a/frontend/src/app/workspace/service/validation/validation-workflow.service.ts 
b/frontend/src/app/workspace/service/validation/validation-workflow.service.ts
index a4158373e4..93099888b2 100644
--- 
a/frontend/src/app/workspace/service/validation/validation-workflow.service.ts
+++ 
b/frontend/src/app/workspace/service/validation/validation-workflow.service.ts
@@ -27,7 +27,6 @@ import { map } from "rxjs/operators";
 import { DynamicSchemaService } from 
"../dynamic-schema/dynamic-schema.service";
 import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
 import { WorkflowGraph, WorkflowGraphReadonly } from 
"../workflow-graph/model/workflow-graph";
-import { OperatorPredicate } from "../../types/workflow-common.interface";
 
 export type ValidationError = {
   isValid: false;
@@ -326,9 +325,6 @@ export class ValidationWorkflowService {
     for (let i = 0; i < operator.inputPorts.length; i++) {
       const port = operator.inputPorts[i];
       const portNumInputs = numInputLinksByPort.get(port.portID) ?? 0;
-      if (this.isOptionalFileInputPort(operator, port.portID, portNumInputs)) {
-        continue;
-      }
       if (port.allowMultiInputs) {
         if (portNumInputs < 1) {
           satisfyInput = false;
@@ -353,20 +349,6 @@ export class ValidationWorkflowService {
     }
   }
 
-  private isOptionalFileInputPort(
-    operator: OperatorPredicate,
-    portID: string,
-    portNumInputs: number
-  ): boolean {
-    if (portID !== "input-0" || portNumInputs > 0) {
-      return false;
-    }
-
-    const properties = operator.operatorProperties as Record<string, unknown>;
-    const selectedFileName = properties["fileName"];
-    return typeof selectedFileName === "string" && 
selectedFileName.trim().length > 0;
-  }
-
   public static combineValidation(...validations: Validation[]): Validation {
     let isValid = true;
     let messages = {};

Reply via email to