This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-source-port by this push:
     new 94b332c27e update
94b332c27e is described below

commit 94b332c27eb2059cbd1ae9d722d3735346aac1a7
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Apr 10 01:57:30 2026 -0700

    update
---
 .../org/apache/texera/workflow/LogicalPlan.scala   |  9 ----
 .../dataset/DatasetSelectorSourceOpDesc.scala      |  8 ++-
 .../dataset/DatasetSelectorSourceOpExec.scala      | 62 +++++-----------------
 3 files changed, 16 insertions(+), 63 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
index 057963a1f9..b64b0ee1d2 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
@@ -154,15 +154,6 @@ case class LogicalPlan(
                 .find(_.trim.nonEmpty)
                 .map(_.trim)
                 .getOrElse(throw new RuntimeException("No input file name"))
-            case datasetSelector: DatasetSelectorSourceOpDesc =>
-              DatasetSelectorSourceOpExec
-                .listFileNames(datasetSelector.datasetVersionPath)
-                .headOption
-                .getOrElse(
-                  throw new RuntimeException(
-                    "Selected dataset version does not contain any files."
-                  )
-                )
             case _ =>
               throw new RuntimeException(
                 "Unsupported upstream operator for CSV File Scan From Input 
column inference."
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
index 61a061b5c0..e99be7aa0d 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
@@ -25,11 +25,11 @@ import 
org.apache.texera.amber.core.executor.OpExecWithClassName
 import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
 import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
 import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, 
SchemaPropagationFunc}
+import org.apache.texera.amber.operator.LogicalOp
 import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
-import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
-class DatasetSelectorSourceOpDesc extends SourceOperatorDescriptor {
+class DatasetSelectorSourceOpDesc extends LogicalOp {
 
   @JsonProperty(required = true)
   @JsonSchemaTitle("Dataset Version")
@@ -52,11 +52,9 @@ class DatasetSelectorSourceOpDesc extends 
SourceOperatorDescriptor {
       .withInputPorts(operatorInfo.inputPorts)
       .withOutputPorts(operatorInfo.outputPorts)
       .withPropagateSchema(
-        SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> 
sourceSchema()))
+        SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> 
Schema().add("filename", AttributeType.STRING)))
       )
 
-  override def sourceSchema(): Schema = Schema().add("filename", 
AttributeType.STRING)
-
   override def operatorInfo: OperatorInfo =
     OperatorInfo(
       userFriendlyName = "Dataset Selector",
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
index 6cb426f3a2..51da8ffd0b 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
@@ -19,73 +19,37 @@
 
 package org.apache.texera.amber.operator.source.dataset
 
-import io.lakefs.clients.sdk.model.ObjectStats
 import org.apache.texera.amber.core.executor.SourceOperatorExecutor
 import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
 import org.apache.texera.amber.core.tuple.TupleLike
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.apache.texera.dao.SqlServer
-import org.apache.texera.dao.SqlServer.withTransaction
 import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET
 import 
org.apache.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION
 import org.apache.texera.dao.jooq.generated.tables.User.USER
-import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, 
DatasetVersion}
 
 class DatasetSelectorSourceOpExec private[dataset] (descString: String) 
extends SourceOperatorExecutor {
   private val desc: DatasetSelectorSourceOpDesc =
     objectMapper.readValue(descString, classOf[DatasetSelectorSourceOpDesc])
 
   override def produceTuple(): Iterator[TupleLike] = {
-    DatasetSelectorSourceOpExec
-      .listFileNames(desc.datasetVersionPath)
-      .iterator
-      .map(fileName => TupleLike("filename" -> fileName))
-  }
-}
-
-object DatasetSelectorSourceOpExec {
-
-
-  private def isRealFile(obj: ObjectStats): Boolean = {
-    val path = Option(obj.getPath).getOrElse("").trim
-    path.nonEmpty && !path.endsWith("/")
-  }
-
-  def listFileNames(datasetVersionPath: String): Seq[String] = {
-    val Array(ownerEmail, datasetName, versionName) = 
datasetVersionPath.trim.split("/")
-    val (dataset, datasetVersion) = resolveDatasetVersion(ownerEmail, 
datasetName, versionName)
-    val versionPrefix = s"/$ownerEmail/$datasetName/$versionName"
-    LakeFSStorageClient
-      .retrieveObjectsOfVersion(dataset.getRepositoryName, 
datasetVersion.getVersionHash)
-      .iterator
-      .filter(isRealFile)
-      .toSeq
-      .sortBy(_.getPath)
-      .map(obj => s"$versionPrefix/${obj.getPath}")
-  }
+    val Seq(_, ownerEmail, datasetName, versionName) =
+      desc.datasetVersionPath.split("/").toSeq
 
-  private def resolveDatasetVersion(
-      ownerEmail: String,
-      datasetName: String,
-      versionName: String
-  ): (Dataset, DatasetVersion) =
-    withTransaction(SqlServer.getInstance().createDSLContext()) { ctx =>
-      val dataset = ctx
-        .select(DATASET.fields: _*)
+    val (repositoryName, versionHash) =
+      SqlServer.getInstance().createDSLContext()
+        .select(DATASET.REPOSITORY_NAME, DATASET_VERSION.VERSION_HASH)
         .from(DATASET)
-        .leftJoin(USER)
-        .on(USER.UID.eq(DATASET.OWNER_UID))
+        .join(USER).on(USER.UID.eq(DATASET.OWNER_UID))
+        .join(DATASET_VERSION).on(DATASET_VERSION.DID.eq(DATASET.DID))
         .where(USER.EMAIL.eq(ownerEmail))
         .and(DATASET.NAME.eq(datasetName))
-        .fetchOneInto(classOf[Dataset])
-
-
-      val datasetVersion = ctx
-        .selectFrom(DATASET_VERSION)
-        .where(DATASET_VERSION.DID.eq(dataset.getDid))
         .and(DATASET_VERSION.NAME.eq(versionName))
-        .fetchOneInto(classOf[DatasetVersion])
+        .fetchOne(r => (r.value1(), r.value2()))
 
-      (dataset, datasetVersion)
-    }
+    LakeFSStorageClient
+      .retrieveObjectsOfVersion(repositoryName, versionHash)
+      .map(obj => TupleLike("filename" -> 
s"${desc.datasetVersionPath}/${obj.getPath}"))
+      .iterator
+  }
 }

Reply via email to