This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 18dd778f18b6da50bcb6427d3fbbdca263c5043d Author: Dream95 <[email protected]> AuthorDate: Sat May 16 16:58:59 2026 +0800 [refactor][fn] Use Map instead of TreeMap for connector/function API types (#25790) Signed-off-by: Dream95 <[email protected]> (cherry picked from commit eba873d9c3926b806f5462af834867ccb692fdfb) --- .../main/java/org/apache/pulsar/functions/LocalRunner.java | 11 +++++------ .../apache/pulsar/functions/worker/ConnectorsManager.java | 7 ++++--- .../apache/pulsar/functions/worker/FunctionsManager.java | 9 +++++---- .../pulsar/functions/utils/functions/FunctionUtils.java | 3 ++- .../apache/pulsar/functions/utils/io/ConnectorUtils.java | 4 ++-- .../pulsar/functions/utils/io/ReloadConnectorsResult.java | 4 ++-- .../functions/utils/io/ConnectorUtilsReloadTest.java | 14 +++++++------- 7 files changed, 27 insertions(+), 25 deletions(-) diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 1e2d4e0b21b..bfb3eab6ff3 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -38,7 +38,6 @@ import java.util.Map; import java.util.Optional; import java.util.Timer; import java.util.TimerTask; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -724,7 +723,7 @@ public class LocalRunner implements AutoCloseable { private ClassLoader isBuiltInFunction(String functionType) throws IOException { // Validate the connector type from the locally available connectors - TreeMap<String, FunctionArchive> functions = getFunctions(); + Map<String, FunctionArchive> functions = getFunctions(); String functionName = functionType.replaceFirst("^builtin://", ""); FunctionArchive function = functions.get(functionName); @@ -738,7 +737,7 @@ public class LocalRunner implements AutoCloseable { private ClassLoader isBuiltInSource(String sourceType) throws IOException { // Validate the connector type from the locally available connectors - TreeMap<String, Connector> connectors = getConnectors(); + Map<String, Connector> connectors = getConnectors(); String source = sourceType.replaceFirst("^builtin://", ""); Connector connector = connectors.get(source); @@ -752,7 +751,7 @@ public class LocalRunner implements AutoCloseable { private ClassLoader isBuiltInSink(String sinkType) throws IOException { // Validate the connector type from the locally available connectors - TreeMap<String, Connector> connectors = getConnectors(); + Map<String, Connector> connectors = getConnectors(); String sink = sinkType.replaceFirst("^builtin://", ""); Connector connector = connectors.get(sink); @@ -764,11 +763,11 @@ public class LocalRunner implements AutoCloseable { } } - private TreeMap<String, FunctionArchive> getFunctions() throws IOException { + private Map<String, FunctionArchive> getFunctions() throws IOException { return FunctionUtils.searchForFunctions(functionsDir, narExtractionDirectory, true); } - private TreeMap<String, Connector> getConnectors() throws IOException { + private Map<String, Connector> getConnectors() throws IOException { return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory, true); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java index a8ee6f3ce20..74435f99bcf 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.TreeMap; import java.util.stream.Collectors; import lombok.Getter; @@ -38,7 +39,7 @@ import org.apache.pulsar.functions.utils.io.ReloadConnectorsResult; public class ConnectorsManager implements AutoCloseable { @Getter - private volatile TreeMap<String, Connector> connectors; + private volatile Map<String, Connector> connectors; @VisibleForTesting public ConnectorsManager() { @@ -49,7 +50,7 @@ public class ConnectorsManager implements AutoCloseable { this.connectors = createConnectors(workerConfig); } - private static TreeMap<String, Connector> createConnectors(WorkerConfig workerConfig) throws IOException { + private static Map<String, Connector> createConnectors(WorkerConfig workerConfig) throws IOException { boolean enableClassloading = isEnableClassloading(workerConfig); return ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory(), enableClassloading); @@ -119,7 +120,7 @@ public class ConnectorsManager implements AutoCloseable { }); } - private void closeConnectors(TreeMap<String, Connector> connectorMap) { + private void closeConnectors(Map<String, Connector> connectorMap) { closeConnectors(connectorMap.values()); connectorMap.clear(); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java index 5ab7ff7221a..cdd77249502 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.file.Path; import java.util.List; +import java.util.Map; import java.util.TreeMap; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -32,7 +33,7 @@ import org.apache.pulsar.functions.utils.functions.FunctionUtils; @Slf4j public class FunctionsManager implements AutoCloseable { - private TreeMap<String, FunctionArchive> functions; + private Map<String, FunctionArchive> functions; @VisibleForTesting public FunctionsManager() { @@ -61,12 +62,12 @@ public class FunctionsManager implements AutoCloseable { } public void reloadFunctions(WorkerConfig workerConfig) throws IOException { - TreeMap<String, FunctionArchive> oldFunctions = functions; + Map<String, FunctionArchive> oldFunctions = functions; this.functions = createFunctions(workerConfig); closeFunctions(oldFunctions); } - private static TreeMap<String, FunctionArchive> createFunctions(WorkerConfig workerConfig) throws IOException { + private static Map<String, FunctionArchive> createFunctions(WorkerConfig workerConfig) throws IOException { boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles() || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName()); return FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory(), @@ -79,7 +80,7 @@ public class FunctionsManager implements AutoCloseable { closeFunctions(functions); } - private void closeFunctions(TreeMap<String, FunctionArchive> functionMap) { + private void closeFunctions(Map<String, FunctionArchive> functionMap) { functionMap.values().forEach(functionArchive -> { try { functionArchive.close(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java index 24f801f3da2..f4d45edf363 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java @@ -25,6 +25,7 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Map; import java.util.TreeMap; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -75,7 +76,7 @@ public class FunctionUtils { .readValue(narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME), valueType); } - public static TreeMap<String, FunctionArchive> searchForFunctions(String functionsDirectory, + public static Map<String, FunctionArchive> searchForFunctions(String functionsDirectory, String narExtractionDirectory, boolean enableClassloading) throws IOException { Path path = Paths.get(functionsDirectory).toAbsolutePath().normalize(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java index d08a4942884..0ff7e9427b3 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java @@ -165,7 +165,7 @@ public class ConnectorUtils { return fields; } - public static TreeMap<String, Connector> searchForConnectors(String connectorsDirectory, + public static Map<String, Connector> searchForConnectors(String connectorsDirectory, String narExtractionDirectory, boolean enableClassloading) throws IOException { Path path = Paths.get(connectorsDirectory).toAbsolutePath().normalize(); @@ -210,7 +210,7 @@ public class ConnectorUtils { * connectors the caller should close */ public static ReloadConnectorsResult reloadConnectors( - TreeMap<String, Connector> previous, + Map<String, Connector> previous, String connectorsDirectory, String narExtractionDirectory, boolean enableClassloading) throws IOException { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java index cdfab35692c..b46aa38fead 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java @@ -19,11 +19,11 @@ package org.apache.pulsar.functions.utils.io; import java.util.List; -import java.util.TreeMap; +import java.util.Map; /** * Result of {@link ConnectorUtils#reloadConnectors}: the new connector map and connectors evicted from the active set * that the caller must close. */ -public record ReloadConnectorsResult(TreeMap<String, Connector> connectors, List<Connector> connectorsToClose) { +public record ReloadConnectorsResult(Map<String, Connector> connectors, List<Connector> connectorsToClose) { } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java index 07b6459369a..0e05912d8c9 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java @@ -27,7 +27,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.TreeMap; +import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -77,7 +77,7 @@ public class ConnectorUtilsReloadTest { Path nar = dir.resolve("c1.nar"); writeMinimalNar(nar, sampleDefinition("c-one")); - TreeMap<String, Connector> first = + Map<String, Connector> first = ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); Connector c1 = first.get("c-one"); c1.getConnectorFunctionPackage(); @@ -86,7 +86,7 @@ public class ConnectorUtilsReloadTest { first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); assertTrue(reload.connectorsToClose().isEmpty()); closeEvicted(reload); - TreeMap<String, Connector> second = reload.connectors(); + Map<String, Connector> second = reload.connectors(); assertSame(second.get("c-one"), c1); c1.getConnectorFunctionPackage(); @@ -98,7 +98,7 @@ public class ConnectorUtilsReloadTest { Path nar = dir.resolve("c1.nar"); writeMinimalNar(nar, sampleDefinition("c-one")); - TreeMap<String, Connector> first = + Map<String, Connector> first = ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); Connector before = first.get("c-one"); @@ -109,7 +109,7 @@ public class ConnectorUtilsReloadTest { ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors( first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); closeEvicted(reload); - TreeMap<String, Connector> second = reload.connectors(); + Map<String, Connector> second = reload.connectors(); assertNotSame(second.get("c-one"), before); assertThrows(IllegalStateException.class, before::getConnectorFunctionPackage); @@ -123,7 +123,7 @@ public class ConnectorUtilsReloadTest { writeMinimalNar(nar1, sampleDefinition("conn-a")); writeMinimalNar(nar2, sampleDefinition("conn-b")); - TreeMap<String, Connector> first = + Map<String, Connector> first = ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); Connector removed = first.get("conn-b"); Files.delete(nar2); @@ -131,7 +131,7 @@ public class ConnectorUtilsReloadTest { ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors( first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); closeEvicted(reload); - TreeMap<String, Connector> second = reload.connectors(); + Map<String, Connector> second = reload.connectors(); assertEquals(second.size(), 1); assertSame(second.get("conn-a"), first.get("conn-a"));
