This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-dataset-selector in repository https://gitbox.apache.org/repos/asf/texera.git
commit dc7909f55f9d2dc7a2758f04712b873985bd5d28 Author: Xinyuan Lin <[email protected]> AuthorDate: Fri Apr 10 16:36:37 2026 -0700 feat(dataset-selector): add dataset selector operator and property UI --- .../apache/texera/amber/operator/LogicalOp.scala | 2 + .../dataset/DatasetSelectorSourceOpDesc.scala | 66 ++++++++++++++++ .../dataset/DatasetSelectorSourceOpExec.scala | 54 ++++++++++++++ frontend/src/app/app.module.ts | 2 + frontend/src/app/common/formly/formly-config.ts | 2 + .../dataset-version-selector.component.html | 45 +++++++++++ .../dataset-version-selector.component.ts | 83 +++++++++++++++++++++ .../operator-property-edit-frame.component.ts | 4 + .../src/assets/operator_images/DatasetSelector.png | Bin 0 -> 22499 bytes 9 files changed, 258 insertions(+) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index 931596b1bf..7ccbb073d6 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -75,6 +75,7 @@ import org.apache.texera.amber.operator.source.apis.twitter.v2.{ TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc } +import org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpDesc import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc @@ -158,6 +159,7 @@ trait StateTransferFunc new Type(value = classOf[IfOpDesc], name = "If"), new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"), new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"), + new Type(value = classOf[DatasetSelectorSourceOpDesc], name = "DatasetSelector"), new Type(value = 
classOf[CSVScanSourceOpDesc], name = "CSVFileScan"), // disabled the ParallelCSVScanSourceOpDesc so that it does not confuse user. it can be re-enabled when doing experiments. // new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = "ParallelCSVFileScan"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala new file mode 100644 index 0000000000..c0aa8a1d04 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.texera.amber.operator.source.dataset
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp, SchemaPropagationFunc}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+/**
+ * Logical source operator that lets the user pick one dataset version and emits one `filename` tuple per
+ * file in it. Execution is delegated to `DatasetSelectorSourceOpExec`, which receives this descriptor
+ * serialized as JSON.
+ */
+class DatasetSelectorSourceOpDesc extends LogicalOp {
+
+  // Selected dataset version; the frontend selector writes it as "/<ownerEmail>/<datasetName>/<versionName>".
+  @JsonProperty(required = true)
+  @JsonSchemaTitle("Dataset")
+  var datasetVersionPath: String = _
+
+  /**
+   * Builds a single source physical operator with no input ports and one output port whose schema is a
+   * single STRING attribute named `filename`.
+   */
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp = {
+    // The executor is instantiated reflectively from its class name and rebuilt from this JSON snapshot.
+    val execInit = OpExecWithClassName(
+      "org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpExec",
+      objectMapper.writeValueAsString(this)
+    )
+    val info = operatorInfo
+
+    PhysicalOp
+      .sourcePhysicalOp(workflowId, executionId, operatorIdentifier, execInit)
+      .withInputPorts(info.inputPorts)
+      .withOutputPorts(info.outputPorts)
+      .withPropagateSchema(
+        SchemaPropagationFunc { _ =>
+          val filenameSchema = Schema().add("filename", AttributeType.STRING)
+          Map(info.outputPorts.head.id -> filenameSchema)
+        }
+      )
+  }
+
+  /** Static metadata: an input-group operator with no inputs and exactly one output port. */
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      userFriendlyName = "Dataset Selector",
+      operatorDescription = "Select a dataset version and output one filename tuple per file",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = Nil,
+      outputPorts = List(OutputPort())
+    )
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala new file mode 100644 index 0000000000..eee7bc4051 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.texera.amber.operator.source.dataset
+
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
+import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET
+import org.apache.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION
+import org.apache.texera.dao.jooq.generated.tables.User.USER
+
+/**
+ * Source executor for [[DatasetSelectorSourceOpDesc]]: emits one `filename` tuple per object stored in the
+ * selected dataset version.
+ *
+ * @param descString the operator descriptor serialized as JSON
+ */
+class DatasetSelectorSourceOpExec private[dataset] (descString: String)
+    extends SourceOperatorExecutor {
+  private val desc: DatasetSelectorSourceOpDesc =
+    objectMapper.readValue(descString, classOf[DatasetSelectorSourceOpDesc])
+
+  /**
+   * Resolves `desc.datasetVersionPath` to a LakeFS repository name and version hash, then emits one tuple
+   * per object in that version; `filename` is the dataset version path joined with the object path.
+   *
+   * @throws IllegalArgumentException if the path is unset or does not split into four "/"-separated
+   *                                  segments, or if no matching dataset version exists
+   */
+  override def produceTuple(): Iterator[TupleLike] = {
+    val path = desc.datasetVersionPath
+
+    // The frontend selector writes "/<ownerEmail>/<datasetName>/<versionName>", so splitting on "/" yields a
+    // leading empty segment followed by the three components. Anything else is rejected with a clear message
+    // instead of an opaque MatchError (or a NullPointerException when the path is unset).
+    val (ownerEmail, datasetName, versionName) =
+      Option(path).map(_.split("/").toSeq) match {
+        case Some(Seq(_, owner, dataset, version)) => (owner, dataset, version)
+        case _ =>
+          throw new IllegalArgumentException(
+            s"Invalid dataset version path '$path': expected /<ownerEmail>/<datasetName>/<versionName>"
+          )
+      }
+
+    // NOTE(review): the lookup filters only by owner email, dataset name and version name; it does not check
+    // that the user running this workflow may access the dataset. Confirm whether an access check belongs here.
+    // fetchOne(mapper) returns null when no row matches, so wrap it to report a missing version explicitly.
+    val (repositoryName, versionHash) =
+      Option(
+        SqlServer
+          .getInstance()
+          .createDSLContext()
+          .select(DATASET.REPOSITORY_NAME, DATASET_VERSION.VERSION_HASH)
+          .from(DATASET)
+          .join(USER)
+          .on(USER.UID.eq(DATASET.OWNER_UID))
+          .join(DATASET_VERSION)
+          .on(DATASET_VERSION.DID.eq(DATASET.DID))
+          .where(USER.EMAIL.eq(ownerEmail))
+          .and(DATASET.NAME.eq(datasetName))
+          .and(DATASET_VERSION.NAME.eq(versionName))
+          .fetchOne(r => (r.value1(), r.value2()))
+      ).getOrElse(
+        throw new IllegalArgumentException(s"Dataset version '$path' was not found")
+      )
+
+    LakeFSStorageClient
+      .retrieveObjectsOfVersion(repositoryName, versionHash)
+      .map(obj => TupleLike("filename" -> s"$path/${obj.getPath}"))
+      .iterator
+  }
+}
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts index 6a92d4be2e..d3e3339f8f 100644 --- a/frontend/src/app/app.module.ts +++ b/frontend/src/app/app.module.ts @@ -104,6 +104,7 @@ import { AgentPanelComponent } from "./workspace/component/agent-panel/agent-pan import { AgentChatComponent } from "./workspace/component/agent-panel/agent-chat/agent-chat.component"; import {
AgentRegistrationComponent } from "./workspace/component/agent-panel/agent-registration/agent-registration.component"; import { InputAutoCompleteComponent } from "./workspace/component/input-autocomplete/input-autocomplete.component"; +import { DatasetVersionSelectorComponent } from "./workspace/component/dataset-version-selector/dataset-version-selector.component"; import { CollabWrapperComponent } from "./common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component"; import { TexeraCopilot } from "./workspace/service/copilot/texera-copilot"; import { NzSwitchModule } from "ng-zorro-antd/switch"; @@ -257,6 +258,7 @@ registerLocaleData(en); AgentChatComponent, AgentRegistrationComponent, InputAutoCompleteComponent, + DatasetVersionSelectorComponent, FileSelectionComponent, CollabWrapperComponent, AboutComponent, diff --git a/frontend/src/app/common/formly/formly-config.ts b/frontend/src/app/common/formly/formly-config.ts index d950bd3690..8c814a5993 100644 --- a/frontend/src/app/common/formly/formly-config.ts +++ b/frontend/src/app/common/formly/formly-config.ts @@ -27,6 +27,7 @@ import { PresetWrapperComponent } from "./preset-wrapper/preset-wrapper.componen import { InputAutoCompleteComponent } from "../../workspace/component/input-autocomplete/input-autocomplete.component"; import { CollabWrapperComponent } from "./collab-wrapper/collab-wrapper/collab-wrapper.component"; import { FormlyRepeatDndComponent } from "./repeat-dnd/repeat-dnd.component"; +import { DatasetVersionSelectorComponent } from "../../workspace/component/dataset-version-selector/dataset-version-selector.component"; /** * Configuration for using Json Schema with Formly. 
@@ -77,6 +78,7 @@ export const TEXERA_FORMLY_CONFIG = { { name: "multischema", component: MultiSchemaTypeComponent }, { name: "codearea", component: CodeareaCustomTemplateComponent }, { name: "inputautocomplete", component: InputAutoCompleteComponent, wrappers: ["form-field"] }, + { name: "datasetversionselector", component: DatasetVersionSelectorComponent, wrappers: ["form-field"] }, { name: "repeat-section-dnd", component: FormlyRepeatDndComponent }, ], wrappers: [ diff --git a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html new file mode 100644 index 0000000000..1a2ff4889b --- /dev/null +++ b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html @@ -0,0 +1,45 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> + +<nz-select + nzShowSearch + nzAllowClear + nzPlaceHolder="dataset" + [ngModel]="selectedDataset" + (ngModelChange)="selectedDataset = $event; onDatasetChange()"> + <nz-option + *ngFor="let dataset of datasets" + [nzValue]="dataset" + [nzLabel]="dataset.dataset.name"> + </nz-option> +</nz-select> + +<nz-select + nzShowSearch + nzAllowClear + nzPlaceHolder="version" + [ngModel]="selectedVersion" + (ngModelChange)="selectedVersion = $event; onVersionChange()" + [disabled]="!selectedDataset"> + <nz-option + *ngFor="let version of datasetVersions" + [nzValue]="version" + [nzLabel]="version.name"> + </nz-option> +</nz-select> diff --git a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts new file mode 100644 index 0000000000..eb2b2b4338 --- /dev/null +++ b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+import { ChangeDetectorRef, Component, OnInit } from "@angular/core";
+import { FieldType, FieldTypeConfig } from "@ngx-formly/core";
+import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
+import { DashboardDataset } from "../../../dashboard/type/dashboard-dataset.interface";
+import { DatasetVersion } from "../../../common/type/dataset";
+import { DatasetService } from "../../../dashboard/service/user/dataset/dataset.service";
+
+/**
+ * Formly field type that lets the user pick one accessible dataset and one of its versions.
+ *
+ * The bound form control holds the selection as "/<ownerEmail>/<datasetName>/<versionName>", or null when
+ * the selection is incomplete.
+ */
+@UntilDestroy()
+@Component({
+  selector: "texera-dataset-version-selector-template",
+  templateUrl: "./dataset-version-selector.component.html",
+})
+export class DatasetVersionSelectorComponent extends FieldType<FieldTypeConfig> implements OnInit {
+  datasets: ReadonlyArray<DashboardDataset> = [];
+  datasetVersions: ReadonlyArray<DatasetVersion> = [];
+  selectedDataset?: DashboardDataset;
+  selectedVersion?: DatasetVersion;
+
+  constructor(
+    private datasetService: DatasetService,
+    private changeDetectorRef: ChangeDetectorRef
+  ) {
+    super();
+  }
+
+  /**
+   * Loads the datasets the user can access and, when the form control already holds a path, restores the
+   * matching dataset and version selections.
+   */
+  ngOnInit(): void {
+    this.datasetService
+      .retrieveAccessibleDatasets()
+      .pipe(untilDestroyed(this))
+      .subscribe(datasets => {
+        this.datasets = datasets;
+        // A freshly added operator may have no value yet, so only split when a string is present.
+        const value: unknown = this.formControl.value;
+        const parts: string[] = typeof value === "string" ? value.split("/") : [];
+        const [, ownerEmail, datasetName, versionName] = parts;
+        if (versionName) {
+          this.selectedDataset = this.datasets.find(
+            dataset => dataset.ownerEmail === ownerEmail && dataset.dataset.name === datasetName
+          );
+          // Pass the stored version so reopening the editor keeps it instead of switching to the first version.
+          this.onDatasetChange(versionName);
+        }
+      });
+  }
+
+  /**
+   * Reloads the version list for the selected dataset, selects a version, and writes the result to the form.
+   *
+   * @param preferredVersionName version to select when it appears in the loaded list; otherwise the first
+   *   listed version is selected. The template calls this without an argument when the user picks a dataset.
+   */
+  onDatasetChange(preferredVersionName?: string): void {
+    const dataset = this.selectedDataset;
+    if (!dataset) {
+      this.datasetVersions = [];
+      this.selectedVersion = undefined;
+      this.onVersionChange();
+      return;
+    }
+    this.datasetService
+      .retrieveDatasetVersionList(dataset.dataset.did!)
+      .pipe(untilDestroyed(this))
+      .subscribe(versions => {
+        // Drop a late response for a dataset the user has already switched away from.
+        if (this.selectedDataset !== dataset) {
+          return;
+        }
+        this.datasetVersions = versions;
+        this.selectedVersion = versions.find(version => version.name === preferredVersionName) ?? versions[0];
+        this.onVersionChange();
+        this.changeDetectorRef.detectChanges();
+      });
+  }
+
+  /** Writes the current selection to the form control as "/<ownerEmail>/<datasetName>/<versionName>", or null. */
+  onVersionChange(): void {
+    const dataset = this.selectedDataset;
+    const version = this.selectedVersion;
+    this.formControl.setValue(
+      dataset && version ? `/${dataset.ownerEmail}/${dataset.dataset.name}/${version.name}` : null
+    );
+  }
+}
diff --git a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts index 5d457e9050..a436d11142 100644 --- a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts +++ b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts @@ -453,6 +453,10 @@ export class OperatorPropertyEditFrameComponent implements OnInit, OnChanges, On mappedField.type = "inputautocomplete"; } + if (mappedField.key == "datasetVersionPath") { + mappedField.type = "datasetversionselector"; + } + // if the title is python script (for Python UDF), then make this field a custom template 'codearea' if (mapSource?.description?.toLowerCase() === "input your code here") { if (mappedField.type) { diff --git a/frontend/src/assets/operator_images/DatasetSelector.png b/frontend/src/assets/operator_images/DatasetSelector.png new file mode 100644 index 0000000000..c570e230ac Binary files /dev/null and b/frontend/src/assets/operator_images/DatasetSelector.png differ
