This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eba873d9c39 [refactor][fn] Use Map instead of TreeMap for connector/function API types (#25790)
eba873d9c39 is described below
commit eba873d9c3926b806f5462af834867ccb692fdfb
Author: Dream95 <[email protected]>
AuthorDate: Sat May 16 16:58:59 2026 +0800
[refactor][fn] Use Map instead of TreeMap for connector/function API types (#25790)
Signed-off-by: Dream95 <[email protected]>
---
.../main/java/org/apache/pulsar/functions/LocalRunner.java | 11 +++++------
.../apache/pulsar/functions/worker/ConnectorsManager.java | 7 ++++---
.../apache/pulsar/functions/worker/FunctionsManager.java | 9 +++++----
.../pulsar/functions/utils/functions/FunctionUtils.java | 3 ++-
.../apache/pulsar/functions/utils/io/ConnectorUtils.java | 4 ++--
.../pulsar/functions/utils/io/ReloadConnectorsResult.java | 4 ++--
.../functions/utils/io/ConnectorUtilsReloadTest.java | 14 +++++++-------
7 files changed, 27 insertions(+), 25 deletions(-)
diff --git
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 53a98b084ae..f7f524aa495 100644
---
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -38,7 +38,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -730,7 +729,7 @@ public class LocalRunner implements AutoCloseable {
private ClassLoader isBuiltInFunction(String functionType) throws
IOException {
// Validate the connector type from the locally available connectors
- TreeMap<String, FunctionArchive> functions = getFunctions();
+ Map<String, FunctionArchive> functions = getFunctions();
String functionName = functionType.replaceFirst("^builtin://", "");
FunctionArchive function = functions.get(functionName);
@@ -744,7 +743,7 @@ public class LocalRunner implements AutoCloseable {
private ClassLoader isBuiltInSource(String sourceType) throws IOException {
// Validate the connector type from the locally available connectors
- TreeMap<String, Connector> connectors = getConnectors();
+ Map<String, Connector> connectors = getConnectors();
String source = sourceType.replaceFirst("^builtin://", "");
Connector connector = connectors.get(source);
@@ -758,7 +757,7 @@ public class LocalRunner implements AutoCloseable {
private ClassLoader isBuiltInSink(String sinkType) throws IOException {
// Validate the connector type from the locally available connectors
- TreeMap<String, Connector> connectors = getConnectors();
+ Map<String, Connector> connectors = getConnectors();
String sink = sinkType.replaceFirst("^builtin://", "");
Connector connector = connectors.get(sink);
@@ -770,11 +769,11 @@ public class LocalRunner implements AutoCloseable {
}
}
- private TreeMap<String, FunctionArchive> getFunctions() throws IOException
{
+ private Map<String, FunctionArchive> getFunctions() throws IOException {
return FunctionUtils.searchForFunctions(functionsDir,
narExtractionDirectory, true);
}
- private TreeMap<String, Connector> getConnectors() throws IOException {
+ private Map<String, Connector> getConnectors() throws IOException {
return ConnectorUtils.searchForConnectors(connectorsDir,
narExtractionDirectory, true);
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 79c56823c5b..c6bca860297 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import lombok.CustomLog;
@@ -38,7 +39,7 @@ import
org.apache.pulsar.functions.utils.io.ReloadConnectorsResult;
public class ConnectorsManager implements AutoCloseable {
@Getter
- private volatile TreeMap<String, Connector> connectors;
+ private volatile Map<String, Connector> connectors;
@VisibleForTesting
public ConnectorsManager() {
@@ -49,7 +50,7 @@ public class ConnectorsManager implements AutoCloseable {
this.connectors = createConnectors(workerConfig);
}
- private static TreeMap<String, Connector> createConnectors(WorkerConfig
workerConfig) throws IOException {
+ private static Map<String, Connector> createConnectors(WorkerConfig
workerConfig) throws IOException {
boolean enableClassloading = isEnableClassloading(workerConfig);
return
ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(),
workerConfig.getNarExtractionDirectory(), enableClassloading);
@@ -119,7 +120,7 @@ public class ConnectorsManager implements AutoCloseable {
});
}
- private void closeConnectors(TreeMap<String, Connector> connectorMap) {
+ private void closeConnectors(Map<String, Connector> connectorMap) {
closeConnectors(connectorMap.values());
connectorMap.clear();
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
index 98aff915417..c15b7989b03 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
+import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import lombok.CustomLog;
@@ -32,7 +33,7 @@ import
org.apache.pulsar.functions.utils.functions.FunctionUtils;
@CustomLog
public class FunctionsManager implements AutoCloseable {
- private TreeMap<String, FunctionArchive> functions;
+ private Map<String, FunctionArchive> functions;
@VisibleForTesting
public FunctionsManager() {
@@ -61,12 +62,12 @@ public class FunctionsManager implements AutoCloseable {
}
public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
- TreeMap<String, FunctionArchive> oldFunctions = functions;
+ Map<String, FunctionArchive> oldFunctions = functions;
this.functions = createFunctions(workerConfig);
closeFunctions(oldFunctions);
}
- private static TreeMap<String, FunctionArchive>
createFunctions(WorkerConfig workerConfig) throws IOException {
+ private static Map<String, FunctionArchive> createFunctions(WorkerConfig
workerConfig) throws IOException {
boolean enableClassloading =
workerConfig.getEnableClassloadingOfBuiltinFiles()
||
ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
return
FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory(),
@@ -79,7 +80,7 @@ public class FunctionsManager implements AutoCloseable {
closeFunctions(functions);
}
- private void closeFunctions(TreeMap<String, FunctionArchive> functionMap) {
+ private void closeFunctions(Map<String, FunctionArchive> functionMap) {
functionMap.values().forEach(functionArchive -> {
try {
functionArchive.close();
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
index 86280883999..d03ba6253c8 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
@@ -25,6 +25,7 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Map;
import java.util.TreeMap;
import lombok.CustomLog;
import lombok.experimental.UtilityClass;
@@ -75,7 +76,7 @@ public class FunctionUtils {
.readValue(narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME),
valueType);
}
- public static TreeMap<String, FunctionArchive> searchForFunctions(String
functionsDirectory,
+ public static Map<String, FunctionArchive> searchForFunctions(String
functionsDirectory,
String
narExtractionDirectory,
boolean
enableClassloading) throws IOException {
Path path = Paths.get(functionsDirectory).toAbsolutePath().normalize();
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 7595a17b48c..c263c99088e 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -165,7 +165,7 @@ public class ConnectorUtils {
return fields;
}
- public static TreeMap<String, Connector> searchForConnectors(String
connectorsDirectory,
+ public static Map<String, Connector> searchForConnectors(String
connectorsDirectory,
String
narExtractionDirectory,
boolean
enableClassloading) throws IOException {
Path path =
Paths.get(connectorsDirectory).toAbsolutePath().normalize();
@@ -216,7 +216,7 @@ public class ConnectorUtils {
* connectors the caller should close
*/
public static ReloadConnectorsResult reloadConnectors(
- TreeMap<String, Connector> previous,
+ Map<String, Connector> previous,
String connectorsDirectory,
String narExtractionDirectory,
boolean enableClassloading) throws IOException {
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
index cdfab35692c..b46aa38fead 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
@@ -19,11 +19,11 @@
package org.apache.pulsar.functions.utils.io;
import java.util.List;
-import java.util.TreeMap;
+import java.util.Map;
/**
* Result of {@link ConnectorUtils#reloadConnectors}: the new connector map
and connectors evicted from the active set
* that the caller must close.
*/
-public record ReloadConnectorsResult(TreeMap<String, Connector> connectors,
List<Connector> connectorsToClose) {
+public record ReloadConnectorsResult(Map<String, Connector> connectors,
List<Connector> connectorsToClose) {
}
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
index 07b6459369a..0e05912d8c9 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
@@ -27,7 +27,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.TreeMap;
+import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -77,7 +77,7 @@ public class ConnectorUtilsReloadTest {
Path nar = dir.resolve("c1.nar");
writeMinimalNar(nar, sampleDefinition("c-one"));
- TreeMap<String, Connector> first =
+ Map<String, Connector> first =
ConnectorUtils.searchForConnectors(dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
Connector c1 = first.get("c-one");
c1.getConnectorFunctionPackage();
@@ -86,7 +86,7 @@ public class ConnectorUtilsReloadTest {
first, dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
assertTrue(reload.connectorsToClose().isEmpty());
closeEvicted(reload);
- TreeMap<String, Connector> second = reload.connectors();
+ Map<String, Connector> second = reload.connectors();
assertSame(second.get("c-one"), c1);
c1.getConnectorFunctionPackage();
@@ -98,7 +98,7 @@ public class ConnectorUtilsReloadTest {
Path nar = dir.resolve("c1.nar");
writeMinimalNar(nar, sampleDefinition("c-one"));
- TreeMap<String, Connector> first =
+ Map<String, Connector> first =
ConnectorUtils.searchForConnectors(dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
Connector before = first.get("c-one");
@@ -109,7 +109,7 @@ public class ConnectorUtilsReloadTest {
ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
first, dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
closeEvicted(reload);
- TreeMap<String, Connector> second = reload.connectors();
+ Map<String, Connector> second = reload.connectors();
assertNotSame(second.get("c-one"), before);
assertThrows(IllegalStateException.class,
before::getConnectorFunctionPackage);
@@ -123,7 +123,7 @@ public class ConnectorUtilsReloadTest {
writeMinimalNar(nar1, sampleDefinition("conn-a"));
writeMinimalNar(nar2, sampleDefinition("conn-b"));
- TreeMap<String, Connector> first =
+ Map<String, Connector> first =
ConnectorUtils.searchForConnectors(dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
Connector removed = first.get("conn-b");
Files.delete(nar2);
@@ -131,7 +131,7 @@ public class ConnectorUtilsReloadTest {
ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
first, dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
closeEvicted(reload);
- TreeMap<String, Connector> second = reload.connectors();
+ Map<String, Connector> second = reload.connectors();
assertEquals(second.size(), 1);
assertSame(second.get("conn-a"), first.get("conn-a"));