PG1204 commented on code in PR #5124:
URL: https://github.com/apache/texera/pull/5124#discussion_r3270303695


##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import kong.unirest.Unirest
+
+import javax.ws.rs._
+import javax.ws.rs.core.{MediaType, Response}
+import java.nio.file.{Files, Path => NioPath, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+
+/**
+  * REST resource that proxies the Hugging Face Hub API to list
+  * models for the HuggingFace operator.
+  *
+  * Browse mode:  GET /api/huggingface/models?task=text-generation
+  *   Fetches ALL models for the task from HF Hub (paginated internally),
+  *   caches the full list server-side, and returns it.
+  *
+  * Search mode:  GET /api/huggingface/models?task=text-generation&search=bert
+  *   Forwards the search query to HF Hub API (searches all models).
+  */
+@Path("/huggingface")
+@Produces(Array(MediaType.APPLICATION_JSON))
+class HuggingFaceModelResource {
+
+  import HuggingFaceModelResource._
+
+  @GET
+  @Path("/models")
+  def listModels(
+      @QueryParam("task") @DefaultValue("text-generation") task: String,
+      @QueryParam("search") search: String
+  ): Response = {
+    try {
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")

Review Comment:
   As discussed, the user will supply their token in the operator's property 
panel. I have removed the env var path entirely; both inference and model info 
retrieval now rely on the user-provided token. If the user provides no token, 
the calls fall back to anonymous HF requests, which work fine for the public 
model and task lists but are subject to HF's public rate limits.
   
   Yes, a future PR will allow the user to add the token in the operator.
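   
   For illustration, a minimal sketch of the anonymous fallback, assuming the 
token arrives as a possibly null or blank string. `HfAuthSketch`, 
`normalizeToken`, and `authHeaders` are hypothetical names, not code from this 
PR.

```scala
object HfAuthSketch {

  /** Treats null, empty, or whitespace-only input as "no token" (anonymous). */
  def normalizeToken(raw: String): Option[String] =
    Option(raw).map(_.trim).filter(_.nonEmpty)

  /** Headers to attach to an HF Hub request; empty map means an anonymous call. */
  def authHeaders(raw: String): Map[String, String] =
    normalizeToken(raw) match {
      case Some(token) => Map("Authorization" -> s"Bearer $token")
      case None        => Map.empty
    }
}
```

   With this shape, the request builder only adds the `Authorization` header 
when `authHeaders` returns a non-empty map, so anonymous and authenticated 
calls share one code path.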



##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+  @GET
+  @Path("/models")
+  def listModels(
+      @QueryParam("task") @DefaultValue("text-generation") task: String,
+      @QueryParam("search") search: String
+  ): Response = {
+    try {
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+
+      // ── Search mode: forward query to HF Hub, return results directly ──
+      if (search != null && search.trim.nonEmpty) {
+        return fetchSearchResults(task, search.trim, hfToken)
+      }
+
+      // ── Browse mode: return ALL models for this task (cached) ──
+      val cached = modelCache.get(task)
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      // Not cached — fetch all pages from HF Hub API
+      val allModels = fetchAllModelsForTask(task, hfToken)
+      val json = objectMapper.writeValueAsString(allModels)
+      modelCache.put(task, json)
+
+      Response.ok(json).build()
+
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @POST
+  @Path("/upload-audio")
+  @Consumes(Array(MediaType.WILDCARD))
+  def uploadAudioReference(
+      @QueryParam("filename") filename: String,
+      bytes: Array[Byte]
+  ): Response = {
+    try {
+      if (bytes == null || bytes.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Audio payload is empty."}""")
+          .build()
+      }
+
+      val safeFileName = Option(filename)
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(name => Paths.get(name).getFileName.toString)
+        .getOrElse("audio.bin")
+      val extension = {
+        val idx = safeFileName.lastIndexOf('.')
+        if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin"
+      }
+
+      val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio")
+      Files.createDirectories(tempDir)
+      val tempFile: NioPath = Files.createTempFile(tempDir, "hf-audio-", extension)
+      Files.write(tempFile, bytes)
+
+      val json = objectMapper.writeValueAsString(
+        Map(
+          "path" -> tempFile.toAbsolutePath.toString,
+          "fileName" -> safeFileName
+        ).asJava
+      )
+      Response.ok(json).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to upload audio: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @GET
+  @Path("/audio-preview")
+  def previewUploadedAudio(@QueryParam("path") path: String): Response = {
+    try {
+      val trimmedPath = Option(path).map(_.trim).getOrElse("")
+      if (trimmedPath.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Audio path is required."}""")
+          .build()
+      }
+
+      val tempDir = Paths
+        .get(System.getProperty("java.io.tmpdir"), "texera-hf-audio")
+        .toAbsolutePath
+        .normalize()
+      val requestedPath = Paths.get(trimmedPath).toAbsolutePath.normalize()
+      if (!requestedPath.startsWith(tempDir)) {
+        return Response
+          .status(Response.Status.FORBIDDEN)
+          .entity("""{"error":"Audio path is outside the allowed preview directory."}""")
+          .build()
+      }
+      if (!Files.exists(requestedPath) || !Files.isRegularFile(requestedPath)) {
+        return Response
+          .status(Response.Status.NOT_FOUND)
+          .entity("""{"error":"Uploaded audio file was not found."}""")
+          .build()
+      }
+
+      val contentType = Option(Files.probeContentType(requestedPath))
+        .filter(_.trim.nonEmpty)
+        .getOrElse(inferAudioContentType(requestedPath))
+      Response.ok(Files.readAllBytes(requestedPath), contentType).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to read uploaded audio: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @GET
+  @Path("/media-proxy")
+  def proxyRemoteMedia(@QueryParam("url") url: String): Response = {
+    try {
+      val trimmedUrl = Option(url).map(_.trim).getOrElse("")
+      if (trimmedUrl.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Media URL is required."}""")
+          .build()
+      }
+      if (!trimmedUrl.startsWith("http://") && !trimmedUrl.startsWith("https://")) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Only http(s) media URLs are supported."}""")
+          .build()
+      }
+
+      val upstreamResponse = Unirest
+        .get(trimmedUrl)
+        .connectTimeout(10000)
+        .socketTimeout(120000)
+        .asBytes()
+
+      if (upstreamResponse.getStatus != 200) {
+        return Response
+          .status(upstreamResponse.getStatus)
+          .entity(
+            s"""{"error":"Failed to fetch remote media: ${upstreamResponse.getStatusText}"}"""
+          )
+          .build()
+      }
+
+      val contentType = Option(upstreamResponse.getHeaders.getFirst("Content-Type"))
+        .filter(_.trim.nonEmpty)
+        .getOrElse(MediaType.APPLICATION_OCTET_STREAM)
+      Response.ok(upstreamResponse.getBody, contentType).build()
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to proxy remote media: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  /** Search HF Hub for models matching a query within a task. */
+  private def fetchSearchResults(task: String, query: String, hfToken: String): Response = {
+    var request = Unirest
+      .get("https://huggingface.co/api/models")
+      .queryString("pipeline_tag", task)
+      .queryString("sort", "downloads")
+      .queryString("direction", "-1")
+      .queryString("limit", "100")
+      .queryString("filter", task)
+      .queryString("inference", "warm")
+      .queryString("search", query)
+
+    if (hfToken.nonEmpty) {
+      request = request.header("Authorization", s"Bearer $hfToken")
+    }
+
+    val hfResponse = request.asString()
+
+    if (hfResponse.getStatus != 200) {
+      return Response
+        .status(hfResponse.getStatus)
+        .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""")
+        .build()
+    }
+
+    val rawModels = objectMapper.readValue(hfResponse.getBody, listOfMapsType)
+    val out = buildSimplifiedList(rawModels)
+    Response.ok(objectMapper.writeValueAsString(out)).build()
+  }
+
+  /**
+    * Fetch pipeline task tags from the Hugging Face Hub API.
+    * GET /api/huggingface/tasks
+    *
+    * Returns a JSON array of objects: [{ "tag": "text-generation", "label": "Text Generation" }, ...]
+    * The result is cached server-side for the lifetime of the process.
+    */
+  @GET
+  @Path("/tasks")
+  def listTasks(): Response = {
+    try {
+      val cached = taskCache.get("all")
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+      var request = Unirest
+        .get("https://huggingface.co/api/tasks")
+        .connectTimeout(10000)
+        .socketTimeout(15000)
+
+      if (hfToken.nonEmpty) {
+        request = request.header("Authorization", s"Bearer $hfToken")
+      }
+
+      val hfResponse = request.asString()
+
+      if (hfResponse.getStatus != 200) {
+        return Response
+          .status(hfResponse.getStatus)
+          .entity(s"""{"error":"Hugging Face API error: ${hfResponse.getStatusText}"}""")
+          .build()
+      }
+
+      // /api/tasks returns a JSON object: { "<pipeline_tag>": { "label": "...", ... }, ... }
+      // Using readTree so no entry is dropped regardless of its value type (null, array, etc.)
+      val root: JsonNode = objectMapper.readTree(hfResponse.getBody)
+      val taskList = new java.util.ArrayList[java.util.Map[String, Object]]()
+      val iter = root.fields()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val tag = entry.getKey
+        val info: JsonNode = entry.getValue
+        val label =
+          if (info != null && info.isObject && info.has("label")) info.get("label").asText(tag)
+          else tag
+        val taskEntry = new java.util.LinkedHashMap[String, Object]()
+        taskEntry.put("tag", tag)
+        taskEntry.put("label", label)
+        taskList.add(taskEntry)
+      }
+
+      // Filter out tasks that have no models available with hosted inference
+      val availableTasks = taskList
+        .parallelStream()
+        .filter(task => hasModelsForTask(task.get("tag").toString, hfToken))
+        .collect(Collectors.toList())
+
+      val json = objectMapper.writeValueAsString(availableTasks)
+      taskCache.put("all", json)
+      Response.ok(json).build()
+
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to fetch tasks: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  /**
+    * Fetch ALL models for a given task by paginating through the HF Hub API.
+    * HF Hub uses a Link header with rel="next" for pagination.
+    * We fetch pages of 1000 models at a time, up to MAX_PAGES pages.
+    */
+  private def fetchAllModelsForTask(
+      task: String,
+      hfToken: String
+  ): java.util.List[java.util.Map[String, Object]] = {
+    val allResults = new java.util.ArrayList[java.util.Map[String, Object]]()
+    var nextUrl: String = null
+    var pageCount = 0

Review Comment:
   Fixes:
   
   - Added `MAX_PAGES = 50`. `pageCount` now bounds the loop:
     `while (nextUrl != null && pageCount < MAX_PAGES)`.
   - Return type changed from `List[…]` to `PageResult(models, truncated)`; 
caller propagates `truncated=true` via an `X-Texera-Truncated` header.
   - Logs a WARN when the cap is hit.
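   
   For illustration, a minimal sketch of the bounded loop, assuming the HTTP 
call is abstracted as a `fetchPage` function that returns a page's items plus 
the next URL. `PaginationSketch`, `fetchAll`, and `fetchPage` are hypothetical 
names; only `PageResult`, `MAX_PAGES`, and the loop condition mirror the 
description above. The sketch covers the cap only, not the mid-flight error 
path.

```scala
object PaginationSketch {

  /** Result of a paginated fetch; truncated = true means more pages existed. */
  final case class PageResult[A](models: Vector[A], truncated: Boolean)

  val MaxPages = 50 // mirrors MAX_PAGES in the comment above

  /**
    * fetchPage is a stand-in for the real HTTP call (an assumption of this sketch):
    * given a URL, it returns that page's items and the next page URL, if any.
    */
  def fetchAll[A](firstUrl: String, maxPages: Int = MaxPages)(
      fetchPage: String => (Seq[A], Option[String])
  ): PageResult[A] = {
    val acc = Vector.newBuilder[A]
    var nextUrl: Option[String] = Some(firstUrl)
    var pageCount = 0
    // Same shape as: while (nextUrl != null && pageCount < MAX_PAGES)
    while (nextUrl.isDefined && pageCount < maxPages) {
      val (items, next) = fetchPage(nextUrl.get)
      acc ++= items
      nextUrl = next
      pageCount += 1
    }
    // A leftover next URL means the cap stopped the loop before the last page.
    PageResult(acc.result(), truncated = nextUrl.isDefined)
  }
}
```

   A caller would check `truncated` on the result and, when it is set, attach 
the `X-Texera-Truncated` header before returning the list.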
   
   50 pages × `PAGE_SIZE=1000` = 50K models per task. That covers nearly every 
HF task today; `text-generation` (~150K) is truncated, but search mode 
(`?search=…`) queries HF directly and is not subject to the cap, so specific 
models are still findable.
   
   Trade-offs of raising the cap:
   - Pagination is sequential (each page's URL comes from the previous 
response's Link header), so each extra page lengthens the cold-miss wait.
   - The cache grows linearly: 50K ≈ 10 MB/task, 200K ≈ 40 MB/task.
   - More HF calls mean a higher chance of hitting their rate limits (429/503).
   - Past the first few thousand entries the list is mostly low-download or 
abandoned models, so the UX value diminishes.
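   
   The sequential dependency in the first trade-off comes from how the next 
page is discovered: its URL is only known after the previous response's `Link` 
header has been parsed. A rough sketch of that parsing step, assuming the 
standard `<url>; rel="next"` header form. `LinkHeaderSketch` and `extractNext` 
are hypothetical stand-ins; the PR's own `extractNextLink` is not shown here 
and may differ.

```scala
object LinkHeaderSketch {

  // Matches one link-value whose parameters include rel="next" (quotes optional).
  // [^,]* keeps the match inside a single comma-separated link-value.
  private val NextLink = """<([^>]+)>\s*;[^,]*\brel="?next"?""".r

  /** Returns the rel="next" target, or None when the header is absent or has no next link. */
  def extractNext(linkHeader: String): Option[String] =
    Option(linkHeader).flatMap(h => NextLink.findFirstMatchIn(h).map(_.group(1)))
}
```

   Because each call needs the URL returned by the previous one, the pages 
cannot be fetched in parallel without knowing the cursor values up front.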



##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+  /**
+    * Fetch ALL models for a given task by paginating through the HF Hub API.
+    * HF Hub uses a Link header with rel="next" for pagination.
+    * We fetch pages of 1000 models at a time, up to MAX_PAGES pages.
+    */
+  private def fetchAllModelsForTask(
+      task: String,
+      hfToken: String
+  ): java.util.List[java.util.Map[String, Object]] = {
+    val allResults = new java.util.ArrayList[java.util.Map[String, Object]]()
+    var nextUrl: String = null
+    var pageCount = 0
+
+    // First request: inference=warm limits to models with hosted Inference API
+    var request = Unirest
+      .get("https://huggingface.co/api/models")
+      .queryString("pipeline_tag", task)
+      .queryString("sort", "downloads")
+      .queryString("direction", "-1")
+      .queryString("limit", PAGE_SIZE.toString)
+      .queryString("filter", task)
+      .queryString("inference", "warm")
+      .connectTimeout(10000)
+      .socketTimeout(30000)
+
+    if (hfToken.nonEmpty) {
+      request = request.header("Authorization", s"Bearer $hfToken")
+    }
+
+    var hfResponse = request.asString()
+
+    if (hfResponse.getStatus != 200) {
+      throw new RuntimeException(
+        s"HF API returned ${hfResponse.getStatus}: ${hfResponse.getStatusText}"
+      )
+    }
+
+    var rawModels = objectMapper.readValue(hfResponse.getBody, listOfMapsType)
+    allResults.addAll(buildSimplifiedList(rawModels))
+    pageCount += 1
+
+    // Extract next page URL from Link header
+    nextUrl = extractNextLink(hfResponse.getHeaders.getFirst("Link"))
+
+    // Fetch remaining pages until exhausted
+    while (nextUrl != null) {
+      var nextRequest = Unirest
+        .get(nextUrl)
+        .connectTimeout(10000)
+        .socketTimeout(30000)
+      if (hfToken.nonEmpty) {
+        nextRequest = nextRequest.header("Authorization", s"Bearer $hfToken")
+      }
+
+      hfResponse = nextRequest.asString()
+
+      if (hfResponse.getStatus != 200) {
+        // Stop paginating on error, return what we have so far
+        return allResults
+      }

Review Comment:
   Changes:
   
   - Added a WARN log whenever pagination fails mid-flight. It includes the 
page that failed, the task, and the status code HF returned, so operators can 
now see these failures in production logs.
   - Changed the return type from a plain list to `PageResult(models, 
truncated)`, so the caller can no longer accidentally treat a partial list as 
complete.
   - Surfaced the truncation to clients via an `X-Texera-Truncated: true` HTTP 
header on the response, so the frontend can show users a "list may be 
incomplete" hint.
   
   On the "rather than caching" point: we deliberately kept caching truncated 
results, with the truncation flag visible to clients via the header. Reasoning: 
pagination errors are usually transient (5xx blips, brief rate-limit pressure). 
Not caching means every subsequent browse for that task re-paginates 30–50 HF 
calls during a flaky episode, burning HF API quota on retries that may also 
fail. Caching the partial result bounds staleness at the 60-min TTL, lets 
careful clients refresh on demand using the header, and keeps the quota healthy 
during outages.
   
   If you'd prefer fail-and-retry semantics over honest-partial-with-TTL, the 
change is a one-line edit:
   
     if (!isUserToken && !pageResult.truncated) modelCache.put(task, json)
   
   Happy to flip it; just let me know which behavior you'd rather have.
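   
   To make the two options concrete, here is a small sketch of both caching 
predicates side by side. It assumes the current condition is `!isUserToken` 
alone (inferred from the one-line edit above). `CachePolicySketch`, 
`CacheEntry`, and the helper names are hypothetical, not code from this PR.

```scala
object CachePolicySketch {

  /** Hypothetical cache entry: the serialized list plus when it was stored. */
  final case class CacheEntry(json: String, truncated: Boolean, storedAtMillis: Long)

  val TtlMillis: Long = 60L * 60L * 1000L // the 60-min TTL mentioned above

  /** An entry is served from cache only while it is younger than the TTL. */
  def isFresh(entry: CacheEntry, nowMillis: Long): Boolean =
    nowMillis - entry.storedAtMillis < TtlMillis

  /** Honest-partial-with-TTL: cache partial lists too (assumed current behavior). */
  def shouldCacheHonestPartial(isUserToken: Boolean, truncated: Boolean): Boolean =
    !isUserToken

  /** Fail-and-retry: never cache a truncated list (the one-line alternative). */
  def shouldCacheCompleteOnly(isUserToken: Boolean, truncated: Boolean): Boolean =
    !isUserToken && !truncated

  /** Extra response headers that tell clients the list may be incomplete. */
  def responseHeaders(truncated: Boolean): Map[String, String] =
    if (truncated) Map("X-Texera-Truncated" -> "true") else Map.empty
}
```

   The two predicates differ only for a truncated result fetched without a 
user token: the first caches it for up to one TTL, the second re-paginates on 
every request until a complete fetch succeeds.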



##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+  @GET
+  @Path("/models")
+  def listModels(
+      @QueryParam("task") @DefaultValue("text-generation") task: String,
+      @QueryParam("search") search: String
+  ): Response = {
+    try {
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+
+      // ── Search mode: forward query to HF Hub, return results directly ──
+      if (search != null && search.trim.nonEmpty) {
+        return fetchSearchResults(task, search.trim, hfToken)
+      }
+
+      // ── Browse mode: return ALL models for this task (cached) ──
+      val cached = modelCache.get(task)
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      // Not cached — fetch all pages from HF Hub API
+      val allModels = fetchAllModelsForTask(task, hfToken)
+      val json = objectMapper.writeValueAsString(allModels)
+      modelCache.put(task, json)
+
+      Response.ok(json).build()
+
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @POST
+  @Path("/upload-audio")
+  @Consumes(Array(MediaType.WILDCARD))
+  def uploadAudioReference(
+      @QueryParam("filename") filename: String,
+      bytes: Array[Byte]

Review Comment:
   Fixes:
   
   1. Switched the endpoint signature from `bytes: Array[Byte]` to `stream: 
InputStream` so Jersey hands us a stream instead of buffering the entire body 
in RAM upfront. Our copy buffer during an upload is now ~8 KB regardless of 
upload size.
   
   2. Added a streaming size check: bytes are copied to a temp file in 8 KB 
chunks while a running counter tracks total bytes written. If the counter 
exceeds MAX_AUDIO_BYTES (25 MiB), we close the stream, delete the partial file, 
and return HTTP 413 with a generic error.
   
   25 MiB comfortably covers realistic audio clips (~10–30 minutes depending on 
bitrate) while staying well below HF Hub's own inference-API limits. Hardcoded 
for now; happy to make it config-driven in a follow-up if any deployment needs 
a different value.
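
   For illustration only, a minimal sketch of a chunked copy with a running 
size check, as described in point 2. The object name `BoundedCopy`, the method 
`copyWithLimit`, and its `Either` return type are assumptions made for this 
example, not the PR's actual code:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.file.{Files, Path}

object BoundedCopy {

  // Copies `in` to `target` in fixed-size chunks while counting bytes.
  // Returns Right(bytesWritten) on success. If the running total exceeds
  // `maxBytes`, the partial file is deleted and Left(maxBytes) is returned.
  def copyWithLimit(
      in: InputStream,
      target: Path,
      maxBytes: Long,
      chunkSize: Int = 8 * 1024
  ): Either[Long, Long] = {
    val buffer = new Array[Byte](chunkSize)
    var total = 0L
    var exceeded = false
    val out = Files.newOutputStream(target)
    try {
      var read = in.read(buffer)
      while (read != -1 && !exceeded) {
        total += read
        if (total > maxBytes) {
          exceeded = true // stop before writing the chunk that crosses the limit
        } else {
          out.write(buffer, 0, read)
          read = in.read(buffer)
        }
      }
    } finally {
      out.close()
      in.close()
    }
    if (exceeded) {
      Files.deleteIfExists(target)
      Left(maxBytes)
    } else {
      Right(total)
    }
  }
}
```

   A caller in the resource could map `Left` to an HTTP 413 response and 
`Right` to the normal success path; only one chunk-sized buffer is held in 
memory at a time.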



##########
amber/src/main/scala/org/apache/texera/web/resource/HuggingFaceModelResource.scala:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import kong.unirest.Unirest
+
+import javax.ws.rs._
+import javax.ws.rs.core.{MediaType, Response}
+import java.nio.file.{Files, Path => NioPath, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+
+/**
+  * REST resource that proxies the Hugging Face Hub API to list
+  * models for the HuggingFace operator.
+  *
+  * Browse mode:  GET /api/huggingface/models?task=text-generation
+  *   Fetches ALL models for the task from HF Hub (paginated internally),
+  *   caches the full list server-side, and returns it.
+  *
+  * Search mode:  GET /api/huggingface/models?task=text-generation&search=bert
+  *   Forwards the search query to HF Hub API (searches all models).
+  */
+@Path("/huggingface")
+@Produces(Array(MediaType.APPLICATION_JSON))
+class HuggingFaceModelResource {
+
+  import HuggingFaceModelResource._
+
+  @GET
+  @Path("/models")
+  def listModels(
+      @QueryParam("task") @DefaultValue("text-generation") task: String,
+      @QueryParam("search") search: String
+  ): Response = {
+    try {
+      val hfToken = Option(System.getenv("HF_TOKEN")).getOrElse("")
+
+      // ── Search mode: forward query to HF Hub, return results directly ──
+      if (search != null && search.trim.nonEmpty) {
+        return fetchSearchResults(task, search.trim, hfToken)
+      }
+
+      // ── Browse mode: return ALL models for this task (cached) ──
+      val cached = modelCache.get(task)
+      if (cached != null) {
+        return Response.ok(cached).build()
+      }
+
+      // Not cached — fetch all pages from HF Hub API
+      val allModels = fetchAllModelsForTask(task, hfToken)
+      val json = objectMapper.writeValueAsString(allModels)
+      modelCache.put(task, json)
+
+      Response.ok(json).build()
+
+    } catch {
+      case e: Exception =>
+        Response
+          .status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(s"""{"error":"Failed to fetch models: ${e.getMessage}"}""")
+          .build()
+    }
+  }
+
+  @POST
+  @Path("/upload-audio")
+  @Consumes(Array(MediaType.WILDCARD))
+  def uploadAudioReference(
+      @QueryParam("filename") filename: String,
+      bytes: Array[Byte]
+  ): Response = {
+    try {
+      if (bytes == null || bytes.isEmpty) {
+        return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("""{"error":"Audio payload is empty."}""")
+          .build()
+      }
+
+      val safeFileName = Option(filename)
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(name => Paths.get(name).getFileName.toString)
+        .getOrElse("audio.bin")
+      val extension = {
+        val idx = safeFileName.lastIndexOf('.')
+        if (idx >= 0 && idx < safeFileName.length - 1) safeFileName.substring(idx) else ".bin"
+      }
+
+      val tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "texera-hf-audio")

Review Comment:
   Added two cleanup mechanisms:
   
   - Sweep on every upload. Before writing a new file, we scan the directory 
and delete anything older than 1 hour. So uploads that have been sitting around 
for over an hour automatically age out as new uploads come in.
   
   - Cleanup on graceful shutdown. Each new file is registered with Java's 
`deleteOnExit`, so when the server shuts down cleanly the JVM also tries to 
delete the active session's files.
   
   The directory now stays bounded: files older than an hour are removed by the 
next upload, and the upload -> run cycle (typically seconds-to-minutes) has 
plenty of headroom before files expire.
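
   For illustration only, a minimal sketch of the two mechanisms. The object 
name `TempAudioCleanup` and its methods are assumptions made for this example, 
not the PR's actual code:

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object TempAudioCleanup {

  // One hour, matching the age threshold described above.
  val MaxAgeMillis: Long = 60L * 60L * 1000L

  // Deletes regular files in `dir` whose last-modified time is more than
  // `maxAgeMillis` before `nowMillis`. Returns the number of files deleted.
  def sweep(
      dir: Path,
      nowMillis: Long = System.currentTimeMillis(),
      maxAgeMillis: Long = MaxAgeMillis
  ): Int = {
    if (!Files.isDirectory(dir)) {
      0
    } else {
      val listing = Files.list(dir)
      try {
        // Materialize the listing first so deletion does not happen while
        // the directory stream is still being iterated.
        val stale = listing.iterator().asScala.toList
          .filter(p => Files.isRegularFile(p))
          .filter(p => nowMillis - Files.getLastModifiedTime(p).toMillis > maxAgeMillis)
        stale.count(p => Files.deleteIfExists(p))
      } finally {
        listing.close()
      }
    }
  }

  // Registers a file for best-effort deletion on normal JVM termination.
  def register(file: Path): Path = {
    file.toFile.deleteOnExit()
    file
  }
}
```

   Note that `deleteOnExit` only runs on normal JVM termination, so the sweep 
remains the mechanism that covers crashes and forced kills.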



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


