This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-input-source-operator
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-input-source-operator
by this push:
new 1d17d3ea89 fix fmt
1d17d3ea89 is described below
commit 1d17d3ea897363eb76325c6e6a54ea86f0297556
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 12 16:12:44 2026 -0700
fix fmt
---
.../apache/texera/amber/operator/LogicalOp.scala | 6 +-
.../source/scan/InputFileScanSourceOpExec.scala | 129 ---------------------
.../scan/{ => file}/FileScanSourceOpDesc.scala | 3 +-
.../source/scan/file/FileScanSourceOpExec.scala | 47 ++++++++
.../FileScanSourceOpExecSupport.scala} | 93 ++++++++-------
.../{ => file}/InputFileScanSourceOpDesc.scala | 3 +-
.../scan/file/InputFileScanSourceOpExec.scala | 48 ++++++++
.../source/scan/FileScanSourceOpExecSpec.scala | 4 +
.../scan/text/FileScanSourceOpDescSpec.scala | 5 +-
9 files changed, 157 insertions(+), 181 deletions(-)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index ea40c4b815..5227d4e62a 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -77,8 +77,6 @@ import
org.apache.texera.amber.operator.source.apis.twitter.v2.{
}
import
org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpDesc
import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
-import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
-import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
import
org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
@@ -141,6 +139,10 @@ import
org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallCh
import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc
import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder,
ToStringBuilder}
import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
+import org.apache.texera.amber.operator.source.scan.file.{
+ FileScanSourceOpDesc,
+ InputFileScanSourceOpDesc
+}
import
org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc
import java.util.UUID
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
deleted file mode 100644
index 8c4998e6a9..0000000000
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.operator.source.scan
-
-import org.apache.texera.amber.core.storage.{DocumentFactory, FileResolver}
-import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.{LargeBinary, Tuple, TupleLike}
-import org.apache.texera.amber.util.JSONUtils.objectMapper
-import org.apache.texera.service.util.LargeBinaryOutputStream
-import org.apache.commons.compress.archivers.ArchiveStreamFactory
-import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
-import org.apache.commons.io.IOUtils.toByteArray
-import org.apache.texera.amber.core.executor.OperatorExecutor
-
-import java.io._
-import java.net.URI
-import scala.collection.mutable
-import scala.jdk.CollectionConverters.IteratorHasAsScala
-
-class InputFileScanSourceOpExec private[scan] (
- descString: String
-) extends OperatorExecutor {
- private val desc: InputFileScanSourceOpDesc =
- objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
-
- override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
- val fileName =
- FileResolver.resolve(tuple.getFields.collectFirst { case s: String => s
}.get).toASCIIString
- val is: InputStream =
- DocumentFactory.openReadonlyDocument(new URI(fileName)).asInputStream()
-
- val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
- var zipIn: ZipArchiveInputStream = null
- var archiveStream: InputStream = null
- if (desc.extract) {
- zipIn = new ArchiveStreamFactory()
- .createArchiveInputStream(new BufferedInputStream(is))
- .asInstanceOf[ZipArchiveInputStream]
- archiveStream = zipIn
- closeables += zipIn
- } else {
- archiveStream = is
- closeables += is
- }
-
- var filenameIt: Iterator[String] = Iterator.empty
- val fileEntries: Iterator[InputStream] = {
- if (desc.extract) {
- val (it1, it2) = Iterator
- .continually(zipIn.getNextEntry)
- .takeWhile(_ != null)
- .filterNot(_.getName.startsWith("__MACOSX"))
- .duplicate
- filenameIt = it1.map(_.getName)
- it2.map(_ => zipIn)
- } else {
- Iterator(archiveStream)
- }
- }
-
- val rawIterator: Iterator[TupleLike] =
- if (desc.attributeType.isSingle) {
- fileEntries.zipAll(filenameIt, null, null).map {
- case (entry, fileName) =>
- val fields: mutable.ListBuffer[Any] = mutable.ListBuffer()
- if (desc.outputFileName) {
- fields.addOne(fileName)
- }
- fields.addOne(desc.attributeType match {
- case FileAttributeType.SINGLE_STRING =>
- new String(toByteArray(entry), desc.fileEncoding.getCharset)
- case FileAttributeType.LARGE_BINARY =>
- val largeBinary = new LargeBinary()
- val out = new LargeBinaryOutputStream(largeBinary)
- try {
- val buffer = new Array[Byte](8192)
- var bytesRead = entry.read(buffer)
- while (bytesRead != -1) {
- out.write(buffer, 0, bytesRead)
- bytesRead = entry.read(buffer)
- }
- } finally {
- out.close()
- }
- largeBinary
- case _ => parseField(toByteArray(entry),
desc.attributeType.getType)
- })
- TupleLike(fields.toSeq: _*)
- }
- } else {
- fileEntries.flatMap(entry =>
- new BufferedReader(new InputStreamReader(entry,
desc.fileEncoding.getCharset))
- .lines()
- .iterator()
- .asScala
- .slice(
- desc.fileScanOffset.getOrElse(0),
- desc.fileScanOffset.getOrElse(0) +
desc.fileScanLimit.getOrElse(Int.MaxValue)
- )
- .map(line =>
- TupleLike(desc.attributeType match {
- case FileAttributeType.SINGLE_STRING => line
- case _ => parseField(line,
desc.attributeType.getType)
- })
- )
- )
- }
-
- new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
- }
-
-}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
similarity index 95%
rename from
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
rename to
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
index a76a855cad..a695b353ad 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.texera.amber.operator.source.scan
+package org.apache.texera.amber.operator.source.scan.file
import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonProperty}
import com.kjetland.jackson.jsonSchema.annotations.{
@@ -31,6 +31,7 @@ import
org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, Workflow
import org.apache.texera.amber.core.workflow.{PhysicalOp,
SchemaPropagationFunc}
import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.{FileDecodingMethod,
ScanSourceOpDesc}
import org.apache.texera.amber.util.JSONUtils.objectMapper
@JsonIgnoreProperties(value = Array("limit", "offset", "fileEncoding"))
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
new file mode 100644
index 0000000000..bd04e45b08
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.file
+
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+import java.io.IOException
+
+class FileScanSourceOpExec private[scan] (
+ descString: String
+) extends SourceOperatorExecutor
+ with FileScanSourceOpExecSupport {
+ private val desc: FileScanSourceOpDesc =
+ objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
+
+ @throws[IOException]
+ override def produceTuple(): Iterator[TupleLike] = {
+ createTuplesFromFile(
+ fileName = desc.fileName.get,
+ attributeType = desc.attributeType,
+ fileEncoding = desc.fileEncoding,
+ extract = desc.extract,
+ outputFileName = desc.outputFileName,
+ fileScanOffset = desc.fileScanOffset,
+ fileScanLimit = desc.fileScanLimit
+ )
+ }
+}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala
similarity index 61%
rename from
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
rename to
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala
index 91c817240c..e29822dfff 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala
@@ -17,51 +17,56 @@
* under the License.
*/
-package org.apache.texera.amber.operator.source.scan
+package org.apache.texera.amber.operator.source.scan.file
-import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.commons.compress.archivers.ArchiveStreamFactory
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
+import org.apache.commons.io.IOUtils.toByteArray
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
-import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.amber.operator.source.scan.{
+ AutoClosingIterator,
+ FileAttributeType,
+ FileDecodingMethod
+}
import org.apache.texera.service.util.LargeBinaryOutputStream
-import org.apache.commons.compress.archivers.ArchiveStreamFactory
-import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
-import org.apache.commons.io.IOUtils.toByteArray
import java.io._
import java.net.URI
import scala.collection.mutable
import scala.jdk.CollectionConverters.IteratorHasAsScala
-class FileScanSourceOpExec private[scan] (
- descString: String
-) extends SourceOperatorExecutor {
- private val desc: FileScanSourceOpDesc =
- objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
+private[scan] trait FileScanSourceOpExecSupport {
- @throws[IOException]
- override def produceTuple(): Iterator[TupleLike] = {
- val is: InputStream =
- DocumentFactory.openReadonlyDocument(new
URI(desc.fileName.get)).asInputStream()
+ protected def createTuplesFromFile(
+ fileName: String,
+ attributeType: FileAttributeType,
+ fileEncoding: FileDecodingMethod,
+ extract: Boolean,
+ outputFileName: Boolean,
+ fileScanOffset: Option[Int],
+ fileScanLimit: Option[Int]
+ ): Iterator[TupleLike] = {
+ val inputStream = DocumentFactory.openReadonlyDocument(new
URI(fileName)).asInputStream()
val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
var zipIn: ZipArchiveInputStream = null
- var archiveStream: InputStream = null
- if (desc.extract) {
- zipIn = new ArchiveStreamFactory()
- .createArchiveInputStream(new BufferedInputStream(is))
- .asInstanceOf[ZipArchiveInputStream]
- archiveStream = zipIn
- closeables += zipIn
- } else {
- archiveStream = is
- closeables += is
- }
+ val archiveStream: InputStream =
+ if (extract) {
+ zipIn = new ArchiveStreamFactory()
+ .createArchiveInputStream(new BufferedInputStream(inputStream))
+ .asInstanceOf[ZipArchiveInputStream]
+ closeables += zipIn
+ zipIn
+ } else {
+ closeables += inputStream
+ inputStream
+ }
var filenameIt: Iterator[String] = Iterator.empty
- val fileEntries: Iterator[InputStream] = {
- if (desc.extract) {
+ val fileEntries: Iterator[InputStream] =
+ if (extract) {
val (it1, it2) = Iterator
.continually(zipIn.getNextEntry)
.takeWhile(_ != null)
@@ -72,21 +77,19 @@ class FileScanSourceOpExec private[scan] (
} else {
Iterator(archiveStream)
}
- }
val rawIterator: Iterator[TupleLike] =
- if (desc.attributeType.isSingle) {
+ if (attributeType.isSingle) {
fileEntries.zipAll(filenameIt, null, null).map {
- case (entry, fileName) =>
- val fields: mutable.ListBuffer[Any] = mutable.ListBuffer()
- if (desc.outputFileName) {
- fields.addOne(fileName)
+ case (entry, entryFileName) =>
+ val fields = mutable.ListBuffer.empty[Any]
+ if (outputFileName) {
+ fields += entryFileName
}
- fields.addOne(desc.attributeType match {
+ fields += (attributeType match {
case FileAttributeType.SINGLE_STRING =>
- new String(toByteArray(entry), desc.fileEncoding.getCharset)
+ new String(toByteArray(entry), fileEncoding.getCharset)
case FileAttributeType.LARGE_BINARY =>
- // For large binaries, create reference and upload via
streaming
val largeBinary = new LargeBinary()
val out = new LargeBinaryOutputStream(largeBinary)
try {
@@ -100,26 +103,26 @@ class FileScanSourceOpExec private[scan] (
out.close()
}
largeBinary
- case _ => parseField(toByteArray(entry),
desc.attributeType.getType)
+ case _ => parseField(toByteArray(entry), attributeType.getType)
})
TupleLike(fields.toSeq: _*)
}
} else {
fileEntries.flatMap(entry =>
- new BufferedReader(new InputStreamReader(entry,
desc.fileEncoding.getCharset))
+ new BufferedReader(new InputStreamReader(entry,
fileEncoding.getCharset))
.lines()
.iterator()
.asScala
.slice(
- desc.fileScanOffset.getOrElse(0),
- desc.fileScanOffset.getOrElse(0) +
desc.fileScanLimit.getOrElse(Int.MaxValue)
+ fileScanOffset.getOrElse(0),
+ fileScanOffset.getOrElse(0) +
fileScanLimit.getOrElse(Int.MaxValue)
)
- .map(line => {
- TupleLike(desc.attributeType match {
+ .map(line =>
+ TupleLike(attributeType match {
case FileAttributeType.SINGLE_STRING => line
- case _ => parseField(line,
desc.attributeType.getType)
+ case _ => parseField(line,
attributeType.getType)
})
- })
+ )
)
}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
similarity index 96%
rename from
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
rename to
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
index 4e7c88b000..72299fc0d2 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.texera.amber.operator.source.scan
+package org.apache.texera.amber.operator.source.scan.file
import com.fasterxml.jackson.annotation.JsonProperty
import com.kjetland.jackson.jsonSchema.annotations.{
@@ -37,6 +37,7 @@ import org.apache.texera.amber.core.workflow.{
import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
import org.apache.texera.amber.util.JSONUtils.objectMapper
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
new file mode 100644
index 0000000000..cedb868d8e
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.file
+
+import org.apache.texera.amber.core.executor.OperatorExecutor
+import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+class InputFileScanSourceOpExec private[scan] (
+ descString: String
+) extends OperatorExecutor
+ with FileScanSourceOpExecSupport {
+ private val desc: InputFileScanSourceOpDesc =
+ objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
+
+ override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
+ val fileName =
+ FileResolver.resolve(tuple.getFields.collectFirst { case s: String => s
}.get).toASCIIString
+ createTuplesFromFile(
+ fileName = fileName,
+ attributeType = desc.attributeType,
+ fileEncoding = desc.fileEncoding,
+ extract = desc.extract,
+ outputFileName = desc.outputFileName,
+ fileScanOffset = desc.fileScanOffset,
+ fileScanLimit = desc.fileScanLimit
+ )
+ }
+
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
index ab3383ca0a..4bec177ba9 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
@@ -20,6 +20,10 @@
package org.apache.texera.amber.operator.source.scan
import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema,
SchemaEnforceable}
+import org.apache.texera.amber.operator.source.scan.file.{
+ FileScanSourceOpDesc,
+ FileScanSourceOpExec
+}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
index 8c08686705..804ffabeb6 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
@@ -22,12 +22,11 @@ package org.apache.texera.amber.operator.source.scan.text
import org.apache.texera.amber.core.storage.FileResolver
import org.apache.texera.amber.core.tuple.{AttributeType, Schema,
SchemaEnforceable, Tuple}
import org.apache.texera.amber.operator.TestOperators
-import org.apache.texera.amber.operator.source.scan.{
- FileAttributeType,
- FileDecodingMethod,
+import org.apache.texera.amber.operator.source.scan.file.{
FileScanSourceOpDesc,
FileScanSourceOpExec
}
+import org.apache.texera.amber.operator.source.scan.{FileAttributeType,
FileDecodingMethod}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec