aglinxinyuan commented on code in PR #5798:
URL: https://github.com/apache/texera/pull/5798#discussion_r3444857447


##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetchUtilSpec.scala:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.fetcher
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path}
+
+class URLFetchUtilSpec extends AnyFlatSpec {
+
+  // 
---------------------------------------------------------------------------
+  // Fixtures — temp files reachable via the JVM's built-in `file:` URL handler
+  // 
---------------------------------------------------------------------------
+
+  private def freshTempFile(contents: String): Path = {
+    val path = Files.createTempFile("url-fetch-util-spec-", ".bin")
+    Files.write(path, contents.getBytes(StandardCharsets.UTF_8))
+    path.toFile.deleteOnExit()
+    path
+  }
+
+  private def fileUrl(path: Path): java.net.URL = path.toUri.toURL
+
+  // 
---------------------------------------------------------------------------
+  // Success path
+  // 
---------------------------------------------------------------------------
+
+  "URLFetchUtil.getInputStreamFromURL" should
+    "return Some(stream) carrying the URL's bytes on success" in {
+    val path = freshTempFile("hello-url-fetch")
+    val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path))
+    assert(result.isDefined)
+    try {
+      val bytes = result.get.readAllBytes()
+      assert(new String(bytes, StandardCharsets.UTF_8) == "hello-url-fetch")
+    } finally {
+      result.foreach(_.close())
+    }
+  }
+
+  it should "return Some(stream) when explicit retries is supplied (>= 1)" in {
+    val path = freshTempFile("with-retries")
+    val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path), retries = 3)
+    assert(result.isDefined)
+    try {
+      val bytes = result.get.readAllBytes()
+      assert(new String(bytes, StandardCharsets.UTF_8) == "with-retries")
+    } finally {
+      result.foreach(_.close())
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Failure path — non-existent file URL exhausts retries and returns None
+  // 
---------------------------------------------------------------------------
+
+  it should "return None when the URL never produces an input stream (default 
retries)" in {
+    val missing = new java.io.File(
+      System.getProperty("java.io.tmpdir"),
+      "this-file-must-not-exist-" + System.nanoTime()
+    )
+    val url = missing.toURI.toURL
+    val result = URLFetchUtil.getInputStreamFromURL(url)
+    assert(result.isEmpty)
+  }
+
+  it should "return None immediately when retries is 0 (loop iterates zero 
times)" in {
+    val missing = new java.io.File(
+      System.getProperty("java.io.tmpdir"),
+      "still-must-not-exist-" + System.nanoTime()
+    )
+    val url = missing.toURI.toURL
+    val result = URLFetchUtil.getInputStreamFromURL(url, retries = 0)
+    assert(result.isEmpty)
+  }
+
+  it should "return None after the requested number of retries on persistent 
failure" in {
+    val missing = new java.io.File(
+      System.getProperty("java.io.tmpdir"),
+      "absent-" + System.nanoTime()
+    )
+    val url = missing.toURI.toURL
+    val result = URLFetchUtil.getInputStreamFromURL(url, retries = 2)
+    assert(result.isEmpty)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Default-arg shape — exposed via Scala's synthetic accessor
+  // 
---------------------------------------------------------------------------
+
+  "URLFetchUtil.getInputStreamFromURL$default$2" should "default retries to 5" 
in {
+    val cls = URLFetchUtil.getClass
+    val accessor = cls.getDeclaredMethod("getInputStreamFromURL$default$2")
+    accessor.setAccessible(true)
+    val default = accessor.invoke(URLFetchUtil)
+    assert(default == Integer.valueOf(5))
+  }

Review Comment:
   Replaced the `getInputStreamFromURL$default$2` reflection with a counting 
`URLStreamHandler` (commit c1b3d299c6). The default is now validated 
behaviorally: the test calls `getInputStreamFromURL(url)` with no `retries` 
argument against an always-failing handler and asserts exactly 5 
`openConnection` calls — no dependency on the synthetic accessor name.



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/util/OperatorDescriptorUtilsSpec.scala:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.util
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+class OperatorDescriptorUtilsSpec extends AnyFlatSpec {
+
+  // 
---------------------------------------------------------------------------
+  // equallyPartitionGoal — shape + invariants
+  // 
---------------------------------------------------------------------------
+
+  "equallyPartitionGoal" should "return a list whose size equals 
totalNumWorkers" in {
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(10, 3).size == 3)
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(0, 5).size == 5)
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(100, 1).size == 1)
+  }
+
+  it should "produce slots that sum back to the goal" in {
+    for {
+      goal <- 0 to 20
+      workers <- 1 to 5
+    } {
+      val parts = OperatorDescriptorUtils.equallyPartitionGoal(goal, workers)
+      assert(parts.sum == goal, s"sum mismatch for (goal=$goal, 
workers=$workers): $parts")
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // equallyPartitionGoal — distribution semantics (worked cases)
+  // 
---------------------------------------------------------------------------
+
+  it should "partition evenly when goal is a multiple of totalNumWorkers" in {
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(9, 3) == List(3, 3, 3))
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(0, 4) == List(0, 0, 0, 
0))
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(8, 4) == List(2, 2, 2, 
2))
+  }
+
+  it should "give the remainder to the FIRST `goal % workers` slots" in {
+    // 10 = 3*3 + 1 → slot[0] gets the extra
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(10, 3) == List(4, 3, 
3))
+    // 11 = 3*3 + 2 → slots[0,1] each get +1
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(11, 3) == List(4, 4, 
3))
+    // 7 = 3*2 + 1 → slot[0] gets +1
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(7, 3) == List(3, 2, 2))
+  }
+
+  it should "handle the case where goal < totalNumWorkers" in {
+    // 3 = 5*0 + 3 → first 3 slots get 1
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(3, 5) == List(1, 1, 1, 
0, 0))
+    // 1 worker should get everything
+    assert(OperatorDescriptorUtils.equallyPartitionGoal(1, 4) == List(1, 0, 0, 
0))

Review Comment:
   Fixed the comment in c1b3d299c6 — it now reads `// 1 = 4*0 + 1 → only the 
first slot gets the single unit`, matching the `totalNumWorkers = 4` call.



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetchUtilSpec.scala:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.fetcher
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path}
+
+class URLFetchUtilSpec extends AnyFlatSpec {
+
+  // 
---------------------------------------------------------------------------
+  // Fixtures — temp files reachable via the JVM's built-in `file:` URL handler
+  // 
---------------------------------------------------------------------------
+
+  private def freshTempFile(contents: String): Path = {
+    val path = Files.createTempFile("url-fetch-util-spec-", ".bin")
+    Files.write(path, contents.getBytes(StandardCharsets.UTF_8))
+    path.toFile.deleteOnExit()
+    path
+  }
+
+  private def fileUrl(path: Path): java.net.URL = path.toUri.toURL
+
+  // 
---------------------------------------------------------------------------
+  // Success path
+  // 
---------------------------------------------------------------------------
+
+  "URLFetchUtil.getInputStreamFromURL" should
+    "return Some(stream) carrying the URL's bytes on success" in {
+    val path = freshTempFile("hello-url-fetch")
+    val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path))
+    assert(result.isDefined)
+    try {
+      val bytes = result.get.readAllBytes()
+      assert(new String(bytes, StandardCharsets.UTF_8) == "hello-url-fetch")
+    } finally {
+      result.foreach(_.close())
+    }
+  }
+
+  it should "return Some(stream) when explicit retries is supplied (>= 1)" in {
+    val path = freshTempFile("with-retries")
+    val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path), retries = 3)
+    assert(result.isDefined)
+    try {
+      val bytes = result.get.readAllBytes()
+      assert(new String(bytes, StandardCharsets.UTF_8) == "with-retries")
+    } finally {
+      result.foreach(_.close())
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Failure path — non-existent file URL exhausts retries and returns None
+  // 
---------------------------------------------------------------------------
+
+  it should "return None when the URL never produces an input stream (default 
retries)" in {
+    val missing = new java.io.File(
+      System.getProperty("java.io.tmpdir"),
+      "this-file-must-not-exist-" + System.nanoTime()
+    )
+    val url = missing.toURI.toURL
+    val result = URLFetchUtil.getInputStreamFromURL(url)
+    assert(result.isEmpty)
+  }
+
+  it should "return None immediately when retries is 0 (loop iterates zero 
times)" in {
+    val missing = new java.io.File(
+      System.getProperty("java.io.tmpdir"),
+      "still-must-not-exist-" + System.nanoTime()
+    )
+    val url = missing.toURI.toURL
+    val result = URLFetchUtil.getInputStreamFromURL(url, retries = 0)
+    assert(result.isEmpty)
+  }

Review Comment:
   Done in c1b3d299c6: the `retries = 0` test now uses a counting 
`URLStreamHandler` and asserts `openConnectionCount == 0`, proving the loop 
body never runs and no connection is opened.



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetchUtilSpec.scala:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.fetcher
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path}
+
+class URLFetchUtilSpec extends AnyFlatSpec {
+
+  // 
---------------------------------------------------------------------------
+  // Fixtures — temp files reachable via the JVM's built-in `file:` URL handler
+  // 
---------------------------------------------------------------------------
+
+  private def freshTempFile(contents: String): Path = {
+    val path = Files.createTempFile("url-fetch-util-spec-", ".bin")
+    Files.write(path, contents.getBytes(StandardCharsets.UTF_8))
+    path.toFile.deleteOnExit()
+    path
+  }
+
+  private def fileUrl(path: Path): java.net.URL = path.toUri.toURL
+
+  // 
---------------------------------------------------------------------------
+  // Success path
+  // 
---------------------------------------------------------------------------
+
+  "URLFetchUtil.getInputStreamFromURL" should
+    "return Some(stream) carrying the URL's bytes on success" in {
+    val path = freshTempFile("hello-url-fetch")
+    val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path))
+    assert(result.isDefined)
+    try {
+      val bytes = result.get.readAllBytes()
+      assert(new String(bytes, StandardCharsets.UTF_8) == "hello-url-fetch")
+    } finally {
+      result.foreach(_.close())
+    }
+  }
+
+  it should "return Some(stream) when explicit retries is supplied (>= 1)" in {
+    val path = freshTempFile("with-retries")
+    val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path), retries = 3)
+    assert(result.isDefined)
+    try {
+      val bytes = result.get.readAllBytes()
+      assert(new String(bytes, StandardCharsets.UTF_8) == "with-retries")
+    } finally {
+      result.foreach(_.close())
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Failure path — non-existent file URL exhausts retries and returns None
+  // 
---------------------------------------------------------------------------
+
+  it should "return None when the URL never produces an input stream (default 
retries)" in {
+    val missing = new java.io.File(
+      System.getProperty("java.io.tmpdir"),
+      "this-file-must-not-exist-" + System.nanoTime()
+    )
+    val url = missing.toURI.toURL
+    val result = URLFetchUtil.getInputStreamFromURL(url)
+    assert(result.isEmpty)
+  }
+
+  it should "return None immediately when retries is 0 (loop iterates zero 
times)" in {
+    val missing = new java.io.File(
+      System.getProperty("java.io.tmpdir"),
+      "still-must-not-exist-" + System.nanoTime()
+    )
+    val url = missing.toURI.toURL
+    val result = URLFetchUtil.getInputStreamFromURL(url, retries = 0)
+    assert(result.isEmpty)
+  }
+
+  it should "return None after the requested number of retries on persistent 
failure" in {
+    val missing = new java.io.File(
+      System.getProperty("java.io.tmpdir"),
+      "absent-" + System.nanoTime()
+    )
+    val url = missing.toURI.toURL
+    val result = URLFetchUtil.getInputStreamFromURL(url, retries = 2)
+    assert(result.isEmpty)
+  }

Review Comment:
   Done in c1b3d299c6: the persistent-failure tests now assert the exact 
attempt count via a counting `URLStreamHandler` (`retries = 1 → 1`, 
`retries = 2 → 2`, default → 5), so the tests would now fail if the retry 
loop ran the wrong number of times.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to