This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-dataset-selector
in repository https://gitbox.apache.org/repos/asf/texera.git

commit dc7909f55f9d2dc7a2758f04712b873985bd5d28
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Apr 10 16:36:37 2026 -0700

    feat(dataset-selector): add dataset selector operator and property UI
---
 .../apache/texera/amber/operator/LogicalOp.scala   |   2 +
 .../dataset/DatasetSelectorSourceOpDesc.scala      |  66 ++++++++++++++++
 .../dataset/DatasetSelectorSourceOpExec.scala      |  54 ++++++++++++++
 frontend/src/app/app.module.ts                     |   2 +
 frontend/src/app/common/formly/formly-config.ts    |   2 +
 .../dataset-version-selector.component.html        |  45 +++++++++++
 .../dataset-version-selector.component.ts          |  83 +++++++++++++++++++++
 .../operator-property-edit-frame.component.ts      |   4 +
 .../src/assets/operator_images/DatasetSelector.png | Bin 0 -> 22499 bytes
 9 files changed, 258 insertions(+)

diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 931596b1bf..7ccbb073d6 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -75,6 +75,7 @@ import 
org.apache.texera.amber.operator.source.apis.twitter.v2.{
   TwitterFullArchiveSearchSourceOpDesc,
   TwitterSearchSourceOpDesc
 }
+import 
org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpDesc
 import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
 import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc
@@ -158,6 +159,7 @@ trait StateTransferFunc
     new Type(value = classOf[IfOpDesc], name = "If"),
     new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"),
     new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"),
+    new Type(value = classOf[DatasetSelectorSourceOpDesc], name = 
"DatasetSelector"),
     new Type(value = classOf[CSVScanSourceOpDesc], name = "CSVFileScan"),
     // disabled the ParallelCSVScanSourceOpDesc so that it does not confuse 
user. it can be re-enabled when doing experiments.
     // new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = 
"ParallelCSVFileScan"),
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
new file mode 100644
index 0000000000..c0aa8a1d04
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.dataset
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, 
SchemaPropagationFunc}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+/**
+ * Source operator that lets the user choose a single dataset version and emits one tuple per
+ * file in that version. Each tuple has a single STRING attribute, `filename`.
+ */
+class DatasetSelectorSourceOpDesc extends LogicalOp {
+
+  /**
+   * The chosen dataset version. The frontend selector writes it as
+   * `/<ownerEmail>/<datasetName>/<versionName>`.
+   */
+  @JsonProperty(required = true)
+  @JsonSchemaTitle("Dataset")
+  var datasetVersionPath: String = _
+
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp = {
+    // The executor is created by class name and receives this descriptor serialized as JSON.
+    val execInit = OpExecWithClassName(
+      "org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpExec",
+      objectMapper.writeValueAsString(this)
+    )
+    val info = operatorInfo
+
+    PhysicalOp
+      .sourcePhysicalOp(workflowId, executionId, operatorIdentifier, execInit)
+      .withInputPorts(info.inputPorts)
+      .withOutputPorts(info.outputPorts)
+      .withPropagateSchema(
+        SchemaPropagationFunc { _ =>
+          // The output schema is fixed: a single `filename` column.
+          Map(info.outputPorts.head.id -> Schema().add("filename", AttributeType.STRING))
+        }
+      )
+  }
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      userFriendlyName = "Dataset Selector",
+      operatorDescription = "Select a dataset version and output one filename tuple per file",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List.empty,
+      outputPorts = List(OutputPort())
+    )
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
new file mode 100644
index 0000000000..eee7bc4051
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.dataset
+
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
+import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET
+import 
org.apache.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION
+import org.apache.texera.dao.jooq.generated.tables.User.USER
+
+/**
+ * Source executor for [[DatasetSelectorSourceOpDesc]].
+ *
+ * Resolves the selected dataset version to its LakeFS repository and version hash, then emits
+ * one tuple per object in that version. The `filename` field of each tuple is
+ * `<datasetVersionPath>/<object path>`.
+ *
+ * @param descString the JSON-serialized [[DatasetSelectorSourceOpDesc]]
+ */
+class DatasetSelectorSourceOpExec private[dataset] (descString: String)
+    extends SourceOperatorExecutor {
+  private val desc: DatasetSelectorSourceOpDesc =
+    objectMapper.readValue(descString, classOf[DatasetSelectorSourceOpDesc])
+
+  /**
+   * Looks up the selected dataset version and lists its files.
+   *
+   * @return one `filename` tuple per object in the dataset version
+   * @throws IllegalArgumentException if `datasetVersionPath` is malformed or no matching
+   *                                  dataset version exists
+   */
+  override def produceTuple(): Iterator[TupleLike] = {
+    val (ownerEmail, datasetName, versionName) =
+      parseDatasetVersionPath(desc.datasetVersionPath)
+
+    // NOTE(review): the lookup is keyed only by owner email, dataset name and version name.
+    // It does not verify that the user running the workflow may access this dataset.
+    // Confirm that access control is enforced elsewhere.
+    val record =
+      SqlServer
+        .getInstance()
+        .createDSLContext()
+        .select(DATASET.REPOSITORY_NAME, DATASET_VERSION.VERSION_HASH)
+        .from(DATASET)
+        .join(USER)
+        .on(USER.UID.eq(DATASET.OWNER_UID))
+        .join(DATASET_VERSION)
+        .on(DATASET_VERSION.DID.eq(DATASET.DID))
+        .where(USER.EMAIL.eq(ownerEmail))
+        .and(DATASET.NAME.eq(datasetName))
+        .and(DATASET_VERSION.NAME.eq(versionName))
+        .fetchOne(r => (r.value1(), r.value2()))
+
+    // fetchOne returns null when no row matches. Report that case clearly rather than
+    // failing the tuple destructuring with MatchError(null).
+    val (repositoryName, versionHash) = Option(record).getOrElse(
+      throw new IllegalArgumentException(
+        s"Dataset version '${desc.datasetVersionPath}' does not exist"
+      )
+    )
+
+    LakeFSStorageClient
+      .retrieveObjectsOfVersion(repositoryName, versionHash)
+      .map(obj => TupleLike("filename" -> s"${desc.datasetVersionPath}/${obj.getPath}"))
+      .iterator
+  }
+
+  /**
+   * Splits `/<ownerEmail>/<datasetName>/<versionName>` into its three components.
+   *
+   * @throws IllegalArgumentException if the path is null, does not have exactly four
+   *                                  `/`-separated segments, or has an empty component
+   */
+  private def parseDatasetVersionPath(path: String): (String, String, String) =
+    Option(path).map(_.split("/").toSeq) match {
+      case Some(Seq(_, ownerEmail, datasetName, versionName))
+          if Seq(ownerEmail, datasetName, versionName).forall(_.nonEmpty) =>
+        (ownerEmail, datasetName, versionName)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Invalid dataset version path '$path', expected /<ownerEmail>/<datasetName>/<versionName>"
+        )
+    }
+}
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 6a92d4be2e..d3e3339f8f 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -104,6 +104,7 @@ import { AgentPanelComponent } from 
"./workspace/component/agent-panel/agent-pan
 import { AgentChatComponent } from 
"./workspace/component/agent-panel/agent-chat/agent-chat.component";
 import { AgentRegistrationComponent } from 
"./workspace/component/agent-panel/agent-registration/agent-registration.component";
 import { InputAutoCompleteComponent } from 
"./workspace/component/input-autocomplete/input-autocomplete.component";
+import { DatasetVersionSelectorComponent } from 
"./workspace/component/dataset-version-selector/dataset-version-selector.component";
 import { CollabWrapperComponent } from 
"./common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component";
 import { TexeraCopilot } from "./workspace/service/copilot/texera-copilot";
 import { NzSwitchModule } from "ng-zorro-antd/switch";
@@ -257,6 +258,7 @@ registerLocaleData(en);
     AgentChatComponent,
     AgentRegistrationComponent,
     InputAutoCompleteComponent,
+    DatasetVersionSelectorComponent,
     FileSelectionComponent,
     CollabWrapperComponent,
     AboutComponent,
diff --git a/frontend/src/app/common/formly/formly-config.ts 
b/frontend/src/app/common/formly/formly-config.ts
index d950bd3690..8c814a5993 100644
--- a/frontend/src/app/common/formly/formly-config.ts
+++ b/frontend/src/app/common/formly/formly-config.ts
@@ -27,6 +27,7 @@ import { PresetWrapperComponent } from 
"./preset-wrapper/preset-wrapper.componen
 import { InputAutoCompleteComponent } from 
"../../workspace/component/input-autocomplete/input-autocomplete.component";
 import { CollabWrapperComponent } from 
"./collab-wrapper/collab-wrapper/collab-wrapper.component";
 import { FormlyRepeatDndComponent } from "./repeat-dnd/repeat-dnd.component";
+import { DatasetVersionSelectorComponent } from 
"../../workspace/component/dataset-version-selector/dataset-version-selector.component";
 
 /**
  * Configuration for using Json Schema with Formly.
@@ -77,6 +78,7 @@ export const TEXERA_FORMLY_CONFIG = {
     { name: "multischema", component: MultiSchemaTypeComponent },
     { name: "codearea", component: CodeareaCustomTemplateComponent },
     { name: "inputautocomplete", component: InputAutoCompleteComponent, 
wrappers: ["form-field"] },
+    { name: "datasetversionselector", component: 
DatasetVersionSelectorComponent, wrappers: ["form-field"] },
     { name: "repeat-section-dnd", component: FormlyRepeatDndComponent },
   ],
   wrappers: [
diff --git 
a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
new file mode 100644
index 0000000000..1a2ff4889b
--- /dev/null
+++ 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
@@ -0,0 +1,45 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!--
+  Dataset picker. Selecting (or clearing) a dataset stores it in selectedDataset
+  and calls onDatasetChange().
+-->
+<nz-select
+  nzShowSearch
+  nzAllowClear
+  nzPlaceHolder="dataset"
+  [ngModel]="selectedDataset"
+  (ngModelChange)="selectedDataset = $event; onDatasetChange()">
+  <nz-option
+    *ngFor="let dataset of datasets"
+    [nzValue]="dataset"
+    [nzLabel]="dataset.dataset.name">
+  </nz-option>
+</nz-select>
+
+<!--
+  Version picker. It lists datasetVersions and stays disabled until a dataset is
+  selected. Selecting a version stores it in selectedVersion and calls onVersionChange().
+-->
+<nz-select
+  nzShowSearch
+  nzAllowClear
+  nzPlaceHolder="version"
+  [ngModel]="selectedVersion"
+  (ngModelChange)="selectedVersion = $event; onVersionChange()"
+  [disabled]="!selectedDataset">
+  <nz-option
+    *ngFor="let version of datasetVersions"
+    [nzValue]="version"
+    [nzLabel]="version.name">
+  </nz-option>
+</nz-select>
diff --git 
a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
new file mode 100644
index 0000000000..eb2b2b4338
--- /dev/null
+++ 
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import {ChangeDetectorRef, Component, OnInit} from "@angular/core";
+import { FieldType, FieldTypeConfig } from "@ngx-formly/core";
+import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
+import { DashboardDataset } from 
"../../../dashboard/type/dashboard-dataset.interface";
+import { DatasetVersion } from "../../../common/type/dataset";
+import { DatasetService } from 
"../../../dashboard/service/user/dataset/dataset.service";
+
+/**
+ * Formly field type that edits a dataset-version path with two dependent dropdowns:
+ * a dataset first, then one of that dataset's versions.
+ *
+ * The form control holds `/<ownerEmail>/<datasetName>/<versionName>`, or null when the
+ * selection is incomplete.
+ */
+@UntilDestroy()
+@Component({
+  selector: "texera-dataset-version-selector-template",
+  templateUrl: "./dataset-version-selector.component.html",
+})
+export class DatasetVersionSelectorComponent extends FieldType<FieldTypeConfig> implements OnInit {
+  datasets: ReadonlyArray<DashboardDataset> = [];
+  datasetVersions: ReadonlyArray<DatasetVersion> = [];
+  selectedDataset?: DashboardDataset;
+  selectedVersion?: DatasetVersion;
+
+  constructor(private datasetService: DatasetService, private changeDetectorRef: ChangeDetectorRef) {
+    super();
+  }
+
+  /**
+   * Loads the datasets the user can access. If the form control already holds a path,
+   * restores that dataset and version as the current selection.
+   */
+  ngOnInit(): void {
+    this.datasetService
+      .retrieveAccessibleDatasets()
+      .pipe(untilDestroyed(this))
+      .subscribe(datasets => {
+        this.datasets = datasets;
+        // The control is empty (null/undefined) until a selection has been saved,
+        // e.g. for a newly added operator.
+        const savedPath: string = this.formControl.value ?? "";
+        const [, ownerEmail, datasetName, versionName] = savedPath.split("/");
+        if (versionName) {
+          this.selectedDataset = this.datasets.find(
+            dataset => dataset.ownerEmail === ownerEmail && dataset.dataset.name === datasetName
+          );
+          // Restore the saved version instead of defaulting to the first one.
+          this.onDatasetChange(versionName);
+        }
+      });
+  }
+
+  /**
+   * Handles a change of the selected dataset by (re)loading its versions.
+   *
+   * @param preferredVersionName version to select once the list loads. If it is absent or
+   *   not found, the first version in the list is selected.
+   */
+  onDatasetChange(preferredVersionName?: string): void {
+    const dataset = this.selectedDataset;
+    if (!dataset) {
+      this.datasetVersions = [];
+      this.selectedVersion = undefined;
+      this.onVersionChange();
+      return;
+    }
+    this.datasetService
+      .retrieveDatasetVersionList(dataset.dataset.did!)
+      .pipe(untilDestroyed(this))
+      .subscribe(versions => {
+        // Drop a stale response if the user picked a different dataset while this request was in flight.
+        if (this.selectedDataset !== dataset) {
+          return;
+        }
+        this.datasetVersions = versions;
+        const preferred =
+          preferredVersionName !== undefined
+            ? versions.find(version => version.name === preferredVersionName)
+            : undefined;
+        this.selectedVersion = preferred ?? versions[0];
+        this.onVersionChange();
+        this.changeDetectorRef.detectChanges();
+      });
+  }
+
+  /** Writes the current dataset/version selection to the form control, or null if incomplete. */
+  onVersionChange(): void {
+    const dataset = this.selectedDataset;
+    const version = this.selectedVersion;
+    this.formControl.setValue(
+      dataset && version ? `/${dataset.ownerEmail}/${dataset.dataset.name}/${version.name}` : null
+    );
+  }
+}
diff --git 
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
 
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
index 5d457e9050..a436d11142 100644
--- 
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
+++ 
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
@@ -453,6 +453,10 @@ export class OperatorPropertyEditFrameComponent implements 
OnInit, OnChanges, On
         mappedField.type = "inputautocomplete";
       }
 
+      if (mappedField.key == "datasetVersionPath") {
+        mappedField.type = "datasetversionselector";
+      }
+
       // if the title is python script (for Python UDF), then make this field 
a custom template 'codearea'
       if (mapSource?.description?.toLowerCase() === "input your code here") {
         if (mappedField.type) {
diff --git a/frontend/src/assets/operator_images/DatasetSelector.png b/frontend/src/assets/operator_images/DatasetSelector.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and b/frontend/src/assets/operator_images/DatasetSelector.png differ

Reply via email to