zyratlo commented on code in PR #5258:
URL: https://github.com/apache/texera/pull/5258#discussion_r3431495754


##########
notebook-migration-service/src/main/scala/org/apache/texera/service/resource/NotebookMigrationResource.scala:
##########
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.texera.service.resource
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.typesafe.scalalogging.LazyLogging
+import jakarta.ws.rs._
+import jakarta.ws.rs.core._
+import org.apache.texera.dao.SqlServer
+import org.jooq.JSONB
+import org.apache.texera.dao.jooq.generated.tables.Notebook
+import org.apache.texera.dao.jooq.generated.tables.WorkflowNotebookMapping
+import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
+import scala.util.control.NonFatal
+import org.apache.texera.common.config.StorageConfig
+
+object NotebookMigrationResource extends LazyLogging {
+
+  // Shared Jackson mapper (Scala module registered). It parses incoming request bodies via
+  // readTree and serializes JSON payloads such as the error body built by errorJson.
+  private val mapper: ObjectMapper = new 
ObjectMapper().registerModule(DefaultScalaModule)
+
+  /**
+    * Serializes an error message as a JSON object of the form {"error": <message>}.
+    *
+    * The payload is produced by the Jackson mapper so the message is JSON-escaped; splicing
+    * e.getMessage into a string template yields malformed JSON whenever the message contains
+    * quotes, backslashes, or newlines.
+    *
+    * @param message text to place under the "error" key
+    * @return the serialized JSON object
+    */
+  private def errorJson(message: String): String = {
+    val payload = mapper.createObjectNode().put("error", message)
+    mapper.writeValueAsString(payload)
+  }
+
+  // Base URL and access token of the Jupyter server, both read from StorageConfig.
+  private val jupyterUrl = StorageConfig.jupyterURL
+  private val jupyterToken = StorageConfig.jupyterToken
+  // The token is passed as a URL param so the browser iframe can authenticate 
when loading the notebook.
+  // jupyterIframeURL is process-global state. This is safe ONLY because each 
user runs their own pod
+  // (own notebook-migration-service JVM + own Jupyter) in the k8s deployment, 
so this singleton is
+  // effectively per-user. Do NOT deploy this service as a shared multi-user 
instance without adding
+  // per-user keying here, or one user's upload would overwrite another's 
iframe URL.
+  // Starts out pointing at work/notebook.ipynb; setNotebook reassigns it after a successful upload.
+  private var jupyterIframeURL = 
s"$jupyterUrl/notebooks/work/notebook.ipynb?token=$jupyterToken"
+
+  /**
+    * Probes the Jupyter server by issuing a GET against `<jupyterUrl>/api`.
+    *
+    * Connect and read timeouts are 2 seconds each. A 200 or a 403 response counts as
+    * "available"; any other status, or any exception raised while connecting, yields false.
+    *
+    * @param jupyterUrl base URL of the Jupyter server to probe
+    * @return true when the server answered with 200 or 403
+    */
+  private def isJupyterAvailable(jupyterUrl: String): Boolean = {
+    var probe: HttpURLConnection = null
+    try {
+      probe = new URL(s"$jupyterUrl/api").openConnection().asInstanceOf[HttpURLConnection]
+      probe.setRequestMethod("GET")
+      probe.setConnectTimeout(2000)
+      probe.setReadTimeout(2000)
+
+      // Either status shows that a server answered the request.
+      Set(200, 403).contains(probe.getResponseCode)
+    } catch {
+      // Best-effort probe: every failure to reach the server simply means "not available".
+      case _: Exception => false
+    } finally {
+      if (probe != null) probe.disconnect()
+    }
+  }
+
+  /**
+    * Returns the URL the frontend should load in the Jupyter iframe.
+    *
+    * The success payload is serialized through the Jackson mapper rather than spliced into a
+    * string template: jupyterIframeURL embeds the user-supplied notebook name, so a quote or
+    * backslash in that name would otherwise produce malformed JSON.
+    *
+    * @return 200 with {"success": true, "url": ...} when Jupyter is reachable, otherwise 500
+    *         with {"success": false, "message": ...}
+    */
+  def getJupyterIframeURL(): Response = {
+    if (!isJupyterAvailable(jupyterUrl)) {
+      Response
+        .status(500)
+        .entity(
+          """
+      {
+        "success": false,
+        "message": "Cannot connect to Jupyter server"
+      }
+      """
+        )
+        .build()
+    } else {
+      val payload = mapper
+        .createObjectNode()
+        .put("success", true)
+        .put("url", jupyterIframeURL)
+
+      Response.ok(mapper.writeValueAsString(payload)).build()
+    }
+  }
+
+  /**
+    * Returns the base URL of the Jupyter server.
+    *
+    * @return 200 with a JSON body carrying the URL when Jupyter is reachable, otherwise 500
+    *         with a JSON body describing the connection failure
+    */
+  def getJupyterURL(): Response = {
+    val jupyterReachable = isJupyterAvailable(jupyterUrl)
+
+    if (jupyterReachable) {
+      val successBody =
+        s"""
+    {
+      "success": true,
+      "url": "$jupyterUrl"
+    }
+    """
+      Response.ok(successBody).build()
+    } else {
+      val failureBody =
+          """
+      {
+        "success": false,
+        "message": "Cannot connect to Jupyter server"
+      }
+      """
+      Response.status(500).entity(failureBody).build()
+    }
+  }
+
+  /**
+    * Uploads a notebook to Jupyter through the Contents API and, on success, points the
+    * iframe URL at the uploaded notebook.
+    *
+    * @param body JSON request body. Must contain "notebookName" (path of the notebook below
+    *             work/) and "notebookData" (the notebook document, forwarded as the
+    *             Contents API "content" field).
+    * @return 200 when Jupyter accepted the upload; 400 when a required field is missing;
+    *         500 when Jupyter is unreachable, rejects the upload, or any other error occurs
+    */
+  def setNotebook(body: String): Response = {
+    if (!isJupyterAvailable(jupyterUrl)) {
+      return Response
+        .status(500)
+        .entity(
+          """
+      {
+        "success": false,
+        "message": "Cannot connect to Jupyter server"
+      }
+      """
+        )
+        .build()
+    }
+
+    var conn: HttpURLConnection = null
+    try {
+      val json = Option(mapper.readTree(body))
+
+      // JsonNode.get returns null for an absent key, so lift both lookups into Option instead
+      // of letting a missing field surface as a NullPointerException (and a generic 500).
+      val notebookName = json
+        .flatMap(node => Option(node.get("notebookName")))
+        .filterNot(_.isNull)
+        .map(_.asText())
+        .filter(_.nonEmpty)
+      val notebookData = json
+        .flatMap(node => Option(node.get("notebookData")))
+        .filterNot(_.isNull)
+
+      (notebookName, notebookData) match {
+        case (Some(name), Some(data)) =>
+          // Percent-encode every path segment so spaces, '?', '#', quotes, etc. in the name
+          // cannot corrupt the request URL or the iframe URL. '/' separators are kept so
+          // names that address a sub-folder keep working. URLEncoder emits '+' for a space,
+          // which is only valid in query strings, hence the %20 rewrite.
+          // NOTE(review): "." and ".." segments pass through unchanged; add a check here if
+          // uploads must be confined to work/.
+          val encodedName = name
+            .split("/", -1)
+            .map(segment =>
+              java.net.URLEncoder
+                .encode(segment, StandardCharsets.UTF_8.name())
+                .replace("+", "%20")
+            )
+            .mkString("/")
+
+          // Construct Jupyter API URL
+          val apiUrl = s"$jupyterUrl/api/contents/work/$encodedName"
+          conn = new URL(apiUrl).openConnection().asInstanceOf[HttpURLConnection]
+
+          conn.setRequestMethod("PUT")
+          conn.setDoOutput(true)
+          conn.setRequestProperty("Content-Type", "application/json")
+          // The Jupyter Contents API requires authentication; send the configured token.
+          conn.setRequestProperty("Authorization", s"token $jupyterToken")
+
+          // `data` is a JsonNode, so interpolating it yields its serialized JSON form.
+          val requestBody =
+            s"""
+      {
+        "type": "notebook",
+        "content": $data
+      }
+      """
+
+          // Close the stream even when the write fails part-way.
+          val os = conn.getOutputStream
+          try {
+            os.write(requestBody.getBytes(StandardCharsets.UTF_8))
+            os.flush()
+          } finally {
+            os.close()
+          }
+
+          val status = conn.getResponseCode
+
+          if (status != 200 && status != 201) {
+            Response
+              .status(500)
+              .entity(
+                s"""
+        {
+          "success": false,
+          "message": "Failed to upload notebook to Jupyter (status $status)"
+        }
+        """
+              )
+              .build()
+          } else {
+            // Only repoint the iframe once Jupyter has confirmed the upload.
+            jupyterIframeURL = s"$jupyterUrl/notebooks/work/$encodedName?token=$jupyterToken"
+
+            Response
+              .ok(
+                s"""
+      {
+        "success": true,
+        "message": "Notebook successfully sent to Jupyter."
+      }
+      """
+              )
+              .build()
+          }
+
+        case _ =>
+          // A missing field is a client error, not a server fault.
+          Response
+            .status(Response.Status.BAD_REQUEST)
+            .entity(
+              """
+      {
+        "success": false,
+        "message": "Request body must contain 'notebookName' and 'notebookData'"
+      }
+      """
+            )
+            .build()
+      }
+    } catch {
+      case NonFatal(e) =>
+        logger.error("Error sending notebook to Jupyter", e)
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(errorJson(e.getMessage))
+          .build()
+    } finally {
+      if (conn != null) conn.disconnect()
+    }
+  }
+
+  // Store notebook + mapping in database
+  def storeNotebookAndMapping(body: String): Response = {
+    try {
+      val json = mapper.readTree(body)
+
+      val wid: java.lang.Integer = json.get("wid").asInt()
+      val vid: java.lang.Integer = json.get("vid").asInt()
+      val mappingNode = json.get("mapping")
+      val notebookNode = json.get("notebook")
+
+      val dsl = SqlServer.getInstance().createDSLContext()
+
+      val nid: java.lang.Integer = SqlServer.withTransaction(dsl) { ctx =>
+        // Insert notebook
+        val notebookRecord = ctx
+          .insertInto(Notebook.NOTEBOOK)
+          .set(Notebook.NOTEBOOK.WID, wid)
+          .set(Notebook.NOTEBOOK.NOTEBOOK_, 
JSONB.valueOf(notebookNode.toString))
+          .returning(Notebook.NOTEBOOK.NID)
+          .fetchOne()
+
+        val nidInside: java.lang.Integer = 
notebookRecord.getValue(Notebook.NOTEBOOK.NID)
+
+        // Insert workflow-notebook mapping
+        ctx
+          .insertInto(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.WID, wid)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.VID, vid)
+          .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.NID, 
nidInside)
+          .set(
+            WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.MAPPING,
+            JSONB.valueOf(mappingNode.toString)
+          )
+          .execute()

Review Comment:
   Since the intent is (a), the method will check for a duplicate wid and reject 
early with a 409 instead of a generic 500. The test suite also includes a new 
test to verify this behavior. Added in dc08534.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to