This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-source-port by this push:
new 94b332c27e update
94b332c27e is described below
commit 94b332c27eb2059cbd1ae9d722d3735346aac1a7
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Apr 10 01:57:30 2026 -0700
update
---
.../org/apache/texera/workflow/LogicalPlan.scala | 9 ----
.../dataset/DatasetSelectorSourceOpDesc.scala | 8 ++-
.../dataset/DatasetSelectorSourceOpExec.scala | 62 +++++-----------------
3 files changed, 16 insertions(+), 63 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
index 057963a1f9..b64b0ee1d2 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
@@ -154,15 +154,6 @@ case class LogicalPlan(
.find(_.trim.nonEmpty)
.map(_.trim)
.getOrElse(throw new RuntimeException("No input file name"))
- case datasetSelector: DatasetSelectorSourceOpDesc =>
- DatasetSelectorSourceOpExec
- .listFileNames(datasetSelector.datasetVersionPath)
- .headOption
- .getOrElse(
- throw new RuntimeException(
- "Selected dataset version does not contain any files."
- )
- )
case _ =>
throw new RuntimeException(
"Unsupported upstream operator for CSV File Scan From Input column inference."
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
index 61a061b5c0..e99be7aa0d 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
@@ -25,11 +25,11 @@ import org.apache.texera.amber.core.executor.OpExecWithClassName
import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, SchemaPropagationFunc}
+import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
-import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
import org.apache.texera.amber.util.JSONUtils.objectMapper
-class DatasetSelectorSourceOpDesc extends SourceOperatorDescriptor {
+class DatasetSelectorSourceOpDesc extends LogicalOp {
@JsonProperty(required = true)
@JsonSchemaTitle("Dataset Version")
@@ -52,11 +52,9 @@ class DatasetSelectorSourceOpDesc extends SourceOperatorDescriptor {
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withPropagateSchema(
- SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id ->
sourceSchema()))
+ SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id ->
Schema().add("filename", AttributeType.STRING)))
)
- override def sourceSchema(): Schema = Schema().add("filename",
AttributeType.STRING)
-
override def operatorInfo: OperatorInfo =
OperatorInfo(
userFriendlyName = "Dataset Selector",
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
index 6cb426f3a2..51da8ffd0b 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
@@ -19,73 +19,37 @@
package org.apache.texera.amber.operator.source.dataset
-import io.lakefs.clients.sdk.model.ObjectStats
import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
import org.apache.texera.amber.core.tuple.TupleLike
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.apache.texera.dao.SqlServer
-import org.apache.texera.dao.SqlServer.withTransaction
import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET
import org.apache.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION
import org.apache.texera.dao.jooq.generated.tables.User.USER
-import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, DatasetVersion}
class DatasetSelectorSourceOpExec private[dataset] (descString: String)
extends SourceOperatorExecutor {
private val desc: DatasetSelectorSourceOpDesc =
objectMapper.readValue(descString, classOf[DatasetSelectorSourceOpDesc])
override def produceTuple(): Iterator[TupleLike] = {
- DatasetSelectorSourceOpExec
- .listFileNames(desc.datasetVersionPath)
- .iterator
- .map(fileName => TupleLike("filename" -> fileName))
- }
-}
-
-object DatasetSelectorSourceOpExec {
-
-
- private def isRealFile(obj: ObjectStats): Boolean = {
- val path = Option(obj.getPath).getOrElse("").trim
- path.nonEmpty && !path.endsWith("/")
- }
-
- def listFileNames(datasetVersionPath: String): Seq[String] = {
- val Array(ownerEmail, datasetName, versionName) =
datasetVersionPath.trim.split("/")
- val (dataset, datasetVersion) = resolveDatasetVersion(ownerEmail,
datasetName, versionName)
- val versionPrefix = s"/$ownerEmail/$datasetName/$versionName"
- LakeFSStorageClient
- .retrieveObjectsOfVersion(dataset.getRepositoryName,
datasetVersion.getVersionHash)
- .iterator
- .filter(isRealFile)
- .toSeq
- .sortBy(_.getPath)
- .map(obj => s"$versionPrefix/${obj.getPath}")
- }
+ val Seq(_, ownerEmail, datasetName, versionName) =
+ desc.datasetVersionPath.split("/").toSeq
- private def resolveDatasetVersion(
- ownerEmail: String,
- datasetName: String,
- versionName: String
- ): (Dataset, DatasetVersion) =
- withTransaction(SqlServer.getInstance().createDSLContext()) { ctx =>
- val dataset = ctx
- .select(DATASET.fields: _*)
+ val (repositoryName, versionHash) =
+ SqlServer.getInstance().createDSLContext()
+ .select(DATASET.REPOSITORY_NAME, DATASET_VERSION.VERSION_HASH)
.from(DATASET)
- .leftJoin(USER)
- .on(USER.UID.eq(DATASET.OWNER_UID))
+ .join(USER).on(USER.UID.eq(DATASET.OWNER_UID))
+ .join(DATASET_VERSION).on(DATASET_VERSION.DID.eq(DATASET.DID))
.where(USER.EMAIL.eq(ownerEmail))
.and(DATASET.NAME.eq(datasetName))
- .fetchOneInto(classOf[Dataset])
-
-
- val datasetVersion = ctx
- .selectFrom(DATASET_VERSION)
- .where(DATASET_VERSION.DID.eq(dataset.getDid))
.and(DATASET_VERSION.NAME.eq(versionName))
- .fetchOneInto(classOf[DatasetVersion])
+ .fetchOne(r => (r.value1(), r.value2()))
- (dataset, datasetVersion)
- }
+ LakeFSStorageClient
+ .retrieveObjectsOfVersion(repositoryName, versionHash)
+ .map(obj => TupleLike("filename" ->
s"${desc.datasetVersionPath}/${obj.getPath}"))
+ .iterator
+ }
}