mengw15 commented on code in PR #5258:
URL: https://github.com/apache/texera/pull/5258#discussion_r3431303135


##########
notebook-migration-service/build.sbt:
##########
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import scala.collection.Seq
+
+name := "notebook-migration-service"
+
+
+enablePlugins(JavaAppPackaging)

Review Comment:
   All **6** other dist-producing services in this repo 
(`access-control-service`, `config-service`, `file-service`, 
`workflow-compiling-service`, `computing-unit-managing-service`, and `amber`) 
include a `Universal / mappings := AddMetaInfLicenseFiles.distMappings(...)` 
block right after `enablePlugins(JavaAppPackaging)`. This service is the only 
one without it.
   
   The root `build.sbt` comment (lines 58-59) is explicit: "Dist-producing 
modules additionally override `Universal / mappings` in their own build.sbt 
(not here) — see `AddMetaInfLicenseFiles.distMappings`." Without this block, 
the `notebook-migration-service` dist zip won't have `LICENSE-binary`, 
`NOTICE-binary`, `DISCLAIMER`, and `licenses/` at its top level — an ASF 
binary-distribution compliance requirement for the incubating project. The 
`asfLicensingSettings` applied at root covers META-INF inside each JAR, but not 
the dist zip's top-level files. The common-library modules (`common/auth`, 
`common/config`, `common/dao`, etc.) correctly don't have this block — they 
don't ship dist zips — but this is a service, so it should.



##########
build.sbt:
##########
@@ -169,6 +169,16 @@ lazy val WorkflowExecutionService = (project in 
file("amber"))
   )
   .configs(Test)
   .dependsOn(DAO % "test->test", Auth % "test->test") // test scope dependency
+lazy val NotebookMigrationService = (project in 
file("notebook-migration-service"))
+  .dependsOn(Auth, Config, DAO)
+  .settings(asfLicensingSettings)
+  .settings(
+    dependencyOverrides ++= Seq(
+      // override it as io.dropwizard 4 require 2.16.1 or higher
+      "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.17.0"

Review Comment:
   This override hardcodes `jackson-module-scala = 2.17.0`, but the rest of the 
repo uses `val jacksonVersion = "2.18.6"` (defined at root `build.sbt:63`) via 
`% jacksonVersion`. Other services (`ConfigService`, `FileService`, 
`AccessControlService`, etc.) all reference `jacksonVersion`. Plus 
`notebook-migration-service/build.sbt:76` declares the local dep as `2.18.6` — 
but since the root `dependencyOverrides` wins, this service actually runs with 
2.17.0 while the rest of the repo runs with 2.18.6. Probably a copy-paste; 
should be `% jacksonVersion` to match convention and avoid silent version skew 
across services.



##########
notebook-migration-service/src/main/scala/org/apache/texera/service/resource/NotebookMigrationResource.scala:
##########
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.texera.service.resource
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.typesafe.scalalogging.LazyLogging
+import jakarta.ws.rs._
+import jakarta.ws.rs.core._
+import org.apache.texera.dao.SqlServer
+import org.jooq.JSONB
+import org.apache.texera.dao.jooq.generated.tables.Notebook
+import org.apache.texera.dao.jooq.generated.tables.WorkflowNotebookMapping
+import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
+import scala.util.control.NonFatal
+import org.apache.texera.common.config.StorageConfig
+
+object NotebookMigrationResource extends LazyLogging {
+
+  private val mapper: ObjectMapper = new 
ObjectMapper().registerModule(DefaultScalaModule)
+
+  // Build an error response body via the mapper so the message is 
JSON-escaped; interpolating
+  // e.getMessage directly produces malformed JSON when it contains quotes, 
backslashes, or newlines.
+  private def errorJson(message: String): String =
+    mapper.writeValueAsString(mapper.createObjectNode().put("error", message))
+
+  private val jupyterUrl = StorageConfig.jupyterURL
+  private val jupyterToken = StorageConfig.jupyterToken
+  // The token is passed as a URL param so the browser iframe can authenticate 
when loading the notebook.
+  // jupyterIframeURL is process-global state. This is safe ONLY because each 
user runs their own pod
+  // (own notebook-migration-service JVM + own Jupyter) in the k8s deployment, 
so this singleton is
+  // effectively per-user. Do NOT deploy this service as a shared multi-user 
instance without adding
+  // per-user keying here, or one user's upload would overwrite another's 
iframe URL.
+  private var jupyterIframeURL = 
s"$jupyterUrl/notebooks/work/notebook.ipynb?token=$jupyterToken"
+
+  private def isJupyterAvailable(jupyterUrl: String): Boolean = {
+    var conn: java.net.HttpURLConnection = null
+    try {
+      conn = new java.net.URL(s"$jupyterUrl/api")
+        .openConnection()
+        .asInstanceOf[java.net.HttpURLConnection]
+
+      conn.setRequestMethod("GET")
+      conn.setConnectTimeout(2000)
+      conn.setReadTimeout(2000)
+
+      val status = conn.getResponseCode
+
+      status == 200 || status == 403
+    } catch {
+      case _: Exception => false
+    } finally {
+      if (conn != null) conn.disconnect()
+    }
+  }
+
+  // Returns the Jupyter iframe reference URL
+  def getJupyterIframeURL(): Response = {
+    if (!isJupyterAvailable(jupyterUrl)) {
+      return Response
+        .status(500)
+        .entity(
+          """
+      {
+        "success": false,
+        "message": "Cannot connect to Jupyter server"
+      }
+      """
+        )
+        .build()
+    }
+
+    Response
+      .ok(
+        s"""
+    {
+      "success": true,
+      "url": "$jupyterIframeURL"
+    }
+    """
+      )
+      .build()
+  }
+
+  // Returns the URL of Jupyter
+  def getJupyterURL(): Response = {
+    if (!isJupyterAvailable(jupyterUrl)) {
+      return Response
+        .status(500)
+        .entity(
+          """
+      {
+        "success": false,
+        "message": "Cannot connect to Jupyter server"
+      }
+      """
+        )
+        .build()
+    }
+
+    Response
+      .ok(
+        s"""
+    {
+      "success": true,
+      "url": "$jupyterUrl"
+    }
+    """
+      )
+      .build()
+  }
+
+  // Set the notebook in Jupyter
+  def setNotebook(body: String): Response = {
+    if (!isJupyterAvailable(jupyterUrl)) {
+      return Response
+        .status(500)
+        .entity(
+          """
+      {
+        "success": false,
+        "message": "Cannot connect to Jupyter server"
+      }
+      """
+        )
+        .build()
+    }
+
+    var conn: HttpURLConnection = null
+    try {
+      val json = mapper.readTree(body)
+
+      val notebookName = json.get("notebookName").asText()
+      val notebookData = json.get("notebookData")
+
+      // Construct Jupyter API URL
+      val apiUrl = s"$jupyterUrl/api/contents/work/$notebookName"
+
+      val url = new URL(apiUrl)
+      conn = url.openConnection().asInstanceOf[HttpURLConnection]
+
+      conn.setRequestMethod("PUT")
+      conn.setDoOutput(true)
+      conn.setRequestProperty("Content-Type", "application/json")
+      // The Jupyter Contents API requires authentication; send the configured 
token.
+      conn.setRequestProperty("Authorization", s"token $jupyterToken")
+
+      val requestBody =
+        s"""
+      {
+        "type": "notebook",
+        "content": $notebookData
+      }
+      """
+
+      val os = conn.getOutputStream
+      os.write(requestBody.getBytes(StandardCharsets.UTF_8))
+      os.flush()
+      os.close()
+
+      val status = conn.getResponseCode
+
+      if (status != 200 && status != 201) {
+        return Response
+          .status(500)
+          .entity(
+            s"""
+        {
+          "success": false,
+          "message": "Failed to upload notebook to Jupyter (status $status)"
+        }
+        """
+          )
+          .build()
+      }
+
+      jupyterIframeURL = 
s"$jupyterUrl/notebooks/work/$notebookName?token=$jupyterToken"
+
+      Response
+        .ok(
+          s"""
+      {
+        "success": true,
+        "message": "Notebook successfully sent to Jupyter."
+      }
+      """
+        )
+        .build()
+
+    } catch {
+      case NonFatal(e) =>
+        logger.error("Error sending notebook to Jupyter", e)
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(errorJson(e.getMessage))
+          .build()
+    } finally {
+      if (conn != null) conn.disconnect()
+    }
+  }
+
+  // Store notebook + mapping in database
+  def storeNotebookAndMapping(body: String): Response = {
+    try {
+      val json = mapper.readTree(body)
+
+      val wid: java.lang.Integer = json.get("wid").asInt()
+      val vid: java.lang.Integer = json.get("vid").asInt()
+      val mappingNode = json.get("mapping")
+      val notebookNode = json.get("notebook")
+
+      val dsl = SqlServer.getInstance().createDSLContext()
+
+      val nid: java.lang.Integer = SqlServer.withTransaction(dsl) { ctx =>
+        // Insert notebook
+        val notebookRecord = ctx
+          .insertInto(Notebook.NOTEBOOK)
+          .set(Notebook.NOTEBOOK.WID, wid)
+          .set(Notebook.NOTEBOOK.NOTEBOOK_, 
JSONB.valueOf(notebookNode.toString))
+          .returning(Notebook.NOTEBOOK.NID)
+          .fetchOne()
+
+        val nidInside: java.lang.Integer = 
notebookRecord.getValue(Notebook.NOTEBOOK.NID)
+
+        // Insert workflow-notebook mapping
+        ctx
+          .insertInto(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.WID, wid)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.VID, vid)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.NID, 
nidInside)
+          .set(
+            WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.MAPPING,
+            JSONB.valueOf(mappingNode.toString)
+          )
+          .execute()

Review Comment:
   The `notebook.wid` UNIQUE constraint from #5055 means at most one notebook 
row per workflow. But this method unconditionally INSERTs into both tables, so:
   
   - First call for (wid=X, vid=Y): both inserts succeed ✓
   - Second call for (wid=X, vid=Z) — saving a mapping for the same workflow at 
a different version: the `notebook` INSERT violates the UNIQUE constraint → SQL 
exception → 500.
   
   But `workflow_notebook_mapping`'s PK is `(wid, vid, nid)`, which implies the 
schema is designed to hold one mapping row per workflow version. The current 
code can't actually produce more than one — only the first version per workflow 
can be stored.
   
   Is the intent (a) "one notebook per workflow, only ever stored once" or (b) 
"support multiple versions, each with its own mapping"?
   
   - If (a): better to detect the existing row and return an explicit "notebook 
already stored for workflow X" 4xx rather than letting the unique violation 
bubble up as a 500.
   - If (b): the notebook INSERT needs to be conditional (e.g., upsert with `ON 
CONFLICT (wid) DO NOTHING` then re-fetch the nid), and the mapping INSERT just 
appends a new (wid, vid, nid) row.
   
   The test suite doesn't currently exercise this — all tests do a single 
store-then-fetch. Adding a "store twice" test case would pin the intended 
behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to