This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new e62fe7c40 [#2068] feat: Introduce PortRegistry and applied to JettyServerTest. (#2070) e62fe7c40 is described below commit e62fe7c40cbaca139676f851efc4e420098eeeeb Author: maobaolong <baoloong...@tencent.com> AuthorDate: Wed Aug 28 14:17:45 2024 +0800 [#2068] feat: Introduce PortRegistry and applied to JettyServerTest. (#2070) ### What changes were proposed in this pull request? Introduce PortRegistry and applied to JettyServerTest ### Why are the changes needed? Fix: #2068 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. --- .../apache/uniffle/common/port/PortRegistry.java | 178 +++++++++++++++++++++ .../apache/uniffle/common/web/JettyServerTest.java | 21 ++- 2 files changed, 196 insertions(+), 3 deletions(-) diff --git a/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java b/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java new file mode 100644 index 000000000..2a02390a4 --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.port; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.ServerSocket; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Registry for reserving ports during tests. + * + * <p>The registry reserves ports by taking file locks on files in the port coordination directory. + * This doesn't prevent external processes from stealing our ports, but it will prevent us from + * conflicting with ourselves. We can then run tests in a dockerized environment to completely + * prevent conflicts. + * + * <p>The default coordination directory is determined by the "user.dir" jvm property. The + * coordination directory can be overridden by setting the UNIFFLE_PORT_COORDINATION_DIR environment + * variable. + */ +public final class PortRegistry { + private static final String PORT_COORDINATION_DIR_PROPERTY = "UNIFFLE_PORT_COORDINATION_DIR"; + + private static final Registry INSTANCE = new Registry(); + + private PortRegistry() {} // Class should not be instantiated. + + /** + * Reserves a free port so that other tests will not take it. + * + * @return the free port + */ + public static int reservePort() { + return INSTANCE.reservePort(); + } + + /** @param port the port to release */ + public static void release(int port) { + INSTANCE.release(port); + } + + /** Clears the registry. */ + public static void clear() { + INSTANCE.clear(); + } + + /** + * @return a port that is currently free. This does not reserve the port, so the port may be taken + * by the time this method returns. + */ + public static int getFreePort() { + int port; + try { + ServerSocket socket = new ServerSocket(0); + port = socket.getLocalPort(); + socket.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return port; + } + + private static class Registry { + // Map from port number to the reservation for that port. + private final Map<Integer, Reservation> reserved = new ConcurrentHashMap<>(); + private final File coordinationDir; + + private Registry() { + String dir = System.getenv(PORT_COORDINATION_DIR_PROPERTY); + if (dir == null) { + dir = System.getProperty("user.dir"); + } + coordinationDir = new File(dir, ".port_coordination"); + coordinationDir.mkdirs(); + } + + /** + * Reserves a free port so that other tests will not take it. + * + * @return the free port + */ + public int reservePort() { + for (int i = 0; i < 1000; i++) { + int port = getFreePort(); + if (lockPort(port)) { + return port; + } + } + throw new RuntimeException("Failed to acquire port"); + } + + /** + * Attempts to lock the given port. + * + * @param port the port to lock + * @return whether the locking succeeded + */ + public boolean lockPort(int port) { + File portFile = portFile(port); + try { + FileChannel channel = new RandomAccessFile(portFile, "rw").getChannel(); + FileLock lock = channel.tryLock(); + if (lock == null) { + channel.close(); + return false; + } + reserved.put(port, new Reservation(portFile, lock)); + return true; + } catch (IOException | OverlappingFileLockException e) { + return false; + } + } + + /** @param port the port to release */ + public void release(int port) { + Reservation r = reserved.remove(port); + if (r != null) { + // If delete fails, we may leave a file behind. However, the file will be unlocked, so + // another process can still take the port. + r.file.delete(); + try { + r.lock.release(); + r.lock.channel().close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + /** Clears the registry. */ + public void clear() { + new HashSet<>(reserved.keySet()).forEach(this::release); + } + + /** + * Creates a file in coordination dir to lock the port. + * + * @param port the port to lock + * @return the created file + */ + public File portFile(int port) { + return new File(coordinationDir, Integer.toString(port)); + } + + /** Resources used to reserve a port. */ + private static class Reservation { + private final File file; + private final FileLock lock; + + private Reservation(File file, FileLock lock) { + this.file = file; + this.lock = lock; + } + } + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java b/common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java index b981c2091..a168f111f 100644 --- a/common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java +++ b/common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java @@ -24,9 +24,12 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.port.PortRegistry; import org.apache.uniffle.common.util.ExitUtils; import org.apache.uniffle.common.util.ExitUtils.ExitException; @@ -35,10 +38,22 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class JettyServerTest { + int port; + + @BeforeEach + public void beforeEach() { + port = PortRegistry.reservePort(); + } + + @AfterEach + public void afterEach() { + PortRegistry.release(port); + } + @Test public void jettyServerTest() throws FileNotFoundException { RssBaseConf conf = new RssBaseConf(); - conf.setString("rss.jetty.http.port", "9527"); + conf.setInteger("rss.jetty.http.port", port); JettyServer jettyServer = new JettyServer(conf); Server server = jettyServer.getServer(); @@ -50,7 +65,7 @@ public class JettyServerTest { assertEquals(server, server.getHandler().getServer()); assertTrue(server.getConnectors()[0] instanceof ServerConnector); ServerConnector connector = (ServerConnector) server.getConnectors()[0]; - assertEquals(9527, connector.getPort()); + assertEquals(port, connector.getPort()); assertEquals(1, server.getHandlers().length); Handler handler = server.getHandler(); @@ -60,7 +75,7 @@ public class JettyServerTest { @Test public void jettyServerStartTest() throws Exception { RssBaseConf conf = new RssBaseConf(); - conf.setString("rss.jetty.http.port", "9527"); + conf.setInteger("rss.jetty.http.port", port); JettyServer jettyServer1 = new JettyServer(conf); JettyServer jettyServer2 = new JettyServer(conf); jettyServer1.start();