This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-source-port by this 
push:
     new 90600f7da7 update
90600f7da7 is described below

commit 90600f7da7841c1de80bb0485db9abe1702077a8
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Apr 8 14:08:18 2026 -0700

    update
---
 .../org/apache/texera/workflow/LogicalPlan.scala   |  39 +++++--
 .../apache/texera/amber/operator/LogicalOp.scala   |   2 +
 .../dataset/DatasetSelectorSourceOpDesc.scala      |  68 +++++++++++
 .../dataset/DatasetSelectorSourceOpExec.scala      | 116 ++++++++++++++++++
 frontend/src/app/app.module.ts                     |   2 +
 frontend/src/app/common/formly/formly-config.ts    |   2 +
 .../dataset-version-selector.component.html        |  64 ++++++++++
 .../dataset-version-selector.component.scss        |   9 ++
 .../dataset-version-selector.component.ts          | 129 +++++++++++++++++++++
 .../operator-property-edit-frame.component.ts      |   4 +
 .../src/assets/operator_images/DatasetSelector.png | Bin 0 -> 22499 bytes
 .../texera/amber/compiler/model/LogicalPlan.scala  |  39 +++++--
 12 files changed, 458 insertions(+), 16 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala 
b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
index 58a0d0e793..057963a1f9 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
@@ -23,6 +23,10 @@ import com.typesafe.scalalogging.LazyLogging
 import org.apache.texera.amber.core.storage.FileResolver
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.source.dataset.{
+  DatasetSelectorSourceOpDesc,
+  DatasetSelectorSourceOpExec
+}
 import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
 import 
org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
@@ -131,20 +135,39 @@ case class LogicalPlan(
       case operator @ (csvOp: InputCSVScanSourceOpDesc)
           if csvOp.columns == null || csvOp.columns.isEmpty =>
         Try {
-          val upstreamTextInput = getUpstreamLinks(operator.operatorIdentifier)
+          val upstreamOperator = getUpstreamLinks(operator.operatorIdentifier)
             .flatMap(link => operators.find(_.operatorIdentifier == 
link.fromOpId))
-            .collectFirst { case textInput: TextInputSourceOpDesc => textInput 
}
+            .collectFirst {
+              case textInput: TextInputSourceOpDesc       => textInput
+              case datasetSelector: DatasetSelectorSourceOpDesc => 
datasetSelector
+            }
             .getOrElse(
               throw new RuntimeException(
-                "CSV File Scan From Input requires a literal Text Input 
filename to infer columns."
+                "CSV File Scan From Input requires either a literal Text Input 
filename or a Dataset Selector to infer columns."
               )
             )
 
-          val fileName = upstreamTextInput.textInput
-            .linesIterator
-            .find(_.trim.nonEmpty)
-            .map(_.trim)
-            .getOrElse(throw new RuntimeException("No input file name"))
+          val fileName = upstreamOperator match {
+            case textInput: TextInputSourceOpDesc =>
+              textInput.textInput
+                .linesIterator
+                .find(_.trim.nonEmpty)
+                .map(_.trim)
+                .getOrElse(throw new RuntimeException("No input file name"))
+            case datasetSelector: DatasetSelectorSourceOpDesc =>
+              DatasetSelectorSourceOpExec
+                .listFileNames(datasetSelector.datasetVersionPath)
+                .headOption
+                .getOrElse(
+                  throw new RuntimeException(
+                    "Selected dataset version does not contain any files."
+                  )
+                )
+            case _ =>
+              throw new RuntimeException(
+                "Unsupported upstream operator for CSV File Scan From Input 
column inference."
+              )
+          }
           csvOp.inferColumnsFromFileName(fileName)
         } match {
           case Success(_) =>
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index d3290f4cfd..060b8c59af 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -75,6 +75,7 @@ import 
org.apache.texera.amber.operator.source.apis.twitter.v2.{
   TwitterFullArchiveSearchSourceOpDesc,
   TwitterSearchSourceOpDesc
 }
+import 
org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpDesc
 import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
 import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
@@ -159,6 +160,7 @@ trait StateTransferFunc
     new Type(value = classOf[IfOpDesc], name = "If"),
     new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"),
     new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"),
+    new Type(value = classOf[DatasetSelectorSourceOpDesc], name = 
"DatasetSelector"),
     new Type(value = classOf[CSVScanSourceOpDesc], name = "CSVFileScan"),
     new Type(value = classOf[InputCSVScanSourceOpDesc], name = 
"InputCSVFileScan"),
     // disabled the ParallelCSVScanSourceOpDesc so that it does not confuse 
user. it can be re-enabled when doing experiments.
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
new file mode 100644
index 0000000000..61a061b5c0
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.dataset
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, 
SchemaPropagationFunc}
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+/**
+  * Logical descriptor of the "Dataset Selector" source operator.
+  *
+  * The operator declares no input ports and one output port. Its output schema is a
+  * single STRING column named `filename`. The descriptor is serialized to JSON and
+  * handed to the executor class named in `getPhysicalOp`.
+  */
+class DatasetSelectorSourceOpDesc extends SourceOperatorDescriptor {
+
+  // Path identifying the selected dataset version. Kept as a `var` so that it can be
+  // populated when the descriptor is deserialized from JSON.
+  @JsonProperty(required = true)
+  @JsonSchemaTitle("Dataset Version")
+  var datasetVersionPath: String = _
+
+  /** Static metadata of the operator: display name, description, group and ports. */
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      userFriendlyName = "Dataset Selector",
+      operatorDescription = "Select a dataset version and output one filename tuple per file",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List.empty,
+      outputPorts = List(OutputPort())
+    )
+
+  /** Output schema: one STRING attribute called `filename`. */
+  override def sourceSchema(): Schema = Schema().add("filename", AttributeType.STRING)
+
+  /**
+    * Builds the physical source operator for this descriptor.
+    *
+    * @param workflowId  identity of the workflow the operator belongs to
+    * @param executionId identity of the execution the operator belongs to
+    * @return a source physical operator whose executor is referenced by class name and
+    *         initialized with this descriptor serialized as JSON
+    */
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp = {
+    // The executor is referenced by fully-qualified class name and receives the JSON form
+    // of this descriptor as its constructor argument.
+    val executor = OpExecWithClassName(
+      "org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpExec",
+      objectMapper.writeValueAsString(this)
+    )
+    // A source has no inputs, so the propagated schema ignores its argument.
+    val propagateSchema =
+      SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema()))
+
+    PhysicalOp
+      .sourcePhysicalOp(workflowId, executionId, operatorIdentifier, executor)
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withPropagateSchema(propagateSchema)
+  }
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
new file mode 100644
index 0000000000..ac6515b376
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.dataset
+
+import io.lakefs.clients.sdk.model.ObjectStats
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
+import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.SqlServer.withTransaction
+import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET
+import 
org.apache.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION
+import org.apache.texera.dao.jooq.generated.tables.User.USER
+import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, 
DatasetVersion}
+
+/**
+  * Executor of the "Dataset Selector" source operator.
+  *
+  * @param descString JSON serialization of a [[DatasetSelectorSourceOpDesc]]
+  */
+class DatasetSelectorSourceOpExec private[dataset] (descString: String)
+    extends SourceOperatorExecutor {
+
+  // Descriptor rebuilt from the JSON string passed to the constructor.
+  private val descriptor: DatasetSelectorSourceOpDesc =
+    objectMapper.readValue(descString, classOf[DatasetSelectorSourceOpDesc])
+
+  /**
+    * Emits one tuple per file of the configured dataset version, each holding the file's
+    * path in the `filename` field.
+    */
+  override def produceTuple(): Iterator[TupleLike] =
+    for {
+      fileName <- DatasetSelectorSourceOpExec.listFileNames(descriptor.datasetVersionPath).iterator
+    } yield TupleLike("filename" -> fileName)
+}
+
+/**
+  * Helpers for turning a dataset version path into the list of files it contains.
+  *
+  * NOTE(review): nothing in this object checks whether the requesting user is allowed to
+  * read the dataset -- confirm that access control is enforced by the callers.
+  */
+object DatasetSelectorSourceOpExec {
+
+  /**
+    * Splits a dataset version path into its three segments.
+    *
+    * A `null` path is treated as empty. The path as a whole is trimmed, and empty segments
+    * (for example from a leading slash) are dropped before counting.
+    *
+    * @param path path of the form `/ownerEmail/datasetName/versionName`
+    * @return the triple `(ownerEmail, datasetName, versionName)`
+    * @throws IllegalArgumentException if the path does not consist of exactly three
+    *                                  non-empty segments
+    */
+  def parseDatasetVersionPath(path: String): (String, String, String) =
+    Option(path).fold("")(_.trim).split("/").filter(_.nonEmpty) match {
+      case Array(ownerEmail, datasetName, versionName) =>
+        (ownerEmail, datasetName, versionName)
+      case _ =>
+        throw new IllegalArgumentException(
+          "Dataset version path must be in the format /ownerEmail/datasetName/versionName."
+        )
+    }
+
+  /** True when the object has a non-blank path that does not end with a slash. */
+  private def isRealFile(obj: ObjectStats): Boolean =
+    Option(obj.getPath)
+      .map(_.trim)
+      .exists(objectPath => objectPath.nonEmpty && !objectPath.endsWith("/"))
+
+  /**
+    * Lists the files of a dataset version.
+    *
+    * The path is parsed, the dataset and version are looked up in the database, and the
+    * objects of that version are retrieved through `LakeFSStorageClient`.
+    *
+    * @param datasetVersionPath path of the form `/ownerEmail/datasetName/versionName`
+    * @return file paths of the form `/ownerEmail/datasetName/versionName/<object path>`,
+    *         ordered by object path
+    * @throws IllegalArgumentException if the path is malformed or the dataset or version
+    *                                  cannot be found
+    */
+  def listFileNames(datasetVersionPath: String): Seq[String] = {
+    val (ownerEmail, datasetName, versionName) = parseDatasetVersionPath(datasetVersionPath)
+    val (dataset, datasetVersion) = resolveDatasetVersion(ownerEmail, datasetName, versionName)
+    // Rebuilt from the parsed segments, so the prefix is normalized regardless of how the
+    // incoming path was written.
+    val versionPrefix = s"/$ownerEmail/$datasetName/$versionName"
+
+    LakeFSStorageClient
+      .retrieveObjectsOfVersion(dataset.getRepositoryName, datasetVersion.getVersionHash)
+      .iterator
+      .filter(isRealFile)
+      .map(_.getPath)
+      .toSeq
+      .sorted
+      .map(objectPath => s"$versionPrefix/$objectPath")
+  }
+
+  /**
+    * Looks up the dataset owned by `ownerEmail` with name `datasetName`, then the version
+    * of that dataset called `versionName`.
+    *
+    * @throws IllegalArgumentException if either lookup yields no row
+    */
+  private def resolveDatasetVersion(
+      ownerEmail: String,
+      datasetName: String,
+      versionName: String
+  ): (Dataset, DatasetVersion) =
+    withTransaction(SqlServer.getInstance().createDSLContext()) { ctx =>
+      // NOTE(review): the WHERE clause filters on USER.EMAIL, so rows without a matching
+      // user are discarded and this left join behaves like an inner join.
+      val dataset = Option(
+        ctx
+          .select(DATASET.fields: _*)
+          .from(DATASET)
+          .leftJoin(USER)
+          .on(USER.UID.eq(DATASET.OWNER_UID))
+          .where(USER.EMAIL.eq(ownerEmail))
+          .and(DATASET.NAME.eq(datasetName))
+          .fetchOneInto(classOf[Dataset])
+      ).getOrElse(
+        throw new IllegalArgumentException(
+          s"Dataset '$datasetName' owned by '$ownerEmail' was not found."
+        )
+      )
+
+      val datasetVersion = Option(
+        ctx
+          .selectFrom(DATASET_VERSION)
+          .where(DATASET_VERSION.DID.eq(dataset.getDid))
+          .and(DATASET_VERSION.NAME.eq(versionName))
+          .fetchOneInto(classOf[DatasetVersion])
+      ).getOrElse(
+        throw new IllegalArgumentException(
+          s"Dataset version '$versionName' for dataset '$datasetName' was not found."
+        )
+      )
+
+      (dataset, datasetVersion)
+    }
+}
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 6a92d4be2e..d3e3339f8f 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -104,6 +104,7 @@ import { AgentPanelComponent } from 
"./workspace/component/agent-panel/agent-pan
 import { AgentChatComponent } from 
"./workspace/component/agent-panel/agent-chat/agent-chat.component";
 import { AgentRegistrationComponent } from 
"./workspace/component/agent-panel/agent-registration/agent-registration.component";
 import { InputAutoCompleteComponent } from 
"./workspace/component/input-autocomplete/input-autocomplete.component";
+import { DatasetVersionSelectorComponent } from 
"./workspace/component/dataset-version-selector/dataset-version-selector.component";
 import { CollabWrapperComponent } from 
"./common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component";
 import { TexeraCopilot } from "./workspace/service/copilot/texera-copilot";
 import { NzSwitchModule } from "ng-zorro-antd/switch";
@@ -257,6 +258,7 @@ registerLocaleData(en);
     AgentChatComponent,
     AgentRegistrationComponent,
     InputAutoCompleteComponent,
+    DatasetVersionSelectorComponent,
     FileSelectionComponent,
     CollabWrapperComponent,
     AboutComponent,
diff --git a/frontend/src/app/common/formly/formly-config.ts 
b/frontend/src/app/common/formly/formly-config.ts
index d950bd3690..8c814a5993 100644
--- a/frontend/src/app/common/formly/formly-config.ts
+++ b/frontend/src/app/common/formly/formly-config.ts
@@ -27,6 +27,7 @@ import { PresetWrapperComponent } from 
"./preset-wrapper/preset-wrapper.componen
 import { InputAutoCompleteComponent } from 
"../../workspace/component/input-autocomplete/input-autocomplete.component";
 import { CollabWrapperComponent } from 
"./collab-wrapper/collab-wrapper/collab-wrapper.component";
 import { FormlyRepeatDndComponent } from "./repeat-dnd/repeat-dnd.component";
+import { DatasetVersionSelectorComponent } from 
"../../workspace/component/dataset-version-selector/dataset-version-selector.component";
 
 /**
  * Configuration for using Json Schema with Formly.
@@ -77,6 +78,7 @@ export const TEXERA_FORMLY_CONFIG = {
     { name: "multischema", component: MultiSchemaTypeComponent },
     { name: "codearea", component: CodeareaCustomTemplateComponent },
     { name: "inputautocomplete", component: InputAutoCompleteComponent, 
wrappers: ["form-field"] },
+    { name: "datasetversionselector", component: 
DatasetVersionSelectorComponent, wrappers: ["form-field"] },
     { name: "repeat-section-dnd", component: FormlyRepeatDndComponent },
   ],
   wrappers: [
diff --git 
a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
new file mode 100644
index 0000000000..27aa4b2530
--- /dev/null
+++ 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
@@ -0,0 +1,64 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!-- Cascading picker: choose a dataset first, then one of its versions. -->
+<div class="dataset-version-selector">
+  <!-- Dataset dropdown; a change resets the version selection in the component. -->
+  <nz-select
+    class="selector"
+    nzPlaceHolder="Select dataset"
+    nzShowSearch
+    nzAllowClear
+    [nzLoading]="isLoadingDatasets"
+    [ngModel]="selectedDataset"
+    (ngModelChange)="selectedDataset = $event; onDatasetChange()">
+    <nz-option
+      *ngFor="let dataset of datasets"
+      [nzLabel]="dataset.dataset.name"
+      [nzValue]="dataset">
+    </nz-option>
+  </nz-select>
+
+  <!-- Version dropdown; disabled until a dataset has been chosen. -->
+  <nz-select
+    class="selector"
+    nzPlaceHolder="Select version"
+    nzShowSearch
+    nzAllowClear
+    [nzLoading]="isLoadingVersions"
+    [disabled]="!selectedDataset"
+    [ngModel]="selectedVersion"
+    (ngModelChange)="selectedVersion = $event; onVersionChange()">
+    <nz-option
+      *ngFor="let version of datasetVersions"
+      [nzLabel]="version.name"
+      [nzValue]="version">
+    </nz-option>
+  </nz-select>
+
+  <!-- Read-only display of the value currently stored in the form control. -->
+  <input
+    nz-input
+    [formlyAttributes]="field"
+    [readOnly]="true"
+    [value]="formControl.value || ''" />
+</div>
+
+<!-- Validation message, shown when the field is flagged to show errors and has any. -->
+<div
+  *ngIf="props.showError && formControl.errors"
+  class="alert alert-danger"
+  role="alert">
+  <formly-validation-message [field]="field"></formly-validation-message>
+</div>
diff --git 
a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.scss
 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.scss
new file mode 100644
index 0000000000..67d32ed69a
--- /dev/null
+++ 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.scss
@@ -0,0 +1,9 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Stacks the two dropdowns and the read-only value field vertically.
+.dataset-version-selector {
+  display: flex;
+  flex-direction: column;
+  gap: 8px;
+}
+
+// Each dropdown spans the full width of the container.
+.selector {
+  width: 100%;
+}
diff --git 
a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
new file mode 100644
index 0000000000..154d06b4cb
--- /dev/null
+++ 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Component, OnInit } from "@angular/core";
+import { FieldType, FieldTypeConfig } from "@ngx-formly/core";
+import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
+import { DashboardDataset } from 
"../../../dashboard/type/dashboard-dataset.interface";
+import { DatasetVersion } from "../../../common/type/dataset";
+import { DatasetService } from 
"../../../dashboard/service/user/dataset/dataset.service";
+
+/**
+ * Formly field type that lets the user pick a dataset and then one of its versions.
+ *
+ * The form control value is either `null` or a path string of the form
+ * `/ownerEmail/datasetName/versionName`, written by `onVersionChange`.
+ */
+@UntilDestroy()
+@Component({
+  selector: "texera-dataset-version-selector-template",
+  templateUrl: "./dataset-version-selector.component.html",
+  styleUrls: ["./dataset-version-selector.component.scss"],
+})
+export class DatasetVersionSelectorComponent extends FieldType<FieldTypeConfig> implements OnInit {
+  datasets: ReadonlyArray<DashboardDataset> = [];
+  datasetVersions: ReadonlyArray<DatasetVersion> = [];
+  selectedDataset?: DashboardDataset;
+  selectedVersion?: DatasetVersion;
+  isLoadingDatasets = false;
+  isLoadingVersions = false;
+
+  constructor(private datasetService: DatasetService) {
+    super();
+  }
+
+  ngOnInit(): void {
+    this.loadDatasets();
+  }
+
+  /**
+   * Fetches the datasets accessible to the user and, once they arrive, restores the
+   * dropdown selection from the value already stored in the form control.
+   */
+  private loadDatasets(): void {
+    this.isLoadingDatasets = true;
+    this.datasetService
+      .retrieveAccessibleDatasets()
+      .pipe(untilDestroyed(this))
+      .subscribe({
+        next: datasets => {
+          this.datasets = datasets;
+          this.isLoadingDatasets = false;
+          this.restoreSelectionFromValue();
+        },
+        error: (err: unknown) => {
+          // Without this the loading indicator would stay on after a failed request.
+          this.isLoadingDatasets = false;
+          console.error("Failed to load accessible datasets", err);
+        },
+      });
+  }
+
+  /**
+   * Re-selects the dataset (and, through `loadVersions`, the version) named by the
+   * current form control value. Does nothing if the value is not a valid path.
+   */
+  private restoreSelectionFromValue(): void {
+    const parsed = this.parseDatasetVersionPath(this.formControl.value);
+    if (!parsed) {
+      return;
+    }
+
+    this.selectedDataset = this.datasets.find(
+      dataset =>
+        dataset.ownerEmail === parsed.ownerEmail && dataset.dataset.name === parsed.datasetName
+    );
+
+    if (this.selectedDataset?.dataset.did !== undefined) {
+      this.loadVersions(this.selectedDataset.dataset.did, parsed.versionName);
+    }
+  }
+
+  /**
+   * Handles a change of the dataset dropdown: clears the version state and the form
+   * value, then loads the versions of the newly selected dataset, if any.
+   */
+  onDatasetChange(): void {
+    this.selectedVersion = undefined;
+    this.datasetVersions = [];
+    this.formControl.setValue(null);
+
+    const did = this.selectedDataset?.dataset.did;
+    if (did !== undefined) {
+      this.loadVersions(did);
+    } else {
+      // No request is issued for an empty selection, and a response still in flight for
+      // the previous dataset is discarded as stale, so the indicator is cleared here.
+      this.isLoadingVersions = false;
+    }
+  }
+
+  /**
+   * Writes the path of the selected dataset version into the form control, or `null`
+   * when either the dataset or the version is missing.
+   */
+  onVersionChange(): void {
+    if (!this.selectedDataset || !this.selectedVersion) {
+      this.formControl.setValue(null);
+      return;
+    }
+
+    this.formControl.setValue(
+      `/${this.selectedDataset.ownerEmail}/${this.selectedDataset.dataset.name}/${this.selectedVersion.name}`
+    );
+  }
+
+  /**
+   * Loads the versions of dataset `did` and selects one of them.
+   *
+   * @param did id of the dataset whose versions are requested
+   * @param versionNameToSelect name of the version to select; when omitted, the first
+   *   version of the returned list is selected
+   */
+  private loadVersions(did: number, versionNameToSelect?: string): void {
+    this.isLoadingVersions = true;
+    this.datasetService
+      .retrieveDatasetVersionList(did)
+      .pipe(untilDestroyed(this))
+      .subscribe({
+        next: versions => {
+          // The user may have switched datasets while this request was in flight. Applying
+          // a stale response would pair the current dataset with another dataset's version.
+          if (this.selectedDataset?.dataset.did !== did) {
+            return;
+          }
+          this.datasetVersions = versions;
+          this.isLoadingVersions = false;
+          if (versionNameToSelect) {
+            this.selectedVersion = versions.find(version => version.name === versionNameToSelect);
+          } else if (versions.length > 0) {
+            this.selectedVersion = versions[0];
+          }
+          if (this.selectedVersion) {
+            this.onVersionChange();
+          }
+        },
+        error: (err: unknown) => {
+          // Only the request for the currently selected dataset owns the loading flag.
+          if (this.selectedDataset?.dataset.did === did) {
+            this.isLoadingVersions = false;
+          }
+          console.error(`Failed to load versions of dataset ${did}`, err);
+        },
+      });
+  }
+
+  /**
+   * Splits `/ownerEmail/datasetName/versionName` into its parts.
+   *
+   * @returns the three parts, or `undefined` when the path does not have exactly three
+   *   non-empty segments
+   */
+  private parseDatasetVersionPath(
+    path: string | null | undefined
+  ): { ownerEmail: string; datasetName: string; versionName: string } | undefined {
+    const parts = (path ?? "")
+      .split("/")
+      .filter(Boolean);
+    if (parts.length !== 3) {
+      return undefined;
+    }
+    const [ownerEmail, datasetName, versionName] = parts;
+    return { ownerEmail, datasetName, versionName };
+  }
+}
diff --git 
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
 
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
index 5d457e9050..a436d11142 100644
--- 
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
+++ 
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
@@ -453,6 +453,10 @@ export class OperatorPropertyEditFrameComponent implements 
OnInit, OnChanges, On
         mappedField.type = "inputautocomplete";
       }
 
+      if (mappedField.key == "datasetVersionPath") {
+        mappedField.type = "datasetversionselector";
+      }
+
       // if the title is python script (for Python UDF), then make this field 
a custom template 'codearea'
       if (mapSource?.description?.toLowerCase() === "input your code here") {
         if (mappedField.type) {
diff --git a/frontend/src/assets/operator_images/DatasetSelector.png 
b/frontend/src/assets/operator_images/DatasetSelector.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and 
b/frontend/src/assets/operator_images/DatasetSelector.png differ
diff --git 
a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
 
b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
index ca3522dd05..c56650460c 100644
--- 
a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
+++ 
b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
@@ -24,6 +24,10 @@ import org.apache.texera.amber.core.storage.FileResolver
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.PortIdentity
 import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.source.dataset.{
+  DatasetSelectorSourceOpDesc,
+  DatasetSelectorSourceOpExec
+}
 import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
 import 
org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
@@ -141,20 +145,39 @@ case class LogicalPlan(
       case operator @ (csvOp: InputCSVScanSourceOpDesc)
           if csvOp.columns == null || csvOp.columns.isEmpty =>
         Try {
-          val upstreamTextInput = getUpstreamLinks(operator.operatorIdentifier)
+          val upstreamOperator = getUpstreamLinks(operator.operatorIdentifier)
             .flatMap(link => operators.find(_.operatorIdentifier == 
link.fromOpId))
-            .collectFirst { case textInput: TextInputSourceOpDesc => textInput 
}
+            .collectFirst {
+              case textInput: TextInputSourceOpDesc       => textInput
+              case datasetSelector: DatasetSelectorSourceOpDesc => 
datasetSelector
+            }
             .getOrElse(
               throw new RuntimeException(
-                "CSV File Scan From Input requires a literal Text Input 
filename to infer columns."
+                "CSV File Scan From Input requires either a literal Text Input 
filename or a Dataset Selector to infer columns."
               )
             )
 
-          val fileName = upstreamTextInput.textInput
-            .linesIterator
-            .find(_.trim.nonEmpty)
-            .map(_.trim)
-            .getOrElse(throw new RuntimeException("No input file name"))
+          val fileName = upstreamOperator match {
+            case textInput: TextInputSourceOpDesc =>
+              textInput.textInput
+                .linesIterator
+                .find(_.trim.nonEmpty)
+                .map(_.trim)
+                .getOrElse(throw new RuntimeException("No input file name"))
+            case datasetSelector: DatasetSelectorSourceOpDesc =>
+              DatasetSelectorSourceOpExec
+                .listFileNames(datasetSelector.datasetVersionPath)
+                .headOption
+                .getOrElse(
+                  throw new RuntimeException(
+                    "Selected dataset version does not contain any files."
+                  )
+                )
+            case _ =>
+              throw new RuntimeException(
+                "Unsupported upstream operator for CSV File Scan From Input 
column inference."
+              )
+          }
           csvOp.inferColumnsFromFileName(fileName)
         } match {
           case Success(_) =>

Reply via email to