This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-source-port by this
push:
new 90600f7da7 update
90600f7da7 is described below
commit 90600f7da7841c1de80bb0485db9abe1702077a8
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Apr 8 14:08:18 2026 -0700
update
---
.../org/apache/texera/workflow/LogicalPlan.scala | 39 +++++--
.../apache/texera/amber/operator/LogicalOp.scala | 2 +
.../dataset/DatasetSelectorSourceOpDesc.scala | 68 +++++++++++
.../dataset/DatasetSelectorSourceOpExec.scala | 116 ++++++++++++++++++
frontend/src/app/app.module.ts | 2 +
frontend/src/app/common/formly/formly-config.ts | 2 +
.../dataset-version-selector.component.html | 64 ++++++++++
.../dataset-version-selector.component.scss | 9 ++
.../dataset-version-selector.component.ts | 129 +++++++++++++++++++++
.../operator-property-edit-frame.component.ts | 4 +
.../src/assets/operator_images/DatasetSelector.png | Bin 0 -> 22499 bytes
.../texera/amber/compiler/model/LogicalPlan.scala | 39 +++++--
12 files changed, 458 insertions(+), 16 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
index 58a0d0e793..057963a1f9 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalPlan.scala
@@ -23,6 +23,10 @@ import com.typesafe.scalalogging.LazyLogging
import org.apache.texera.amber.core.storage.FileResolver
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.source.dataset.{
+ DatasetSelectorSourceOpDesc,
+ DatasetSelectorSourceOpExec
+}
import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
import
org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
@@ -131,20 +135,39 @@ case class LogicalPlan(
case operator @ (csvOp: InputCSVScanSourceOpDesc)
if csvOp.columns == null || csvOp.columns.isEmpty =>
Try {
- val upstreamTextInput = getUpstreamLinks(operator.operatorIdentifier)
+ val upstreamOperator = getUpstreamLinks(operator.operatorIdentifier)
.flatMap(link => operators.find(_.operatorIdentifier ==
link.fromOpId))
- .collectFirst { case textInput: TextInputSourceOpDesc => textInput
}
+ .collectFirst {
+ case textInput: TextInputSourceOpDesc => textInput
+ case datasetSelector: DatasetSelectorSourceOpDesc =>
datasetSelector
+ }
.getOrElse(
throw new RuntimeException(
- "CSV File Scan From Input requires a literal Text Input
filename to infer columns."
+ "CSV File Scan From Input requires either a literal Text Input
filename or a Dataset Selector to infer columns."
)
)
- val fileName = upstreamTextInput.textInput
- .linesIterator
- .find(_.trim.nonEmpty)
- .map(_.trim)
- .getOrElse(throw new RuntimeException("No input file name"))
+ val fileName = upstreamOperator match {
+ case textInput: TextInputSourceOpDesc =>
+ textInput.textInput
+ .linesIterator
+ .find(_.trim.nonEmpty)
+ .map(_.trim)
+ .getOrElse(throw new RuntimeException("No input file name"))
+ case datasetSelector: DatasetSelectorSourceOpDesc =>
+ DatasetSelectorSourceOpExec
+ .listFileNames(datasetSelector.datasetVersionPath)
+ .headOption
+ .getOrElse(
+ throw new RuntimeException(
+ "Selected dataset version does not contain any files."
+ )
+ )
+ case _ =>
+ throw new RuntimeException(
+ "Unsupported upstream operator for CSV File Scan From Input
column inference."
+ )
+ }
csvOp.inferColumnsFromFileName(fileName)
} match {
case Success(_) =>
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index d3290f4cfd..060b8c59af 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -75,6 +75,7 @@ import
org.apache.texera.amber.operator.source.apis.twitter.v2.{
TwitterFullArchiveSearchSourceOpDesc,
TwitterSearchSourceOpDesc
}
+import
org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpDesc
import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
@@ -159,6 +160,7 @@ trait StateTransferFunc
new Type(value = classOf[IfOpDesc], name = "If"),
new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"),
new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"),
+ new Type(value = classOf[DatasetSelectorSourceOpDesc], name =
"DatasetSelector"),
new Type(value = classOf[CSVScanSourceOpDesc], name = "CSVFileScan"),
new Type(value = classOf[InputCSVScanSourceOpDesc], name =
"InputCSVFileScan"),
// disabled the ParallelCSVScanSourceOpDesc so that it does not confuse
user. it can be re-enabled when doing experiments.
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
new file mode 100644
index 0000000000..61a061b5c0
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpDesc.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.dataset
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{OutputPort, PhysicalOp,
SchemaPropagationFunc}
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+class DatasetSelectorSourceOpDesc extends SourceOperatorDescriptor {
+
+ @JsonProperty(required = true)
+ @JsonSchemaTitle("Dataset Version")
+ var datasetVersionPath: String = _
+
+ override def getPhysicalOp(
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp =
+ PhysicalOp
+ .sourcePhysicalOp(
+ workflowId,
+ executionId,
+ operatorIdentifier,
+ OpExecWithClassName(
+
"org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpExec",
+ objectMapper.writeValueAsString(this)
+ )
+ )
+ .withInputPorts(operatorInfo.inputPorts)
+ .withOutputPorts(operatorInfo.outputPorts)
+ .withPropagateSchema(
+ SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id ->
sourceSchema()))
+ )
+
+ override def sourceSchema(): Schema = Schema().add("filename",
AttributeType.STRING)
+
+ override def operatorInfo: OperatorInfo =
+ OperatorInfo(
+ userFriendlyName = "Dataset Selector",
+ operatorDescription = "Select a dataset version and output one filename
tuple per file",
+ operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+ inputPorts = List.empty,
+ outputPorts = List(OutputPort())
+ )
+}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
new file mode 100644
index 0000000000..ac6515b376
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/dataset/DatasetSelectorSourceOpExec.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.dataset
+
+import io.lakefs.clients.sdk.model.ObjectStats
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
+import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.SqlServer.withTransaction
+import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET
+import
org.apache.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION
+import org.apache.texera.dao.jooq.generated.tables.User.USER
+import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset,
DatasetVersion}
+
+class DatasetSelectorSourceOpExec private[dataset] (descString: String)
extends SourceOperatorExecutor {
+ private val desc: DatasetSelectorSourceOpDesc =
+ objectMapper.readValue(descString, classOf[DatasetSelectorSourceOpDesc])
+
+ override def produceTuple(): Iterator[TupleLike] = {
+ DatasetSelectorSourceOpExec
+ .listFileNames(desc.datasetVersionPath)
+ .iterator
+ .map(fileName => TupleLike("filename" -> fileName))
+ }
+}
+
+object DatasetSelectorSourceOpExec {
+ def parseDatasetVersionPath(path: String): (String, String, String) = {
+ val parts = Option(path)
+ .map(_.trim)
+ .getOrElse("")
+ .split("/")
+ .filter(_.nonEmpty)
+
+ if (parts.length != 3) {
+ throw new IllegalArgumentException(
+ "Dataset version path must be in the format
/ownerEmail/datasetName/versionName."
+ )
+ }
+
+ (parts(0), parts(1), parts(2))
+ }
+
+ private def isRealFile(obj: ObjectStats): Boolean = {
+ val path = Option(obj.getPath).getOrElse("").trim
+ path.nonEmpty && !path.endsWith("/")
+ }
+
+ def listFileNames(datasetVersionPath: String): Seq[String] = {
+ val (ownerEmail, datasetName, versionName) =
parseDatasetVersionPath(datasetVersionPath)
+ val (dataset, datasetVersion) = resolveDatasetVersion(ownerEmail,
datasetName, versionName)
+ val versionPrefix = s"/$ownerEmail/$datasetName/$versionName"
+ LakeFSStorageClient
+ .retrieveObjectsOfVersion(dataset.getRepositoryName,
datasetVersion.getVersionHash)
+ .iterator
+ .filter(isRealFile)
+ .toSeq
+ .sortBy(_.getPath)
+ .map(obj => s"$versionPrefix/${obj.getPath}")
+ }
+
+ private def resolveDatasetVersion(
+ ownerEmail: String,
+ datasetName: String,
+ versionName: String
+ ): (Dataset, DatasetVersion) =
+ withTransaction(SqlServer.getInstance().createDSLContext()) { ctx =>
+ val dataset = ctx
+ .select(DATASET.fields: _*)
+ .from(DATASET)
+ .leftJoin(USER)
+ .on(USER.UID.eq(DATASET.OWNER_UID))
+ .where(USER.EMAIL.eq(ownerEmail))
+ .and(DATASET.NAME.eq(datasetName))
+ .fetchOneInto(classOf[Dataset])
+
+ if (dataset == null) {
+ throw new IllegalArgumentException(
+ s"Dataset '$datasetName' owned by '$ownerEmail' was not found."
+ )
+ }
+
+ val datasetVersion = ctx
+ .selectFrom(DATASET_VERSION)
+ .where(DATASET_VERSION.DID.eq(dataset.getDid))
+ .and(DATASET_VERSION.NAME.eq(versionName))
+ .fetchOneInto(classOf[DatasetVersion])
+
+ if (datasetVersion == null) {
+ throw new IllegalArgumentException(
+ s"Dataset version '$versionName' for dataset '$datasetName' was not
found."
+ )
+ }
+
+ (dataset, datasetVersion)
+ }
+}
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 6a92d4be2e..d3e3339f8f 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -104,6 +104,7 @@ import { AgentPanelComponent } from
"./workspace/component/agent-panel/agent-pan
import { AgentChatComponent } from
"./workspace/component/agent-panel/agent-chat/agent-chat.component";
import { AgentRegistrationComponent } from
"./workspace/component/agent-panel/agent-registration/agent-registration.component";
import { InputAutoCompleteComponent } from
"./workspace/component/input-autocomplete/input-autocomplete.component";
+import { DatasetVersionSelectorComponent } from
"./workspace/component/dataset-version-selector/dataset-version-selector.component";
import { CollabWrapperComponent } from
"./common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component";
import { TexeraCopilot } from "./workspace/service/copilot/texera-copilot";
import { NzSwitchModule } from "ng-zorro-antd/switch";
@@ -257,6 +258,7 @@ registerLocaleData(en);
AgentChatComponent,
AgentRegistrationComponent,
InputAutoCompleteComponent,
+ DatasetVersionSelectorComponent,
FileSelectionComponent,
CollabWrapperComponent,
AboutComponent,
diff --git a/frontend/src/app/common/formly/formly-config.ts
b/frontend/src/app/common/formly/formly-config.ts
index d950bd3690..8c814a5993 100644
--- a/frontend/src/app/common/formly/formly-config.ts
+++ b/frontend/src/app/common/formly/formly-config.ts
@@ -27,6 +27,7 @@ import { PresetWrapperComponent } from
"./preset-wrapper/preset-wrapper.componen
import { InputAutoCompleteComponent } from
"../../workspace/component/input-autocomplete/input-autocomplete.component";
import { CollabWrapperComponent } from
"./collab-wrapper/collab-wrapper/collab-wrapper.component";
import { FormlyRepeatDndComponent } from "./repeat-dnd/repeat-dnd.component";
+import { DatasetVersionSelectorComponent } from
"../../workspace/component/dataset-version-selector/dataset-version-selector.component";
/**
* Configuration for using Json Schema with Formly.
@@ -77,6 +78,7 @@ export const TEXERA_FORMLY_CONFIG = {
{ name: "multischema", component: MultiSchemaTypeComponent },
{ name: "codearea", component: CodeareaCustomTemplateComponent },
{ name: "inputautocomplete", component: InputAutoCompleteComponent,
wrappers: ["form-field"] },
+ { name: "datasetversionselector", component:
DatasetVersionSelectorComponent, wrappers: ["form-field"] },
{ name: "repeat-section-dnd", component: FormlyRepeatDndComponent },
],
wrappers: [
diff --git
a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
new file mode 100644
index 0000000000..27aa4b2530
--- /dev/null
+++
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.html
@@ -0,0 +1,64 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<div class="dataset-version-selector">
+ <nz-select
+ nzShowSearch
+ nzAllowClear
+ nzPlaceHolder="Select dataset"
+ [ngModel]="selectedDataset"
+ (ngModelChange)="selectedDataset = $event; onDatasetChange()"
+ [nzLoading]="isLoadingDatasets"
+ class="selector">
+ <nz-option
+ *ngFor="let dataset of datasets"
+ [nzValue]="dataset"
+ [nzLabel]="dataset.dataset.name">
+ </nz-option>
+ </nz-select>
+
+ <nz-select
+ nzShowSearch
+ nzAllowClear
+ nzPlaceHolder="Select version"
+ [ngModel]="selectedVersion"
+ (ngModelChange)="selectedVersion = $event; onVersionChange()"
+ [nzLoading]="isLoadingVersions"
+ [disabled]="!selectedDataset"
+ class="selector">
+ <nz-option
+ *ngFor="let version of datasetVersions"
+ [nzValue]="version"
+ [nzLabel]="version.name">
+ </nz-option>
+ </nz-select>
+
+ <input
+ nz-input
+ [readOnly]="true"
+ [value]="formControl.value || ''"
+ [formlyAttributes]="field" />
+</div>
+
+<div
+ class="alert alert-danger"
+ role="alert"
+ *ngIf="props.showError && formControl.errors">
+ <formly-validation-message [field]="field"></formly-validation-message>
+</div>
diff --git
a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.scss
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.scss
new file mode 100644
index 0000000000..67d32ed69a
--- /dev/null
+++
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.scss
@@ -0,0 +1,9 @@
+.dataset-version-selector {
+ display: flex;
+ flex-direction: column;
+ gap: 8px;
+}
+
+.selector {
+ width: 100%;
+}
diff --git
a/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
new file mode 100644
index 0000000000..154d06b4cb
--- /dev/null
+++
b/frontend/src/app/workspace/component/dataset-version-selector/dataset-version-selector.component.ts
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Component, OnInit } from "@angular/core";
+import { FieldType, FieldTypeConfig } from "@ngx-formly/core";
+import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
+import { DashboardDataset } from
"../../../dashboard/type/dashboard-dataset.interface";
+import { DatasetVersion } from "../../../common/type/dataset";
+import { DatasetService } from
"../../../dashboard/service/user/dataset/dataset.service";
+
+@UntilDestroy()
+@Component({
+ selector: "texera-dataset-version-selector-template",
+ templateUrl: "./dataset-version-selector.component.html",
+ styleUrls: ["./dataset-version-selector.component.scss"],
+})
+export class DatasetVersionSelectorComponent extends
FieldType<FieldTypeConfig> implements OnInit {
+ datasets: ReadonlyArray<DashboardDataset> = [];
+ datasetVersions: ReadonlyArray<DatasetVersion> = [];
+ selectedDataset?: DashboardDataset;
+ selectedVersion?: DatasetVersion;
+ isLoadingDatasets = false;
+ isLoadingVersions = false;
+
+ constructor(private datasetService: DatasetService) {
+ super();
+ }
+
+ ngOnInit(): void {
+ this.loadDatasets();
+ }
+
+ private loadDatasets(): void {
+ this.isLoadingDatasets = true;
+ this.datasetService
+ .retrieveAccessibleDatasets()
+ .pipe(untilDestroyed(this))
+ .subscribe(datasets => {
+ this.datasets = datasets;
+ this.isLoadingDatasets = false;
+ this.restoreSelectionFromValue();
+ });
+ }
+
+ private restoreSelectionFromValue(): void {
+ const parsed = this.parseDatasetVersionPath(this.formControl.value);
+ if (!parsed) {
+ return;
+ }
+
+ this.selectedDataset = this.datasets.find(
+ dataset =>
+ dataset.ownerEmail === parsed.ownerEmail && dataset.dataset.name ===
parsed.datasetName
+ );
+
+ if (this.selectedDataset?.dataset.did !== undefined) {
+ this.loadVersions(this.selectedDataset.dataset.did, parsed.versionName);
+ }
+ }
+
+ onDatasetChange(): void {
+ this.selectedVersion = undefined;
+ this.datasetVersions = [];
+ this.formControl.setValue(null);
+
+ if (this.selectedDataset?.dataset.did !== undefined) {
+ this.loadVersions(this.selectedDataset.dataset.did);
+ }
+ }
+
+ onVersionChange(): void {
+ if (!this.selectedDataset || !this.selectedVersion) {
+ this.formControl.setValue(null);
+ return;
+ }
+
+ this.formControl.setValue(
+
`/${this.selectedDataset.ownerEmail}/${this.selectedDataset.dataset.name}/${this.selectedVersion.name}`
+ );
+ }
+
+ private loadVersions(did: number, versionNameToSelect?: string): void {
+ this.isLoadingVersions = true;
+ this.datasetService
+ .retrieveDatasetVersionList(did)
+ .pipe(untilDestroyed(this))
+ .subscribe(versions => {
+ this.datasetVersions = versions;
+ this.isLoadingVersions = false;
+ if (versionNameToSelect) {
+ this.selectedVersion = versions.find(version => version.name ===
versionNameToSelect);
+ } else if (versions.length > 0) {
+ this.selectedVersion = versions[0];
+ }
+ if (this.selectedVersion) {
+ this.onVersionChange();
+ }
+ });
+ }
+
+ private parseDatasetVersionPath(
+ path: string | null | undefined
+ ): { ownerEmail: string; datasetName: string; versionName: string } |
undefined {
+ const parts = (path ?? "")
+ .split("/")
+ .filter(Boolean);
+ if (parts.length !== 3) {
+ return undefined;
+ }
+ const [ownerEmail, datasetName, versionName] = parts;
+ return { ownerEmail, datasetName, versionName };
+ }
+}
diff --git
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
index 5d457e9050..a436d11142 100644
---
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
+++
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
@@ -453,6 +453,10 @@ export class OperatorPropertyEditFrameComponent implements
OnInit, OnChanges, On
mappedField.type = "inputautocomplete";
}
+ if (mappedField.key == "datasetVersionPath") {
+ mappedField.type = "datasetversionselector";
+ }
+
// if the title is python script (for Python UDF), then make this field
a custom template 'codearea'
if (mapSource?.description?.toLowerCase() === "input your code here") {
if (mappedField.type) {
diff --git a/frontend/src/assets/operator_images/DatasetSelector.png
b/frontend/src/assets/operator_images/DatasetSelector.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and
b/frontend/src/assets/operator_images/DatasetSelector.png differ
diff --git
a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
index ca3522dd05..c56650460c 100644
---
a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
+++
b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalPlan.scala
@@ -24,6 +24,10 @@ import org.apache.texera.amber.core.storage.FileResolver
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.source.dataset.{
+ DatasetSelectorSourceOpDesc,
+ DatasetSelectorSourceOpExec
+}
import org.apache.texera.amber.operator.source.scan.ScanSourceOpDesc
import
org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
@@ -141,20 +145,39 @@ case class LogicalPlan(
case operator @ (csvOp: InputCSVScanSourceOpDesc)
if csvOp.columns == null || csvOp.columns.isEmpty =>
Try {
- val upstreamTextInput = getUpstreamLinks(operator.operatorIdentifier)
+ val upstreamOperator = getUpstreamLinks(operator.operatorIdentifier)
.flatMap(link => operators.find(_.operatorIdentifier ==
link.fromOpId))
- .collectFirst { case textInput: TextInputSourceOpDesc => textInput
}
+ .collectFirst {
+ case textInput: TextInputSourceOpDesc => textInput
+ case datasetSelector: DatasetSelectorSourceOpDesc =>
datasetSelector
+ }
.getOrElse(
throw new RuntimeException(
- "CSV File Scan From Input requires a literal Text Input
filename to infer columns."
+ "CSV File Scan From Input requires either a literal Text Input
filename or a Dataset Selector to infer columns."
)
)
- val fileName = upstreamTextInput.textInput
- .linesIterator
- .find(_.trim.nonEmpty)
- .map(_.trim)
- .getOrElse(throw new RuntimeException("No input file name"))
+ val fileName = upstreamOperator match {
+ case textInput: TextInputSourceOpDesc =>
+ textInput.textInput
+ .linesIterator
+ .find(_.trim.nonEmpty)
+ .map(_.trim)
+ .getOrElse(throw new RuntimeException("No input file name"))
+ case datasetSelector: DatasetSelectorSourceOpDesc =>
+ DatasetSelectorSourceOpExec
+ .listFileNames(datasetSelector.datasetVersionPath)
+ .headOption
+ .getOrElse(
+ throw new RuntimeException(
+ "Selected dataset version does not contain any files."
+ )
+ )
+ case _ =>
+ throw new RuntimeException(
+ "Unsupported upstream operator for CSV File Scan From Input
column inference."
+ )
+ }
csvOp.inferColumnsFromFileName(fileName)
} match {
case Success(_) =>