aglinxinyuan commented on code in PR #5798: URL: https://github.com/apache/texera/pull/5798#discussion_r3444857447
########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetchUtilSpec.scala: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.fetcher + +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path} + +class URLFetchUtilSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // Fixtures — temp files reachable via the JVM's built-in `file:` URL handler + // --------------------------------------------------------------------------- + + private def freshTempFile(contents: String): Path = { + val path = Files.createTempFile("url-fetch-util-spec-", ".bin") + Files.write(path, contents.getBytes(StandardCharsets.UTF_8)) + path.toFile.deleteOnExit() + path + } + + private def fileUrl(path: Path): java.net.URL = path.toUri.toURL + + // --------------------------------------------------------------------------- + // Success path + // --------------------------------------------------------------------------- + + "URLFetchUtil.getInputStreamFromURL" should + "return Some(stream) carrying the URL's bytes on success" in { + val path = freshTempFile("hello-url-fetch") + val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path)) + assert(result.isDefined) + try { + val bytes = result.get.readAllBytes() + assert(new String(bytes, StandardCharsets.UTF_8) == "hello-url-fetch") + } finally { + result.foreach(_.close()) + } + } + + it should "return Some(stream) when explicit retries is supplied (>= 1)" in { + val path = freshTempFile("with-retries") + val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path), retries = 3) + assert(result.isDefined) + try { + val bytes = result.get.readAllBytes() + assert(new String(bytes, StandardCharsets.UTF_8) == "with-retries") + } finally { + result.foreach(_.close()) + } + } + + // --------------------------------------------------------------------------- + // Failure path — non-existent file URL exhausts retries and returns None + // --------------------------------------------------------------------------- + + it should "return None when the URL never produces an input stream (default retries)" in { + val missing = new java.io.File( + System.getProperty("java.io.tmpdir"), + "this-file-must-not-exist-" + System.nanoTime() + ) + val url = missing.toURI.toURL + val result = URLFetchUtil.getInputStreamFromURL(url) + assert(result.isEmpty) + } + + it should "return None immediately when retries is 0 (loop iterates zero times)" in { + val missing = new java.io.File( + System.getProperty("java.io.tmpdir"), + "still-must-not-exist-" + System.nanoTime() + ) + val url = missing.toURI.toURL + val result = URLFetchUtil.getInputStreamFromURL(url, retries = 0) + assert(result.isEmpty) + } + + it should "return None after the requested number of retries on persistent failure" in { + val missing = new java.io.File( + System.getProperty("java.io.tmpdir"), + "absent-" + System.nanoTime() + ) + val url = missing.toURI.toURL + val result = URLFetchUtil.getInputStreamFromURL(url, retries = 2) + assert(result.isEmpty) + } + + // --------------------------------------------------------------------------- + // Default-arg shape — exposed via Scala's synthetic accessor + // --------------------------------------------------------------------------- + + "URLFetchUtil.getInputStreamFromURL$default$2" should "default retries to 5" in { + val cls = URLFetchUtil.getClass + val accessor = cls.getDeclaredMethod("getInputStreamFromURL$default$2") + accessor.setAccessible(true) + val default = accessor.invoke(URLFetchUtil) + assert(default == Integer.valueOf(5)) + } Review Comment: Replaced the `getInputStreamFromURL$default$2` reflection with a counting `URLStreamHandler` (commit c1b3d299c6). The default is now validated behaviorally: calling `getInputStreamFromURL(url)` with no `retries` arg against an always-failing handler asserts exactly 5 `openConnection` calls — no dependency on the synthetic accessor name. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/util/OperatorDescriptorUtilsSpec.scala: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.util + +import org.scalatest.flatspec.AnyFlatSpec + +class OperatorDescriptorUtilsSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // equallyPartitionGoal — shape + invariants + // --------------------------------------------------------------------------- + + "equallyPartitionGoal" should "return a list whose size equals totalNumWorkers" in { + assert(OperatorDescriptorUtils.equallyPartitionGoal(10, 3).size == 3) + assert(OperatorDescriptorUtils.equallyPartitionGoal(0, 5).size == 5) + assert(OperatorDescriptorUtils.equallyPartitionGoal(100, 1).size == 1) + } + + it should "produce slots that sum back to the goal" in { + for { + goal <- 0 to 20 + workers <- 1 to 5 + } { + val parts = OperatorDescriptorUtils.equallyPartitionGoal(goal, workers) + assert(parts.sum == goal, s"sum mismatch for (goal=$goal, workers=$workers): $parts") + } + } + + // --------------------------------------------------------------------------- + // equallyPartitionGoal — distribution semantics (worked cases) + // --------------------------------------------------------------------------- + + it should "partition evenly when goal is a multiple of totalNumWorkers" in { + assert(OperatorDescriptorUtils.equallyPartitionGoal(9, 3) == List(3, 3, 3)) + assert(OperatorDescriptorUtils.equallyPartitionGoal(0, 4) == List(0, 0, 0, 0)) + assert(OperatorDescriptorUtils.equallyPartitionGoal(8, 4) == List(2, 2, 2, 2)) + } + + it should "give the remainder to the FIRST `goal % workers` slots" in { + // 10 = 3*3 + 1 → slot[0] gets the extra + assert(OperatorDescriptorUtils.equallyPartitionGoal(10, 3) == List(4, 3, 3)) + // 11 = 3*3 + 2 → slots[0,1] each get +1 + assert(OperatorDescriptorUtils.equallyPartitionGoal(11, 3) == List(4, 4, 3)) + // 7 = 3*2 + 1 → slot[0] gets +1 + assert(OperatorDescriptorUtils.equallyPartitionGoal(7, 3) == List(3, 2, 2)) + } + + it should "handle the case where goal < totalNumWorkers" in { + // 3 = 5*0 + 3 → first 3 slots get 1 + assert(OperatorDescriptorUtils.equallyPartitionGoal(3, 5) == List(1, 1, 1, 0, 0)) + // 1 worker should get everything + assert(OperatorDescriptorUtils.equallyPartitionGoal(1, 4) == List(1, 0, 0, 0)) Review Comment: Fixed the comment in c1b3d299c6 — it now reads `// 1 = 4*0 + 1 → only the first slot gets the single unit`, matching the `totalNumWorkers = 4` call. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetchUtilSpec.scala: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.fetcher + +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path} + +class URLFetchUtilSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // Fixtures — temp files reachable via the JVM's built-in `file:` URL handler + // --------------------------------------------------------------------------- + + private def freshTempFile(contents: String): Path = { + val path = Files.createTempFile("url-fetch-util-spec-", ".bin") + Files.write(path, contents.getBytes(StandardCharsets.UTF_8)) + path.toFile.deleteOnExit() + path + } + + private def fileUrl(path: Path): java.net.URL = path.toUri.toURL + + // --------------------------------------------------------------------------- + // Success path + // --------------------------------------------------------------------------- + + "URLFetchUtil.getInputStreamFromURL" should + "return Some(stream) carrying the URL's bytes on success" in { + val path = freshTempFile("hello-url-fetch") + val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path)) + assert(result.isDefined) + try { + val bytes = result.get.readAllBytes() + assert(new String(bytes, StandardCharsets.UTF_8) == "hello-url-fetch") + } finally { + result.foreach(_.close()) + } + } + + it should "return Some(stream) when explicit retries is supplied (>= 1)" in { + val path = freshTempFile("with-retries") + val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path), retries = 3) + assert(result.isDefined) + try { + val bytes = result.get.readAllBytes() + assert(new String(bytes, StandardCharsets.UTF_8) == "with-retries") + } finally { + result.foreach(_.close()) + } + } + + // --------------------------------------------------------------------------- + // Failure path — non-existent file URL exhausts retries and returns None + // --------------------------------------------------------------------------- + + it should "return None when the URL never produces an input stream (default retries)" in { + val missing = new java.io.File( + System.getProperty("java.io.tmpdir"), + "this-file-must-not-exist-" + System.nanoTime() + ) + val url = missing.toURI.toURL + val result = URLFetchUtil.getInputStreamFromURL(url) + assert(result.isEmpty) + } + + it should "return None immediately when retries is 0 (loop iterates zero times)" in { + val missing = new java.io.File( + System.getProperty("java.io.tmpdir"), + "still-must-not-exist-" + System.nanoTime() + ) + val url = missing.toURI.toURL + val result = URLFetchUtil.getInputStreamFromURL(url, retries = 0) + assert(result.isEmpty) + } Review Comment: Done in c1b3d299c6: the `retries = 0` test now uses a counting `URLStreamHandler` and asserts `openConnectionCount == 0`, proving the loop body never runs and no connection is opened. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/fetcher/URLFetchUtilSpec.scala: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.fetcher + +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path} + +class URLFetchUtilSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // Fixtures — temp files reachable via the JVM's built-in `file:` URL handler + // --------------------------------------------------------------------------- + + private def freshTempFile(contents: String): Path = { + val path = Files.createTempFile("url-fetch-util-spec-", ".bin") + Files.write(path, contents.getBytes(StandardCharsets.UTF_8)) + path.toFile.deleteOnExit() + path + } + + private def fileUrl(path: Path): java.net.URL = path.toUri.toURL + + // --------------------------------------------------------------------------- + // Success path + // --------------------------------------------------------------------------- + + "URLFetchUtil.getInputStreamFromURL" should + "return Some(stream) carrying the URL's bytes on success" in { + val path = freshTempFile("hello-url-fetch") + val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path)) + assert(result.isDefined) + try { + val bytes = result.get.readAllBytes() + assert(new String(bytes, StandardCharsets.UTF_8) == "hello-url-fetch") + } finally { + result.foreach(_.close()) + } + } + + it should "return Some(stream) when explicit retries is supplied (>= 1)" in { + val path = freshTempFile("with-retries") + val result = URLFetchUtil.getInputStreamFromURL(fileUrl(path), retries = 3) + assert(result.isDefined) + try { + val bytes = result.get.readAllBytes() + assert(new String(bytes, StandardCharsets.UTF_8) == "with-retries") + } finally { + result.foreach(_.close()) + } + } + + // --------------------------------------------------------------------------- + // Failure path — non-existent file URL exhausts retries and returns None + // --------------------------------------------------------------------------- + + it should "return None when the URL never produces an input stream (default retries)" in { + val missing = new java.io.File( + System.getProperty("java.io.tmpdir"), + "this-file-must-not-exist-" + System.nanoTime() + ) + val url = missing.toURI.toURL + val result = URLFetchUtil.getInputStreamFromURL(url) + assert(result.isEmpty) + } + + it should "return None immediately when retries is 0 (loop iterates zero times)" in { + val missing = new java.io.File( + System.getProperty("java.io.tmpdir"), + "still-must-not-exist-" + System.nanoTime() + ) + val url = missing.toURI.toURL + val result = URLFetchUtil.getInputStreamFromURL(url, retries = 0) + assert(result.isEmpty) + } + + it should "return None after the requested number of retries on persistent failure" in { + val missing = new java.io.File( + System.getProperty("java.io.tmpdir"), + "absent-" + System.nanoTime() + ) + val url = missing.toURI.toURL + val result = URLFetchUtil.getInputStreamFromURL(url, retries = 2) + assert(result.isEmpty) + } Review Comment: Done in c1b3d299c6: the persistent-failure tests now assert the exact attempt count via a counting `URLStreamHandler` (`retries = 1 → 1`, `retries = 2 → 2`, default → 5), so a loop running the wrong number of times would now fail. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
