mengw15 commented on code in PR #5258:
URL: https://github.com/apache/texera/pull/5258#discussion_r3431303135
##########
notebook-migration-service/build.sbt:
##########
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import scala.collection.Seq
+
+name := "notebook-migration-service"
+
+
+enablePlugins(JavaAppPackaging)

Review Comment:
All **6** other dist-producing services in this repo (`access-control-service`, `config-service`, `file-service`, `workflow-compiling-service`, `computing-unit-managing-service`, and `amber`) include a `Universal / mappings := AddMetaInfLicenseFiles.distMappings(...)` block right after `enablePlugins(JavaAppPackaging)`. This service is the only one without it. The root `build.sbt` comment (lines 58-59) is explicit: "Dist-producing modules additionally override `Universal / mappings` in their own build.sbt (not here) — see `AddMetaInfLicenseFiles.distMappings`."

Without this block, the `notebook-migration-service` dist zip won't have `LICENSE-binary`, `NOTICE-binary`, `DISCLAIMER`, and `licenses/` at its top level, and those files are an ASF binary-distribution compliance requirement for the incubating project. The `asfLicensingSettings` applied at root covers META-INF inside each JAR, but not the dist zip's top-level files.
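One way to confirm the gap, before and after adding the block, is to inspect the built zip for those four top-level items. The sketch below is a hypothetical helper, not something in the repo: `DistZipCheck` and `missingTopLevel` are illustrative names, and it assumes the sbt-native-packager layout in which every zip entry sits under a single `<name>-<version>/` root directory.

```scala
import java.util.zip.ZipFile
import scala.collection.mutable

// Hypothetical helper (not part of the repo): lists which ASF top-level items
// are missing from a dist zip. Assumes every entry sits under one root
// directory such as "<name>-<version>/", as sbt-native-packager produces.
object DistZipCheck {
  val required: Seq[String] =
    Seq("LICENSE-binary", "NOTICE-binary", "DISCLAIMER", "licenses/")

  def missingTopLevel(zipPath: String): Seq[String] = {
    val zip = new ZipFile(zipPath)
    try {
      val relative = mutable.Set.empty[String]
      val entries = zip.entries()
      while (entries.hasMoreElements) {
        val name = entries.nextElement().getName
        // Drop the root directory: "svc-1.0/LICENSE-binary" -> "LICENSE-binary".
        val slash = name.indexOf('/')
        relative += (if (slash >= 0) name.substring(slash + 1) else name)
      }
      // A directory requirement ("licenses/") is met by any entry beneath it.
      required.filterNot { item =>
        if (item.endsWith("/")) relative.exists(_.startsWith(item))
        else relative.contains(item)
      }
    } finally zip.close()
  }
}
```

It could be pointed at the zip produced by `Universal / packageBin` (by default under the module's `target/universal/`); an empty result means all four items are present at the top level.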
The common-library modules (`common/auth`, `common/config`, `common/dao`, etc.) correctly omit this block because they don't ship dist zips, but this is a service, so it should have one.

##########
build.sbt:
##########
@@ -169,6 +169,16 @@ lazy val WorkflowExecutionService = (project in file("amber"))
   )
   .configs(Test)
   .dependsOn(DAO % "test->test", Auth % "test->test") // test scope dependency
+lazy val NotebookMigrationService = (project in file("notebook-migration-service"))
+  .dependsOn(Auth, Config, DAO)
+  .settings(asfLicensingSettings)
+  .settings(
+    dependencyOverrides ++= Seq(
+      // override it as io.dropwizard 4 require 2.16.1 or higher
+      "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.17.0"

Review Comment:
This override hardcodes `jackson-module-scala = 2.17.0`, but the rest of the repo uses `val jacksonVersion = "2.18.6"` (defined at root `build.sbt:63`) via `% jacksonVersion`. Other services (`ConfigService`, `FileService`, `AccessControlService`, etc.) all reference `jacksonVersion`. In addition, `notebook-migration-service/build.sbt:76` declares the local dependency as `2.18.6`, but because the root `dependencyOverrides` wins, this service actually runs with 2.17.0 while the rest of the repo runs with 2.18.6. This is probably a copy-paste leftover; it should be `% jacksonVersion` to match the convention and avoid silent version skew across services.

##########
notebook-migration-service/src/main/scala/org/apache/texera/service/resource/NotebookMigrationResource.scala:
##########
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.texera.service.resource
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.typesafe.scalalogging.LazyLogging
+import jakarta.ws.rs._
+import jakarta.ws.rs.core._
+import org.apache.texera.dao.SqlServer
+import org.jooq.JSONB
+import org.apache.texera.dao.jooq.generated.tables.Notebook
+import org.apache.texera.dao.jooq.generated.tables.WorkflowNotebookMapping
+import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
+import scala.util.control.NonFatal
+import org.apache.texera.common.config.StorageConfig
+
+object NotebookMigrationResource extends LazyLogging {
+
+  private val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
+  // Build an error response body via the mapper so the message is JSON-escaped; interpolating
+  // e.getMessage directly produces malformed JSON when it contains quotes, backslashes, or newlines.
+  private def errorJson(message: String): String =
+    mapper.writeValueAsString(mapper.createObjectNode().put("error", message))
+
+  private val jupyterUrl = StorageConfig.jupyterURL
+  private val jupyterToken = StorageConfig.jupyterToken
+  // The token is passed as a URL param so the browser iframe can authenticate when loading the notebook.
+  // jupyterIframeURL is process-global state. This is safe ONLY because each user runs their own pod
+  // (own notebook-migration-service JVM + own Jupyter) in the k8s deployment, so this singleton is
+  // effectively per-user. Do NOT deploy this service as a shared multi-user instance without adding
+  // per-user keying here, or one user's upload would overwrite another's iframe URL.
+  private var jupyterIframeURL = s"$jupyterUrl/notebooks/work/notebook.ipynb?token=$jupyterToken"
+
+  private def isJupyterAvailable(jupyterUrl: String): Boolean = {
+    var conn: java.net.HttpURLConnection = null
+    try {
+      conn = new java.net.URL(s"$jupyterUrl/api")
+        .openConnection()
+        .asInstanceOf[java.net.HttpURLConnection]
+
+      conn.setRequestMethod("GET")
+      conn.setConnectTimeout(2000)
+      conn.setReadTimeout(2000)
+
+      val status = conn.getResponseCode
+
+      status == 200 || status == 403
+    } catch {
+      case _: Exception => false
+    } finally {
+      if (conn != null) conn.disconnect()
+    }
+  }
+
+  // Returns the Jupyter iframe reference URL
+  def getJupyterIframeURL(): Response = {
+    if (!isJupyterAvailable(jupyterUrl)) {
+      return Response
+        .status(500)
+        .entity(
+          """
+          {
+            "success": false,
+            "message": "Cannot connect to Jupyter server"
+          }
+          """
+        )
+        .build()
+    }
+
+    Response
+      .ok(
+        s"""
+        {
+          "success": true,
+          "url": "$jupyterIframeURL"
+        }
+        """
+      )
+      .build()
+  }
+
+  // Returns the URL of Jupyter
+  def getJupyterURL(): Response = {
+    if (!isJupyterAvailable(jupyterUrl)) {
+      return Response
+        .status(500)
+        .entity(
+          """
+          {
+            "success": false,
+            "message": "Cannot connect to Jupyter server"
+          }
+          """
+        )
+        .build()
+    }
+
+    Response
+      .ok(
+        s"""
+        {
+          "success": true,
+          "url": "$jupyterUrl"
+        }
+        """
+      )
+      .build()
+  }
+
+  // Set the notebook in Jupyter
+  def setNotebook(body: String): Response = {
+    if (!isJupyterAvailable(jupyterUrl)) {
+      return Response
+        .status(500)
+        .entity(
+          """
+          {
+            "success": false,
+            "message": "Cannot connect to Jupyter server"
+          }
+          """
+        )
+        .build()
+    }
+
+    var conn: HttpURLConnection = null
+    try {
+      val json = mapper.readTree(body)
+
+      val notebookName = json.get("notebookName").asText()
+      val notebookData = json.get("notebookData")
+
+      // Construct Jupyter API URL
+      val apiUrl = s"$jupyterUrl/api/contents/work/$notebookName"
+
+      val url = new URL(apiUrl)
+      conn = url.openConnection().asInstanceOf[HttpURLConnection]
+
+      conn.setRequestMethod("PUT")
+      conn.setDoOutput(true)
+      conn.setRequestProperty("Content-Type", "application/json")
+      // The Jupyter Contents API requires authentication; send the configured token.
+      conn.setRequestProperty("Authorization", s"token $jupyterToken")
+
+      val requestBody =
+        s"""
+        {
+          "type": "notebook",
+          "content": $notebookData
+        }
+        """
+
+      val os = conn.getOutputStream
+      os.write(requestBody.getBytes(StandardCharsets.UTF_8))
+      os.flush()
+      os.close()
+
+      val status = conn.getResponseCode
+
+      if (status != 200 && status != 201) {
+        return Response
+          .status(500)
+          .entity(
+            s"""
+            {
+              "success": false,
+              "message": "Failed to upload notebook to Jupyter (status $status)"
+            }
+            """
+          )
+          .build()
+      }
+
+      jupyterIframeURL = s"$jupyterUrl/notebooks/work/$notebookName?token=$jupyterToken"
+
+      Response
+        .ok(
+          s"""
+          {
+            "success": true,
+            "message": "Notebook successfully sent to Jupyter."
+          }
+          """
+        )
+        .build()
+
+    } catch {
+      case NonFatal(e) =>
+        logger.error("Error sending notebook to Jupyter", e)
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(errorJson(e.getMessage))
+          .build()
+    } finally {
+      if (conn != null) conn.disconnect()
+    }
+  }
+
+  // Store notebook + mapping in database
+  def storeNotebookAndMapping(body: String): Response = {
+    try {
+      val json = mapper.readTree(body)
+
+      val wid: java.lang.Integer = json.get("wid").asInt()
+      val vid: java.lang.Integer = json.get("vid").asInt()
+      val mappingNode = json.get("mapping")
+      val notebookNode = json.get("notebook")
+
+      val dsl = SqlServer.getInstance().createDSLContext()
+
+      val nid: java.lang.Integer = SqlServer.withTransaction(dsl) { ctx =>
+        // Insert notebook
+        val notebookRecord = ctx
+          .insertInto(Notebook.NOTEBOOK)
+          .set(Notebook.NOTEBOOK.WID, wid)
+          .set(Notebook.NOTEBOOK.NOTEBOOK_, JSONB.valueOf(notebookNode.toString))
+          .returning(Notebook.NOTEBOOK.NID)
+          .fetchOne()
+
+        val nidInside: java.lang.Integer = notebookRecord.getValue(Notebook.NOTEBOOK.NID)
+
+        // Insert workflow-notebook mapping
+        ctx
+          .insertInto(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.WID, wid)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.VID, vid)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.NID, nidInside)
+          .set(
+            WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.MAPPING,
+            JSONB.valueOf(mappingNode.toString)
+          )
+          .execute()

Review Comment:
The `notebook.wid` UNIQUE constraint from #5055 means at most one notebook row per workflow. But this method unconditionally INSERTs into both tables, so:

- First call for (wid=X, vid=Y): both inserts succeed ✓
- Second call for (wid=X, vid=Z), i.e. saving a mapping for the same workflow at a different version: the `notebook` INSERT violates the UNIQUE constraint → SQL exception → 500.
But `workflow_notebook_mapping`'s PK is `(wid, vid, nid)`, which implies the schema is designed to hold one mapping row per workflow version. The current code can't actually produce more than one: only the first version per workflow can be stored.

Is the intent (a) "one notebook per workflow, only ever stored once" or (b) "support multiple versions, each with its own mapping"?

- If (a): it's better to detect the existing row and return an explicit "notebook already stored for workflow X" 4xx rather than letting the unique violation bubble up as a 500.
- If (b): the notebook INSERT needs to be conditional (e.g., upsert with `ON CONFLICT (wid) DO NOTHING`, then re-fetch the nid), and the mapping INSERT just appends a new (wid, vid, nid) row.

The test suite doesn't currently exercise this; every test does a single store-then-fetch. Adding a "store twice" test case would pin down the intended behavior.
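To make the two options concrete, here is a small in-memory model of the two tables. It is a hypothetical sketch, not the DAO: `NotebookStoreModel`, `store`, and `storeOnce` are illustrative names, plain integers stand in for the `wid`/`vid`/`nid` columns, and the JSON payloads are omitted. `store` follows option (b): insert the notebook only if `wid` has no row yet, re-fetch its `nid`, then append a `(wid, vid, nid)` mapping row. `storeOnce` follows option (a) and rejects a second store with an explicit error.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the `notebook` and `workflow_notebook_mapping`
// tables. It only pins down the intended semantics; it does not use jOOQ or the DAO.
final class NotebookStoreModel {
  // `notebook`: wid is UNIQUE, so each workflow maps to at most one nid.
  private val nidByWid = mutable.Map.empty[Int, Int]
  // `workflow_notebook_mapping`: primary key (wid, vid, nid), one row per version.
  private val mappingRows = mutable.Set.empty[(Int, Int, Int)]
  private var nextNid = 1

  // Option (b): insert the notebook only if wid has no row yet (the analogue of
  // ON CONFLICT (wid) DO NOTHING), re-fetch its nid, then append a mapping row.
  def store(wid: Int, vid: Int): Int = {
    val nid = nidByWid.getOrElseUpdate(wid, {
      val fresh = nextNid
      nextNid += 1
      fresh
    })
    mappingRows += ((wid, vid, nid))
    nid
  }

  // Option (a): refuse a second store for the same workflow with an explicit
  // error, which the resource would map to a 4xx instead of a 500.
  def storeOnce(wid: Int, vid: Int): Either[String, Int] =
    if (nidByWid.contains(wid)) Left(s"notebook already stored for workflow $wid")
    else Right(store(wid, vid))

  def notebookCount: Int = nidByWid.size
  def mappings: Set[(Int, Int, Int)] = mappingRows.toSet
}
```

A "store twice" test against the real resource would check the same properties: one `notebook` row per `wid`, and under option (b) one mapping row per version, all sharing that row's `nid`.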
