kunwp1 commented on code in PR #5069:
URL: https://github.com/apache/texera/pull/5069#discussion_r3260786595
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala:
##########
@@ -165,15 +165,47 @@ class PythonWorkflowWorker(
clientThreadExecutor.submit(pythonProxyClient)
}
+ private def choosePythonBin(): String = {
+ val fallback =
+ if (pythonENVPath.trim.isEmpty || pythonENVPath.trim == "Default")
"python3"
+ else pythonENVPath
+
+ val cuidOpt = workerConfig.cuid
+ .orElse(sys.env.get("TEXERA_CUID").flatMap(s =>
scala.util.Try(s.toInt).toOption))
+ val pveName = workerConfig.pveName.trim
+
+ if (cuidOpt.isEmpty || pveName.isEmpty) {
+ return fallback
+ }
+
+ val candidate = Paths.get(
+ "/tmp/texera-pve/venvs",
+ cuidOpt.get.toString,
+ pveName,
Review Comment:
Suggested by Claude:
A `pveName` like `../../../usr/local/bin` (or one that resolves through a
symlink) lets a workflow definition pin an arbitrary executable as the Python
interpreter, as long as a `pve/bin/python` file exists at the resolved
location.
UDF authors already have code-exec privileges on their own workflows, but
workflow JSON is imported/shared between users and this turns workflow import
into arbitrary local binary execution.
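Validate before constructing the path, e.g. reject anything that doesn't
match `^[A-Za-z0-9._-]+$` (and reject `.` and `..` explicitly, since both match
that pattern), and verify the resolved path is still under `VenvRoot`
(`candidate.normalize().startsWith(VenvRoot)`). Note that `normalize()` is
purely lexical and does not follow symlinks; to cover the symlink case, compare
`candidate.toRealPath()` against the real path of `VenvRoot` instead.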
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala:
##########
@@ -165,15 +165,47 @@ class PythonWorkflowWorker(
clientThreadExecutor.submit(pythonProxyClient)
}
+ private def choosePythonBin(): String = {
+ val fallback =
+ if (pythonENVPath.trim.isEmpty || pythonENVPath.trim == "Default")
"python3"
+ else pythonENVPath
+
+ val cuidOpt = workerConfig.cuid
+ .orElse(sys.env.get("TEXERA_CUID").flatMap(s =>
scala.util.Try(s.toInt).toOption))
Review Comment:
Why the `TEXERA_CUID` env-var fallback? It introduces a new environment
variable with no other reader in the codebase. Can we drop it and, when
`workerConfig.cuid` is `None`, simply use the fallback Python?
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala:
##########
@@ -165,15 +165,47 @@ class PythonWorkflowWorker(
clientThreadExecutor.submit(pythonProxyClient)
}
+ private def choosePythonBin(): String = {
+ val fallback =
Review Comment:
Better to utilize `PythonUtils.scala` for the fallback. Also, if we use a
magic string such as "Default", we need to prevent users from creating virtual
environments named "Default". Can we avoid using a magic string altogether?
(e.g., add a checkbox in the property panel for users who want to specify a
venv, and show the drop-down only when it is checked)
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala:
##########
@@ -165,15 +165,47 @@ class PythonWorkflowWorker(
clientThreadExecutor.submit(pythonProxyClient)
}
+ private def choosePythonBin(): String = {
+ val fallback =
+ if (pythonENVPath.trim.isEmpty || pythonENVPath.trim == "Default")
"python3"
+ else pythonENVPath
+
+ val cuidOpt = workerConfig.cuid
+ .orElse(sys.env.get("TEXERA_CUID").flatMap(s =>
scala.util.Try(s.toInt).toOption))
+ val pveName = workerConfig.pveName.trim
+
+ if (cuidOpt.isEmpty || pveName.isEmpty) {
+ return fallback
+ }
+
+ val candidate = Paths.get(
+ "/tmp/texera-pve/venvs",
Review Comment:
Hardcoded `/tmp/texera-pve/venvs` duplicates `PveManager.VenvRoot`.
`PveManager` already has a `private def pythonBinPath(cuid, pveName)` that
constructs exactly this path. Expose it (or a `getPythonBin(cuid, pveName):
Option[Path]` that does the exists/executable check too) and call it from here.
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala:
##########
@@ -165,15 +165,47 @@ class PythonWorkflowWorker(
clientThreadExecutor.submit(pythonProxyClient)
}
+ private def choosePythonBin(): String = {
+ val fallback =
+ if (pythonENVPath.trim.isEmpty || pythonENVPath.trim == "Default")
"python3"
+ else pythonENVPath
+
+ val cuidOpt = workerConfig.cuid
+ .orElse(sys.env.get("TEXERA_CUID").flatMap(s =>
scala.util.Try(s.toInt).toOption))
+ val pveName = workerConfig.pveName.trim
+
+ if (cuidOpt.isEmpty || pveName.isEmpty) {
+ return fallback
Review Comment:
Remove the explicit `return` and use an `if/else` if you want idiomatic
Scala:
```scala
if (cuidOpt.isEmpty || pveName.isEmpty) {
fallback
} else {
val candidate = Paths.get("/tmp/texera-pve/venvs", cuidOpt.get.toString,
pveName, "pve", "bin", "python")
if (Files.exists(candidate) && Files.isExecutable(candidate))
candidate.toString
else fallback
}
```
Not a correctness issue; drop if you don't care.
##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2.scala:
##########
@@ -140,6 +147,7 @@ class PythonUDFOpDescV2 extends LogicalOp {
.withPartitionRequirement(partitionRequirement)
.withIsOneToManyOp(true)
.withPropagateSchema(SchemaPropagationFunc(propagateSchema))
+ .withPveName(envName.trim)
Review Comment:
Two sentinels currently mean "no PVE": `""` (the `PhysicalOp.pveName`
default) and `"Default"` (the magic UI string). `choosePythonBin` handles both,
but it's easy to forget one. Normalize at the boundary here:
```scala
.withPveName(if (envName.trim == "Default") "" else envName.trim)
```
Same change applies in `DualInputPortsPythonUDFOpDescV2` and
`PythonUDFSourceOpDescV2`.
##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########
@@ -434,4 +434,26 @@ object PveManager {
List(s"[PVE][ERR] Failed to delete package for cuid=$cuid:
${e.getMessage}")
}
}
+
+ def getPveNames(cuid: Int): List[String] = {
+
+ val cuPath = Paths.get("/tmp/texera-pve/venvs").resolve(cuid.toString)
+
+ if (!Files.exists(cuPath) || !Files.isDirectory(cuPath)) {
+ return List()
+ }
+
+ val stream = Files.list(cuPath)
+
+ try {
+ stream
+ .iterator()
+ .asScala
+ .filter(path => Files.isDirectory(path))
+ .map(path => path.getFileName.toString)
+ .toList
+ } finally {
+ stream.close()
+ }
+ }
Review Comment:
This duplicates `getEnvironments(cuid)`. The existing method already returns
`pveName` (just along with `userPackages`). The frontend could `.map(p =>
p.pveName)`, and you'd avoid maintaining a second definition of "what counts as
an environment." If you do keep this, at least implement it as
`getEnvironments(cuid).map(_.pveName)` so the two methods don't drift. The
current implementation lists any directory under `cuid/`, including unrelated
ones, and `getEnvironments` doesn't validate venv structure either (e.g. by
checking that `dir/pve/bin/python` exists).
##########
frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts:
##########
@@ -173,9 +177,31 @@ export class OperatorPropertyEditFrameComponent implements
OnInit, OnChanges, On
private changeDetectorRef: ChangeDetectorRef,
private workflowVersionService: WorkflowVersionService,
private workflowStatusSerivce: WorkflowStatusService,
- private config: GuiConfigService
+ private config: GuiConfigService,
+ private workflowPveService: WorkflowPveService,
+ private computingUnitStatusService: ComputingUnitStatusService
) {}
+ private patchPythonUdfEnvironmentSchema(schema: CustomJSONSchema7,
environments: string[]): CustomJSONSchema7 {
+ const patchedSchema = cloneDeep(schema);
+
+ if (patchedSchema.properties && typeof patchedSchema.properties !==
"boolean") {
+ const envOptions = ["Default", ...environments.filter(e => e !==
"Default")];
+
+ if (!patchedSchema.properties["envName"]) {
+ patchedSchema.properties["envName"] = { type: "string" };
+ }
Review Comment:
Dead branch. The backend always emits `envName` in the schema for these
three operator types. Either drop this fallback and let a missing property
throw, or convert it to an assertion.
##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala:
##########
@@ -113,4 +113,14 @@ class PveResource {
Response.ok(messages.asJava).build()
}
}
+
+ // --------------------------------------------------
+ // Get list of existing environments for a CU
+ // --------------------------------------------------
+ @GET
+ @Path("/environments")
+ @Produces(Array(MediaType.APPLICATION_JSON))
+ def getPveNames(@QueryParam("cuid") cuid: Int): java.util.List[String] = {
Review Comment:
Suggested by Claude:
`cuid: Int` compiles to the JVM primitive `int`, which can't be null. When
the request has no `cuid` (or it's blank), JAX-RS has no way to represent
"absent" and falls back to the primitive default `0`. So:
| Request | `cuid` value | Behavior |
|---|---|---|
| `GET /pve/environments?cuid=42` | `42` | Correct |
| `GET /pve/environments` *(missing)* | `0` | Looks up `/tmp/texera-pve/venvs/0/` |
| `GET /pve/environments?cuid=` *(empty)* | `0` | Same |
**Why this matters in practice:**
1. CU ids are auto-incremented in the DB starting from 1, so `0` is
currently unused — but the moment anyone seeds CU `0` (test fixture, manual
SQL, dev reset), that user's PVE list leaks to any request missing the param.
2. The new frontend caller at `operator-property-edit-frame.component.ts`
already guards with `cuid !== undefined ? ... : of([])`. That guard exists in
*one* place. The next caller (a curl, a dev script, a future component) won't
have it, and the API silently does the wrong thing instead of returning 400 —
that's the kind of bug that gets noticed in prod, not review.
3. Returning `[]` for `cuid=0` is also ambiguous with "this CU exists but
has no PVEs" — a real 400 disambiguates client-side error handling.
**Fix — switch to the boxed type and reject null explicitly:**
```scala
@GET
@Path("/environments")
@Produces(Array(MediaType.APPLICATION_JSON))
def getPveNames(@QueryParam("cuid") cuid: java.lang.Integer): Response = {
if (cuid == null) {
Response.status(Response.Status.BAD_REQUEST)
.entity("cuid query parameter is required").build()
} else {
Response.ok(PveManager.getPveNames(cuid).asJava).build()
}
}
```
`java.lang.Integer` is a reference type, so JAX-RS passes `null` when the
param is absent and you can branch on it. (You can also use
`@DefaultValue("-1")` + an explicit `< 0` check, but boxed-null reads more
clearly as "required parameter".)
**Same bug already exists in `fetchPVEs` at line 64 of this file** — it also
takes `@QueryParam("cuid") cuid: Int` and would silently look up CU `0` on a
missing param. Not part of this diff, but worth fixing in the same commit since
both endpoints share the failure mode (and `getPveNames` was modeled on
`fetchPVEs`). If you fix both, consider extracting a small helper:
```scala
private def requireCuid(cuid: java.lang.Integer): Int = {
if (cuid == null) throw new BadRequestException("cuid query parameter is
required")
cuid
}
```
Then each endpoint reads `val id = requireCuid(cuid)` at the top.
##########
frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts:
##########
@@ -249,6 +275,45 @@ export class OperatorPropertyEditFrameComponent implements
OnInit, OnChanges, On
*/
this.formData = cloneDeep(operator.operatorProperties);
+ // Pre-fill the PVE selection to "Default" for freshly-dropped Python UDF
operators.
+ // AJV's useDefaults pass below runs against the unpatched schema, so
relying on the
+ // patched-schema default isn't enough; do it directly on formData here.
Existing
+ // operators that already have an envName keep their value.
+ const isPythonUdf =
+ this.currentOperatorSchema.operatorType === "PythonUDFV2" ||
+ this.currentOperatorSchema.operatorType === "DualInputPortsPythonUDFV2"
||
+ this.currentOperatorSchema.operatorType === "PythonUDFSourceV2";
+ if (isPythonUdf && !this.formData.envName) {
+ this.formData.envName = "Default";
+ }
+
+ const baseSchema = cloneDeep(this.currentOperatorSchema.jsonSchema);
+
+ if (isPythonUdf) {
+ this.computingUnitStatusService
+ .getSelectedComputingUnit()
+ .pipe(
+ take(1),
+ switchMap(unit => {
+ const cuid = unit?.computingUnit?.cuid;
+ return cuid !== undefined ?
this.workflowPveService.getPveNames(cuid) : of<string[]>([]);
+ }),
+ untilDestroyed(this)
+ )
+ .subscribe({
+ next: (environments: string[]) => {
+ const patchedSchema =
this.patchPythonUdfEnvironmentSchema(baseSchema, environments);
+ this.setFormlyFormBinding(patchedSchema);
+ },
+ error: (err: unknown) => {
+ const patchedSchema =
this.patchPythonUdfEnvironmentSchema(baseSchema, []);
+ this.setFormlyFormBinding(patchedSchema);
+ },
Review Comment:
The `err` is unused, and the user gets a dropdown with only `Default`
whether the request failed or genuinely returned `[]`. At minimum, log it with
`console.error(err)` or surface it via `NotificationService` so users can tell
whether their PVEs failed to load or simply don't exist.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]