This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-source-port by this
push:
new e9aeb64fbd update
e9aeb64fbd is described below
commit e9aeb64fbd4fb1be062d83df53d3524ae3f1130d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 6 00:07:06 2026 -0700
update
---
.../texera/amber/core/workflow/PhysicalOp.scala | 12 ++---
.../amber/core/workflow/PhysicalOpSpec.scala | 49 --------------------
.../source/scan/FileScanSourceOpExec.scala | 12 ++---
.../source/scan/InputFileScanSourceOpDesc.scala | 17 +++++++
...pExec.scala => InputFileScanSourceOpExec.scala} | 12 ++---
.../source/scan/InputFileSourceOpExec.scala | 14 +-----
.../source/scan/arrow/ArrowSourceOpExec.scala | 19 ++------
.../source/scan/csv/CSVScanSourceOpExec.scala | 23 ++--------
.../scan/csv/ParallelCSVScanSourceOpExec.scala | 29 ++++--------
.../scan/csvOld/CSVOldScanSourceOpExec.scala | 22 ++-------
.../source/scan/json/JSONLScanSourceOpExec.scala | 24 +++-------
.../source/scan/FileScanSourceOpExecSpec.scala | 42 +----------------
.../scan/InputFileScanSourceOpDescSpec.scala | 12 ++++-
.../scan/text/FileScanSourceOpDescSpec.scala | 4 --
frontend/src/app/app-routing.constant.ts | 1 +
frontend/src/app/app-routing.module.ts | 5 ++
frontend/src/app/app.module.ts | 2 +
.../dashboard/component/dashboard.component.html | 11 +++++
.../app/dashboard/component/dashboard.component.ts | 2 +
.../user-computing-unit.component.html | 29 ++++++++++++
.../user-computing-unit.component.scss} | 42 ++++-------------
.../user-computing-unit.component.spec.ts} | 53 +++++++++-------------
.../user-computing-unit.component.ts} | 42 +++--------------
.../input-autocomplete.component.html | 45 ++++++------------
.../input-autocomplete.component.scss | 5 --
.../input-autocomplete.component.ts | 36 +--------------
.../operator-property-edit-frame.component.ts | 4 --
.../validation/validation-workflow.service.spec.ts | 21 ---------
.../validation/validation-workflow.service.ts | 18 --------
29 files changed, 176 insertions(+), 431 deletions(-)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
index d28d2a460d..494fd6f76d 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala
@@ -431,19 +431,13 @@ case class PhysicalOp(
}
}
- // Extract the schemas that are currently available.
+ // Extract input schemas, checking if all are defined
val inputSchemas = updatedOp.inputPorts.collect {
case (portId, (_, _, Right(schema))) => portId -> schema
}
- val readyForPropagation = updatedOp.inputPorts.forall {
- case (_, (_, links, Right(_))) => true
- case (_, (_, links, Left(_))) => links.isEmpty
- }
-
- if (readyForPropagation) {
- // All linked input schemas are available, propagate to output schema.
- // Unlinked input ports are ignored so optional ports do not block
output inference.
+ if (updatedOp.inputPorts.size == inputSchemas.size) {
+ // All input schemas are available, propagate to output schema
val schemaPropagationResult = Try(propagateSchema.func(inputSchemas))
schemaPropagationResult match {
case Success(schemaMapping) =>
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PhysicalOpSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PhysicalOpSpec.scala
deleted file mode 100644
index b0d0e074cf..0000000000
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PhysicalOpSpec.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.core.workflow
-
-import org.apache.texera.amber.core.executor.OpExecWithClassName
-import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
-import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
OperatorIdentity, WorkflowIdentity}
-import org.scalatest.flatspec.AnyFlatSpec
-
-class PhysicalOpSpec extends AnyFlatSpec {
-
- it should "propagate output schema when an input port is unlinked" in {
- val expectedSchema = Schema().add("filename", AttributeType.STRING)
- val physicalOp = PhysicalOp
- .sourcePhysicalOp(
- WorkflowIdentity(1),
- ExecutionIdentity(1),
- OperatorIdentity("scan-source"),
- OpExecWithClassName("test.OpExec", "{}")
- )
- .withInputPorts(List(InputPort(displayName = "Filename")))
- .withOutputPorts(List(OutputPort()))
- .withPropagateSchema(
- SchemaPropagationFunc(_ => Map(PortIdentity(0) -> expectedSchema))
- )
-
- val propagated = physicalOp.propagateSchema()
- val outputSchema = propagated.outputPorts(PortIdentity(0))._3.toOption
-
- assert(outputSchema.contains(expectedSchema))
- }
-}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
index 3690b473c7..91c817240c 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -19,6 +19,7 @@
package org.apache.texera.amber.operator.source.scan
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
@@ -35,18 +36,15 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
class FileScanSourceOpExec private[scan] (
descString: String
-) extends InputFileSourceOpExec {
- override protected val desc: FileScanSourceOpDesc =
+) extends SourceOperatorExecutor {
+ private val desc: FileScanSourceOpDesc =
objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
@throws[IOException]
override def produceTuple(): Iterator[TupleLike] = {
- resolvedInputFileNames.iterator.flatMap(produceTuplesForFile)
- }
-
- private def produceTuplesForFile(resolvedFileName: String):
Iterator[TupleLike] = {
val is: InputStream =
- DocumentFactory.openReadonlyDocument(new
URI(resolvedFileName)).asInputStream()
+ DocumentFactory.openReadonlyDocument(new
URI(desc.fileName.get)).asInputStream()
+
val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
var zipIn: ZipArchiveInputStream = null
var archiveStream: InputStream = null
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
index f94253d0b2..0ec39ce200 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
@@ -20,12 +20,29 @@
package org.apache.texera.amber.operator.source.scan
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
import org.apache.texera.amber.core.workflow.{InputPort, OutputPort}
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
@JsonIgnoreProperties(value = Array("fileName"))
class InputFileScanSourceOpDesc extends FileScanSourceOpDesc {
+ override def getPhysicalOp(
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ) = {
+ super
+ .getPhysicalOp(workflowId, executionId)
+ .copy(opExecInitInfo =
+ OpExecWithClassName(
+
"org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpExec",
+ objectMapper.writeValueAsString(this)
+ )
+ )
+ }
+
override def operatorInfo: OperatorInfo =
OperatorInfo(
userFriendlyName = "File Scan From Input",
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
similarity index 94%
copy from
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
copy to
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
index 3690b473c7..4a1514c7f6 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
@@ -33,11 +33,11 @@ import java.net.URI
import scala.collection.mutable
import scala.jdk.CollectionConverters.IteratorHasAsScala
-class FileScanSourceOpExec private[scan] (
+class InputFileScanSourceOpExec private[scan] (
descString: String
) extends InputFileSourceOpExec {
- override protected val desc: FileScanSourceOpDesc =
- objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
+ private val desc: InputFileScanSourceOpDesc =
+ objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
@throws[IOException]
override def produceTuple(): Iterator[TupleLike] = {
@@ -47,6 +47,7 @@ class FileScanSourceOpExec private[scan] (
private def produceTuplesForFile(resolvedFileName: String):
Iterator[TupleLike] = {
val is: InputStream =
DocumentFactory.openReadonlyDocument(new
URI(resolvedFileName)).asInputStream()
+
val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
var zipIn: ZipArchiveInputStream = null
var archiveStream: InputStream = null
@@ -88,7 +89,6 @@ class FileScanSourceOpExec private[scan] (
case FileAttributeType.SINGLE_STRING =>
new String(toByteArray(entry), desc.fileEncoding.getCharset)
case FileAttributeType.LARGE_BINARY =>
- // For large binaries, create reference and upload via
streaming
val largeBinary = new LargeBinary()
val out = new LargeBinaryOutputStream(largeBinary)
try {
@@ -116,12 +116,12 @@ class FileScanSourceOpExec private[scan] (
desc.fileScanOffset.getOrElse(0),
desc.fileScanOffset.getOrElse(0) +
desc.fileScanLimit.getOrElse(Int.MaxValue)
)
- .map(line => {
+ .map(line =>
TupleLike(desc.attributeType match {
case FileAttributeType.SINGLE_STRING => line
case _ => parseField(line,
desc.attributeType.getType)
})
- })
+ )
)
}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
index c4ce21b47e..cd14cc4235 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
@@ -27,9 +27,6 @@ import org.apache.texera.amber.core.workflow.PortIdentity
import scala.collection.mutable.ArrayBuffer
trait InputFileSourceOpExec extends SourceOperatorExecutor {
-
- protected def desc: ScanSourceOpDesc
-
private val inputFileNames = ArrayBuffer.empty[String]
override def processTupleMultiPort(
@@ -45,18 +42,11 @@ trait InputFileSourceOpExec extends SourceOperatorExecutor {
}
protected def resolvedInputFileNames: Seq[String] = {
- val fileNames =
- if (inputFileNames.nonEmpty) {
- inputFileNames.toSeq
- } else {
- desc.fileName.toSeq
- }
-
- if (fileNames.isEmpty) {
+ if (inputFileNames.isEmpty) {
throw new IllegalStateException("No input file is available for this
source operator.")
}
- fileNames.map(fileName =>
+ inputFileNames.toSeq.map(fileName =>
if (FileResolver.isFileResolved(fileName)) {
fileName
} else {
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
index d851b5b1c6..f23833206f 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/arrow/ArrowSourceOpExec.scala
@@ -19,9 +19,9 @@
package org.apache.texera.amber.operator.source.scan.arrow
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.TupleLike
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
import org.apache.texera.amber.util.ArrowUtils
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.apache.arrow.memory.RootAllocator
@@ -33,26 +33,16 @@ import java.nio.file.{Files, StandardOpenOption}
class ArrowSourceOpExec(
descString: String
-) extends InputFileSourceOpExec {
- override protected val desc: ArrowSourceOpDesc =
+) extends SourceOperatorExecutor {
+ private val desc: ArrowSourceOpDesc =
objectMapper.readValue(descString, classOf[ArrowSourceOpDesc])
private var reader: Option[ArrowFileReader] = None
private var root: Option[VectorSchemaRoot] = None
private var allocator: Option[RootAllocator] = None
override def open(): Unit = {
- if (desc.fileName.isDefined) {
- initializeIfNeeded()
- }
- }
-
- private def initializeIfNeeded(): Unit = {
- if (reader.isDefined) {
- return
- }
try {
- desc.fileName = Some(resolvedInputFileName)
- val file = DocumentFactory.openReadonlyDocument(new
URI(resolvedInputFileName)).asFile()
+ val file = DocumentFactory.openReadonlyDocument(new
URI(desc.fileName.get)).asFile()
val alloc = new RootAllocator()
allocator = Some(alloc)
val channel = Files.newByteChannel(file.toPath, StandardOpenOption.READ)
@@ -68,7 +58,6 @@ class ArrowSourceOpExec(
}
override def produceTuple(): Iterator[TupleLike] = {
- initializeIfNeeded()
val rowIterator = new Iterator[TupleLike] {
private var currentIndex = 0
private var currentBatchIndex = 0
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
index 5707c064b0..c3fbbe9bb5 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
@@ -20,26 +20,24 @@
package org.apache.texera.amber.operator.source.scan.csv
import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.{AttributeTypeUtils, Schema,
TupleLike}
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
import org.apache.texera.amber.util.JSONUtils.objectMapper
import java.io.InputStreamReader
import java.net.URI
import scala.collection.immutable.ArraySeq
-class CSVScanSourceOpExec private[csv] (descString: String) extends
InputFileSourceOpExec {
- override protected val desc: CSVScanSourceOpDesc =
- objectMapper.readValue(descString, classOf[CSVScanSourceOpDesc])
+class CSVScanSourceOpExec private[csv] (descString: String) extends
SourceOperatorExecutor {
+ val desc: CSVScanSourceOpDesc = objectMapper.readValue(descString,
classOf[CSVScanSourceOpDesc])
var inputReader: InputStreamReader = _
var parser: CsvParser = _
var nextRow: Array[String] = _
var numRowGenerated = 0
- private var schema: Schema = _
+ private val schema: Schema = desc.sourceSchema()
override def produceTuple(): Iterator[TupleLike] = {
- initializeIfNeeded()
val rowIterator = new Iterator[Array[String]] {
override def hasNext: Boolean = {
@@ -79,19 +77,8 @@ class CSVScanSourceOpExec private[csv] (descString: String)
extends InputFileSou
}
override def open(): Unit = {
- if (desc.fileName.isDefined) {
- initializeIfNeeded()
- }
- }
-
- private def initializeIfNeeded(): Unit = {
- if (parser != null) {
- return
- }
- desc.fileName = Some(resolvedInputFileName)
- schema = desc.sourceSchema()
inputReader = new InputStreamReader(
- DocumentFactory.openReadonlyDocument(new
URI(resolvedInputFileName)).asInputStream(),
+ DocumentFactory.openReadonlyDocument(new
URI(desc.fileName.get)).asInputStream(),
desc.fileEncoding.getCharset
)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
index 217cb606ed..1c9377e564 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
@@ -19,10 +19,10 @@
package org.apache.texera.amber.operator.source.scan.csv
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.tuple.{Attribute, AttributeTypeUtils,
Schema, TupleLike}
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeTypeUtils,
TupleLike}
import org.apache.texera.amber.operator.source.BufferedBlockReader
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.tukaani.xz.SeekableFileInputStream
@@ -35,14 +35,13 @@ class ParallelCSVScanSourceOpExec private[csv] (
descString: String,
idx: Int = 0,
workerCount: Int = 1
-) extends InputFileSourceOpExec {
- override protected val desc: ParallelCSVScanSourceOpDesc =
+) extends SourceOperatorExecutor {
+ val desc: ParallelCSVScanSourceOpDesc =
objectMapper.readValue(descString, classOf[ParallelCSVScanSourceOpDesc])
private var reader: BufferedBlockReader = _
- private var schema: Schema = desc.sourceSchema()
+ private val schema = desc.sourceSchema()
- override def produceTuple(): Iterator[TupleLike] = {
- initializeIfNeeded()
+ override def produceTuple(): Iterator[TupleLike] =
new Iterator[TupleLike]() {
override def hasNext: Boolean = reader.hasNext
@@ -87,23 +86,11 @@ class ParallelCSVScanSourceOpExec private[csv] (
}
}.filter(tuple => tuple != null)
- }
override def open(): Unit = {
- if (desc.fileName.isDefined) {
- initializeIfNeeded()
- }
- }
-
- private def initializeIfNeeded(): Unit = {
- if (reader != null) {
- return
- }
- desc.fileName = Some(resolvedInputFileName)
- schema = desc.sourceSchema()
// here, the stream requires to be seekable, so datasetFileDesc creates a
temp file here
// TODO: consider a better way
- val file = DocumentFactory.openReadonlyDocument(new
URI(resolvedInputFileName)).asFile()
+ val file = DocumentFactory.openReadonlyDocument(new
URI(desc.fileName.get)).asFile()
val totalBytes: Long = file.length()
// TODO: add support for limit
// TODO: add support for offset
@@ -126,6 +113,6 @@ class ParallelCSVScanSourceOpExec private[csv] (
if (startOffset == 0 && desc.hasHeader) reader.readLine
}
- override def close(): Unit = if (reader != null) reader.close()
+ override def close(): Unit = reader.close()
}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
index 26df840824..e7b1e938c3 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpExec.scala
@@ -20,9 +20,9 @@
package org.apache.texera.amber.operator.source.scan.csvOld
import com.github.tototoshi.csv.{CSVReader, DefaultCSVFormat}
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.{Attribute, AttributeTypeUtils,
Schema, TupleLike}
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
import org.apache.texera.amber.util.JSONUtils.objectMapper
import java.net.URI
@@ -30,14 +30,13 @@ import scala.collection.compat.immutable.ArraySeq
class CSVOldScanSourceOpExec private[csvOld] (
descString: String
-) extends InputFileSourceOpExec {
- override protected val desc: CSVOldScanSourceOpDesc =
+) extends SourceOperatorExecutor {
+ val desc: CSVOldScanSourceOpDesc =
objectMapper.readValue(descString, classOf[CSVOldScanSourceOpDesc])
var reader: CSVReader = _
var rows: Iterator[Seq[String]] = _
- var schema: Schema = _
+ val schema: Schema = desc.sourceSchema()
override def produceTuple(): Iterator[TupleLike] = {
- initializeIfNeeded()
val tuples = rows
.map(fields =>
@@ -63,21 +62,10 @@ class CSVOldScanSourceOpExec private[csvOld] (
}
override def open(): Unit = {
- if (desc.fileName.isDefined) {
- initializeIfNeeded()
- }
- }
-
- private def initializeIfNeeded(): Unit = {
- if (reader != null) {
- return
- }
- desc.fileName = Some(resolvedInputFileName)
- schema = desc.sourceSchema()
implicit object CustomFormat extends DefaultCSVFormat {
override val delimiter: Char = desc.customDelimiter.get.charAt(0)
}
- val filePath = DocumentFactory.openReadonlyDocument(new
URI(resolvedInputFileName)).asFile().toPath
+ val filePath = DocumentFactory.openReadonlyDocument(new
URI(desc.fileName.get)).asFile().toPath
reader = CSVReader.open(filePath.toString,
desc.fileEncoding.getCharset.name())(CustomFormat)
// skip line if this worker reads the start of a file, and the file has a
header line
val startOffset = desc.offset.getOrElse(0) + (if (desc.hasHeader) 1 else 0)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
index 850a12ebf6..c96278c3e3 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
@@ -19,10 +19,10 @@
package org.apache.texera.amber.operator.source.scan.json
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
import org.apache.texera.amber.core.tuple.TupleLike
-import org.apache.texera.amber.operator.source.scan.InputFileSourceOpExec
import org.apache.texera.amber.operator.source.scan.json.JSONUtil.JSONToMap
import org.apache.texera.amber.util.JSONUtils.objectMapper
@@ -35,15 +35,14 @@ class JSONLScanSourceOpExec private[json] (
descString: String,
idx: Int = 0,
workerCount: Int = 1
-) extends InputFileSourceOpExec {
- override protected val desc: JSONLScanSourceOpDesc =
+) extends SourceOperatorExecutor {
+ private val desc: JSONLScanSourceOpDesc =
objectMapper.readValue(descString, classOf[JSONLScanSourceOpDesc])
private var rows: Iterator[String] = _
private var reader: BufferedReader = _
- private var schema = desc.sourceSchema()
+ private val schema = desc.sourceSchema()
override def produceTuple(): Iterator[TupleLike] = {
- initializeIfNeeded()
rows.flatMap { line =>
Try {
val data = JSONToMap(objectMapper.readTree(line),
desc.flatten).withDefaultValue(null)
@@ -59,18 +58,7 @@ class JSONLScanSourceOpExec private[json] (
}
override def open(): Unit = {
- if (desc.fileName.isDefined) {
- initializeIfNeeded()
- }
- }
-
- private def initializeIfNeeded(): Unit = {
- if (reader != null) {
- return
- }
- desc.fileName = Some(resolvedInputFileName)
- schema = desc.sourceSchema()
- val stream = DocumentFactory.openReadonlyDocument(new
URI(resolvedInputFileName)).asInputStream()
+ val stream = DocumentFactory.openReadonlyDocument(new
URI(desc.fileName.get)).asInputStream()
// count lines and partition the task to each worker
reader = new BufferedReader(
new InputStreamReader(stream, desc.fileEncoding.getCharset)
@@ -89,6 +77,6 @@ class JSONLScanSourceOpExec private[json] (
rows = it2.iterator.slice(startOffset, endOffset)
}
- override def close(): Unit = if (reader != null) reader.close()
+ override def close(): Unit = reader.close()
}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
index 6aeea6f798..ab3383ca0a 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
@@ -19,7 +19,7 @@
package org.apache.texera.amber.operator.source.scan
-import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema,
SchemaEnforceable, Tuple}
+import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema,
SchemaEnforceable}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
@@ -145,46 +145,6 @@ class FileScanSourceOpExecSpec extends AnyFlatSpec with
BeforeAndAfterAll {
}
}
- it should "read filename from connected input when provided" in {
- val desc = createDescriptor()
- desc.fileName = None
- desc.attributeType = FileAttributeType.SINGLE_STRING
-
- val executor = new
FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
- val inputTuple = Tuple(Schema().add("filename", AttributeType.STRING),
Array(testFile.toString))
-
- executor.open()
- executor.processTuple(inputTuple, 0)
- val tuples = executor
- .produceTuple()
- .map(tupleLike =>
tupleLike.asInstanceOf[SchemaEnforceable].enforceSchema(desc.sourceSchema()))
- .toSeq
- executor.close()
-
- assert(tuples.size == 1)
- assert(tuples.head.getField[String]("line") == "Test content\nLine 2\nLine
3")
- }
-
- it should "read filename from connected input through processTupleMultiPort"
in {
- val desc = createDescriptor()
- desc.fileName = None
- desc.attributeType = FileAttributeType.SINGLE_STRING
-
- val executor = new
FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
- val inputTuple = Tuple(Schema().add("filename", AttributeType.STRING),
Array(testFile.toString))
-
- executor.open()
- executor.processTupleMultiPort(inputTuple, 0)
- val tuples = executor
- .produceTuple()
- .map(tupleLike =>
tupleLike.asInstanceOf[SchemaEnforceable].enforceSchema(desc.sourceSchema()))
- .toSeq
- executor.close()
-
- assert(tuples.size == 1)
- assert(tuples.head.getField[String]("line") == "Test content\nLine 2\nLine
3")
- }
-
// LargeBinary Tests
it should "create valid LargeBinary with correct URI parsing" in {
val pointer = new LargeBinary("s3://bucket/path/to/object")
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
index a3b59723cb..e21cc58b36 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
@@ -43,7 +43,7 @@ class InputFileScanSourceOpDescSpec extends AnyFlatSpec {
val desc = new InputFileScanSourceOpDesc()
desc.attributeType = FileAttributeType.SINGLE_STRING
- val executor = new
FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
+ val executor = new
InputFileScanSourceOpExec(objectMapper.writeValueAsString(desc))
val inputTuple = Tuple(
Schema().add("filename", AttributeType.STRING),
Array(FileResolver.resolve(TestOperators.TestTextFilePath).toASCIIString)
@@ -64,4 +64,14 @@ class InputFileScanSourceOpDescSpec extends AnyFlatSpec {
.equals("line1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\nline10")
)
}
+
+ it should "reject execution when no filename input tuple is provided" in {
+ val desc = new InputFileScanSourceOpDesc()
+ desc.attributeType = FileAttributeType.SINGLE_STRING
+
+ val executor = new
InputFileScanSourceOpExec(objectMapper.writeValueAsString(desc))
+ executor.open()
+ assertThrows[IllegalStateException](executor.produceTuple().toList)
+ executor.close()
+ }
}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
index 4a270331e4..8c08686705 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
@@ -49,10 +49,6 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with
BeforeAndAfter {
assert(inferredSchema.getAttribute("line").getType == AttributeType.STRING)
}
- it should "not expose filename input port on file scan source" in {
- assert(fileScanSourceOpDesc.operatorInfo.inputPorts.isEmpty)
- }
-
it should "infer schema with single column representing entire file in
outputAsSingleTuple mode" in {
fileScanSourceOpDesc.attributeType = FileAttributeType.SINGLE_STRING
val inferredSchema: Schema = fileScanSourceOpDesc.sourceSchema()
diff --git a/frontend/src/app/app-routing.constant.ts
b/frontend/src/app/app-routing.constant.ts
index a22f6439e2..582f6d865b 100644
--- a/frontend/src/app/app-routing.constant.ts
+++ b/frontend/src/app/app-routing.constant.ts
@@ -35,6 +35,7 @@ export const DASHBOARD_USER_WORKSPACE =
`${DASHBOARD_USER}/workflow`;
export const DASHBOARD_USER_WORKFLOW = `${DASHBOARD_USER}/workflow`;
export const DASHBOARD_USER_DATASET = `${DASHBOARD_USER}/dataset`;
export const DASHBOARD_USER_DATASET_CREATE =
`${DASHBOARD_USER_DATASET}/create`;
+export const DASHBOARD_USER_COMPUTING_UNIT = `${DASHBOARD_USER}/unit`;
export const DASHBOARD_USER_QUOTA = `${DASHBOARD_USER}/quota`;
export const DASHBOARD_USER_DISCUSSION = `${DASHBOARD_USER}/discussion`;
diff --git a/frontend/src/app/app-routing.module.ts
b/frontend/src/app/app-routing.module.ts
index a44c55393f..03744c1df9 100644
--- a/frontend/src/app/app-routing.module.ts
+++ b/frontend/src/app/app-routing.module.ts
@@ -24,6 +24,7 @@ import { UserWorkflowComponent } from
"./dashboard/component/user/user-workflow/
import { UserQuotaComponent } from
"./dashboard/component/user/user-quota/user-quota.component";
import { UserProjectSectionComponent } from
"./dashboard/component/user/user-project/user-project-section/user-project-section.component";
import { UserProjectComponent } from
"./dashboard/component/user/user-project/user-project.component";
+import { UserComputingUnitComponent } from
"./dashboard/component/user/user-computing-unit/user-computing-unit.component";
import { WorkspaceComponent } from "./workspace/component/workspace.component";
import { AboutComponent } from "./hub/component/about/about.component";
import { AuthGuardService } from "./common/service/user/auth-guard.service";
@@ -130,6 +131,10 @@ routes.push({
path: "dataset/create",
component: DatasetDetailComponent,
},
+ {
+ path: "unit",
+ component: UserComputingUnitComponent,
+ },
{
path: "quota",
component: UserQuotaComponent,
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index b41b1f80b7..6a92d4be2e 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -185,6 +185,7 @@ import { NzCheckboxModule } from "ng-zorro-antd/checkbox";
import { NzRadioModule } from "ng-zorro-antd/radio";
import { RegistrationRequestModalComponent } from
"./common/service/user/registration-request-modal/registration-request-modal.component";
import { MarkdownDescriptionComponent } from
"./dashboard/component/user/markdown-description/markdown-description.component";
+import { UserComputingUnitComponent } from
"./dashboard/component/user/user-computing-unit/user-computing-unit.component";
registerLocaleData(en);
@@ -283,6 +284,7 @@ registerLocaleData(en);
AdminSettingsComponent,
RegistrationRequestModalComponent,
MarkdownDescriptionComponent,
+ UserComputingUnitComponent,
],
imports: [
BrowserModule,
diff --git a/frontend/src/app/dashboard/component/dashboard.component.html
b/frontend/src/app/dashboard/component/dashboard.component.html
index b238f56b93..1b1bc910b4 100644
--- a/frontend/src/app/dashboard/component/dashboard.component.html
+++ b/frontend/src/app/dashboard/component/dashboard.component.html
@@ -97,6 +97,17 @@
nzType="database"></span>
<span>Datasets</span>
</li>
+ <li
+ nz-menu-item
+ nz-tooltip="Manage computing units"
+ nzMatchRouter="true"
+ nzTooltipPlacement="right"
+ [routerLink]="DASHBOARD_USER_COMPUTING_UNIT">
+ <span
+ nz-icon
+ nzType="deployment-unit"></span>
+ <span>Computing Units</span>
+ </li>
<li
*ngIf="sidebarTabs.quota_enabled"
nz-menu-item
diff --git a/frontend/src/app/dashboard/component/dashboard.component.ts
b/frontend/src/app/dashboard/component/dashboard.component.ts
index 4f642e01d1..baf7593d27 100644
--- a/frontend/src/app/dashboard/component/dashboard.component.ts
+++ b/frontend/src/app/dashboard/component/dashboard.component.ts
@@ -34,6 +34,7 @@ import {
DASHBOARD_ADMIN_GMAIL,
DASHBOARD_ADMIN_SETTINGS,
DASHBOARD_ADMIN_USER,
+ DASHBOARD_USER_COMPUTING_UNIT,
DASHBOARD_USER_DATASET,
DASHBOARD_USER_DISCUSSION,
DASHBOARD_USER_PROJECT,
@@ -80,6 +81,7 @@ export class DashboardComponent implements OnInit {
protected readonly DASHBOARD_USER_PROJECT = DASHBOARD_USER_PROJECT;
protected readonly DASHBOARD_USER_WORKFLOW = DASHBOARD_USER_WORKFLOW;
protected readonly DASHBOARD_USER_DATASET = DASHBOARD_USER_DATASET;
+ protected readonly DASHBOARD_USER_COMPUTING_UNIT =
DASHBOARD_USER_COMPUTING_UNIT;
protected readonly DASHBOARD_USER_QUOTA = DASHBOARD_USER_QUOTA;
protected readonly DASHBOARD_USER_DISCUSSION = DASHBOARD_USER_DISCUSSION;
protected readonly DASHBOARD_ADMIN_USER = DASHBOARD_ADMIN_USER;
diff --git
a/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.html
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.html
new file mode 100644
index 0000000000..53c7cb3ded
--- /dev/null
+++
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.html
@@ -0,0 +1,29 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<div class="section-container subsection-grid-container">
+ <nz-card class="section-title">
+ <h2 class="page-title">Computing Units</h2>
+ </nz-card>
+
+ <nz-card
+ class="section-list-container"
+ [nzBodyStyle]="{ height: '100%'}">
+ </nz-card>
+</div>
diff --git
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.scss
similarity index 61%
copy from
frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
copy to
frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.scss
index 127b393cff..181db6355a 100644
---
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
+++
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.scss
@@ -17,39 +17,13 @@
* under the License.
*/
-mat-form-field {
- width: 100%;
-}
-.input-autocomplete-container {
- display: flex;
- align-items: center;
- width: 100%;
-
- input {
- flex: 1;
- margin-right: 10px;
- }
-
- button {
- white-space: nowrap;
- margin-right: 8px;
- }
+@import "../../dashboard.component.scss";
+@import "../../section-style";
+@import "../../button-style";
- .file-select-button {
- border: 2px solid #1890ff;
- color: #1890ff;
-
- &:hover {
- background-color: #e6f7ff;
- border-color: #1890ff;
- }
-
- &:focus {
- box-shadow: 0 0 0 2px rgba(24, 144, 255, 0.2);
- }
- }
-
- .file-delete-button {
- margin-right: 0;
- }
+.subsection-grid-container {
+ min-width: 100%;
+ width: 100%;
+ min-height: 100%;
+ height: 100%;
}
diff --git
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.spec.ts
similarity index 51%
copy from
frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
copy to
frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.spec.ts
index 127b393cff..a171078e9a 100644
---
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
+++
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.spec.ts
@@ -17,39 +17,28 @@
* under the License.
*/
-mat-form-field {
- width: 100%;
-}
-.input-autocomplete-container {
- display: flex;
- align-items: center;
- width: 100%;
+import { ComponentFixture, TestBed } from "@angular/core/testing";
+import { UserComputingUnitComponent } from "./user-computing-unit.component";
+import { NzCardModule } from "ng-zorro-antd/card";
- input {
- flex: 1;
- margin-right: 10px;
- }
+describe("UserComputingUnitComponent", () => {
+ let component: UserComputingUnitComponent;
+ let fixture: ComponentFixture<UserComputingUnitComponent>;
- button {
- white-space: nowrap;
- margin-right: 8px;
- }
+ beforeEach(async () => {
+ await TestBed.configureTestingModule({
+ declarations: [UserComputingUnitComponent],
+ imports: [NzCardModule],
+ }).compileComponents();
+ });
- .file-select-button {
- border: 2px solid #1890ff;
- color: #1890ff;
+ beforeEach(() => {
+ fixture = TestBed.createComponent(UserComputingUnitComponent);
+ component = fixture.componentInstance;
+ fixture.detectChanges();
+ });
- &:hover {
- background-color: #e6f7ff;
- border-color: #1890ff;
- }
-
- &:focus {
- box-shadow: 0 0 0 2px rgba(24, 144, 255, 0.2);
- }
- }
-
- .file-delete-button {
- margin-right: 0;
- }
-}
+ it("should create", () => {
+ expect(component).toBeTruthy();
+ });
+});
diff --git
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.ts
similarity index 60%
copy from
frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
copy to
frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.ts
index 127b393cff..1084a19d55 100644
---
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
+++
b/frontend/src/app/dashboard/component/user/user-computing-unit/user-computing-unit.component.ts
@@ -17,39 +17,11 @@
* under the License.
*/
-mat-form-field {
- width: 100%;
-}
-.input-autocomplete-container {
- display: flex;
- align-items: center;
- width: 100%;
+import { Component } from "@angular/core";
- input {
- flex: 1;
- margin-right: 10px;
- }
-
- button {
- white-space: nowrap;
- margin-right: 8px;
- }
-
- .file-select-button {
- border: 2px solid #1890ff;
- color: #1890ff;
-
- &:hover {
- background-color: #e6f7ff;
- border-color: #1890ff;
- }
-
- &:focus {
- box-shadow: 0 0 0 2px rgba(24, 144, 255, 0.2);
- }
- }
-
- .file-delete-button {
- margin-right: 0;
- }
-}
+@Component({
+ selector: "texera-computing-unit-section",
+ templateUrl: "user-computing-unit.component.html",
+ styleUrls: ["user-computing-unit.component.scss"],
+})
+export class UserComputingUnitComponent {}
diff --git
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.html
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.html
index 08da66ec8e..53ee531fa6 100644
---
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.html
+++
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.html
@@ -18,37 +18,22 @@
-->
<div [ngClass]="{'input-autocomplete-container': isFileSelectionEnabled}">
- <ng-container *ngIf="!isInputPortConnected; else connectedInputHint">
- <input
- *ngIf="selectedFilePath || !isFileSelectionEnabled"
- nz-input
- [readOnly]="isFileSelectionEnabled"
- required
- [formControl]="formControl"
- [formlyAttributes]="field" />
- <button
- *ngIf="isFileSelectionEnabled"
- nz-button
- class="file-select-button"
- nzSize="small"
- (click)="isFileSelectionEnabled && onClickOpenFileSelectionModal()">
- {{ selectedFilePath ? 'Reselect File' : 'Select File' }}
- </button>
- <button
- *ngIf="isFileSelectionEnabled && selectedFilePath"
- nz-button
- nzDanger
- class="file-delete-button"
- nzSize="small"
- nz-tooltip="Clear selected file"
- (click)="onClickDeleteSelectedFile()">
- X
- </button>
- </ng-container>
+ <input
+ *ngIf="selectedFilePath || !isFileSelectionEnabled"
+ nz-input
+ [readOnly]="isFileSelectionEnabled"
+ required
+ [formControl]="formControl"
+ [formlyAttributes]="field" />
+ <button
+ *ngIf="isFileSelectionEnabled"
+ nz-button
+ class="file-select-button"
+ nzSize="small"
+ (click)="isFileSelectionEnabled && onClickOpenFileSelectionModal()">
+ {{ selectedFilePath ? 'Reselect File' : 'Select File' }}
+ </button>
</div>
-<ng-template #connectedInputHint>
- <span>Filename comes from the connected input port.</span>
-</ng-template>
<div
class="alert alert-danger"
role="alert"
diff --git
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
index 127b393cff..cc76590399 100644
---
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
+++
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.scss
@@ -32,7 +32,6 @@ mat-form-field {
button {
white-space: nowrap;
- margin-right: 8px;
}
.file-select-button {
@@ -48,8 +47,4 @@ mat-form-field {
box-shadow: 0 0 0 2px rgba(24, 144, 255, 0.2);
}
}
-
- .file-delete-button {
- margin-right: 0;
- }
}
diff --git
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
index b97bdaa100..38839f377e 100644
---
a/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
+++
b/frontend/src/app/workspace/component/input-autocomplete/input-autocomplete.component.ts
@@ -17,7 +17,7 @@
* under the License.
*/
-import { Component, OnInit } from "@angular/core";
+import { Component } from "@angular/core";
import { FieldType, FieldTypeConfig } from "@ngx-formly/core";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { WorkflowActionService } from
"../../service/workflow-graph/model/workflow-action.service";
@@ -32,9 +32,7 @@ import { GuiConfigService } from
"../../../common/service/gui-config.service";
templateUrl: "./input-autocomplete.component.html",
styleUrls: ["input-autocomplete.component.scss"],
})
-export class InputAutoCompleteComponent extends FieldType<FieldTypeConfig>
implements OnInit {
- private connectedToInputPort = false;
-
+export class InputAutoCompleteComponent extends FieldType<FieldTypeConfig> {
constructor(
private modalService: NzModalService,
public workflowActionService: WorkflowActionService,
@@ -43,14 +41,6 @@ export class InputAutoCompleteComponent extends
FieldType<FieldTypeConfig> imple
super();
}
- ngOnInit(): void {
- this.refreshInputPortConnectionState();
- this.workflowActionService
- .workflowChanged()
- .pipe(untilDestroyed(this))
- .subscribe(() => this.refreshInputPortConnectionState());
- }
-
onClickOpenFileSelectionModal(): void {
const modal = this.modalService.create({
nzTitle: "Please select one file from datasets",
@@ -77,12 +67,6 @@ export class InputAutoCompleteComponent extends
FieldType<FieldTypeConfig> imple
});
}
- onClickDeleteSelectedFile(): void {
- this.formControl.setValue(null);
- this.formControl.markAsDirty();
- this.formControl.markAsTouched();
- }
-
get enableDatasetSource(): boolean {
return this.config.env.selectingFilesFromDatasetsEnabled;
}
@@ -91,23 +75,7 @@ export class InputAutoCompleteComponent extends
FieldType<FieldTypeConfig> imple
return this.enableDatasetSource;
}
- get isInputPortConnected(): boolean {
- return this.connectedToInputPort;
- }
-
get selectedFilePath(): string | null {
return this.formControl.value;
}
-
- private refreshInputPortConnectionState(): void {
- const operatorID = this.props["operatorID"] as string | undefined;
- if (!operatorID) {
- this.connectedToInputPort = false;
- return;
- }
- this.connectedToInputPort = this.workflowActionService
- .getTexeraGraph()
- .getInputLinksByOperatorId(operatorID)
- .some(link => link.target.portID === "input-0");
- }
}
diff --git
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
index 57f3eb6395..5d457e9050 100644
---
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
+++
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
@@ -451,10 +451,6 @@ export class OperatorPropertyEditFrameComponent implements
OnInit, OnChanges, On
// if the title is fileName, then change it to custom autocomplete input
template
if (mappedField.key == "fileName") {
mappedField.type = "inputautocomplete";
- mappedField.props = {
- ...mappedField.props,
- operatorID: this.currentOperatorId,
- };
}
// if the title is python script (for Python UDF), then make this field
a custom template 'codearea'
diff --git
a/frontend/src/app/workspace/service/validation/validation-workflow.service.spec.ts
b/frontend/src/app/workspace/service/validation/validation-workflow.service.spec.ts
index f94d6141f9..f01cbc59af 100644
---
a/frontend/src/app/workspace/service/validation/validation-workflow.service.spec.ts
+++
b/frontend/src/app/workspace/service/validation/validation-workflow.service.spec.ts
@@ -27,7 +27,6 @@ import {
mockScanSentimentLink,
mockSentimentPredicate,
} from "../workflow-graph/model/mock-workflow-data";
-import { mockFileSourceSchema } from
"../operator-metadata/mock-operator-metadata.data";
import { WorkflowActionService } from
"../workflow-graph/model/workflow-action.service";
import { UndoRedoService } from "../undo-redo/undo-redo.service";
import { OperatorMetadataService } from
"../operator-metadata/operator-metadata.service";
@@ -37,7 +36,6 @@ import { marbles } from "rxjs-marbles";
import { WorkflowUtilService } from
"../workflow-graph/util/workflow-util.service";
import { map } from "rxjs/operators";
import { commonTestProviders } from "../../../common/testing/test-utils";
-import { OperatorPredicate } from "../../types/workflow-common.interface";
describe("ValidationWorkflowService", () => {
let validationWorkflowService: ValidationWorkflowService;
@@ -115,25 +113,6 @@ describe("ValidationWorkflowService", () => {
expect(validationWorkflowService.validateOperator(mockScanPredicate.operatorID).isValid).toBeFalsy();
});
- it("should treat file source filename input as optional when a file is
selected", () => {
- const mockFileSourcePredicate: OperatorPredicate = {
- operatorID: "file-source-1",
- operatorType: mockFileSourceSchema.operatorType,
- operatorVersion: mockFileSourceSchema.operatorVersion,
- operatorProperties: {
- fileName: "/[email protected]/dataset/v1/file.csv",
- },
- inputPorts: [{ portID: "input-0" }],
- outputPorts: [{ portID: "output-0" }],
- showAdvanced: true,
- isDisabled: false,
- };
-
- workflowActionservice.addOperator(mockFileSourcePredicate, mockPoint);
-
-
expect(validationWorkflowService.validateOperator(mockFileSourcePredicate.operatorID).isValid).toBeTruthy();
- });
-
// TODO: this test is incompatible with shared editing.
// it(
// "should subscribe the changes of validateOperatorStream when one
operator box is deleted after valid status ",
diff --git
a/frontend/src/app/workspace/service/validation/validation-workflow.service.ts
b/frontend/src/app/workspace/service/validation/validation-workflow.service.ts
index a4158373e4..93099888b2 100644
---
a/frontend/src/app/workspace/service/validation/validation-workflow.service.ts
+++
b/frontend/src/app/workspace/service/validation/validation-workflow.service.ts
@@ -27,7 +27,6 @@ import { map } from "rxjs/operators";
import { DynamicSchemaService } from
"../dynamic-schema/dynamic-schema.service";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { WorkflowGraph, WorkflowGraphReadonly } from
"../workflow-graph/model/workflow-graph";
-import { OperatorPredicate } from "../../types/workflow-common.interface";
export type ValidationError = {
isValid: false;
@@ -326,9 +325,6 @@ export class ValidationWorkflowService {
for (let i = 0; i < operator.inputPorts.length; i++) {
const port = operator.inputPorts[i];
const portNumInputs = numInputLinksByPort.get(port.portID) ?? 0;
- if (this.isOptionalFileInputPort(operator, port.portID, portNumInputs)) {
- continue;
- }
if (port.allowMultiInputs) {
if (portNumInputs < 1) {
satisfyInput = false;
@@ -353,20 +349,6 @@ export class ValidationWorkflowService {
}
}
- private isOptionalFileInputPort(
- operator: OperatorPredicate,
- portID: string,
- portNumInputs: number
- ): boolean {
- if (portID !== "input-0" || portNumInputs > 0) {
- return false;
- }
-
- const properties = operator.operatorProperties as Record<string, unknown>;
- const selectedFileName = properties["fileName"];
- return typeof selectedFileName === "string" &&
selectedFileName.trim().length > 0;
- }
-
public static combineValidation(...validations: Validation[]): Validation {
let isValid = true;
let messages = {};