This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 308aa0f33bf188b90c923e896a817e7870f6b8b6
Author: Dream95 <[email protected]>
AuthorDate: Fri May 15 19:39:29 2026 +0800

    [improve][fn] make built-in connector reload incremental (#25773)
    
    Signed-off-by: Dream95 <[email protected]>
    (cherry picked from commit 02cab7abea62740d9a415f16fc45bcb6af963175)
---
 .../org/apache/pulsar/common/nar/FileUtils.java    |  31 +++++
 .../org/apache/pulsar/common/nar/NarUnpacker.java  |  33 +----
 .../pulsar/functions/worker/ConnectorsManager.java |  28 +++-
 .../ConnectorsManagerReloadConnectorsTest.java     |  82 ++++++++++++
 .../pulsar/functions/utils/io/Connector.java       |  26 ++++
 .../pulsar/functions/utils/io/ConnectorUtils.java  |  78 +++++++++++-
 .../functions/utils/io/ReloadConnectorsResult.java |  29 +++++
 .../utils/io/ConnectorUtilsReloadTest.java         | 141 +++++++++++++++++++++
 8 files changed, 407 insertions(+), 41 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
index 7f313676345..55711850022 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
@@ -25,8 +25,11 @@
 package org.apache.pulsar.common.nar;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.zip.ZipEntry;
@@ -44,6 +47,34 @@ public class FileUtils {
 
     public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
 
+    /**
+     * Computes the MD5 digest of the contents of a file.
+     *
+     * @param file
+     *            the file whose contents are hashed
+     * @return the raw MD5 digest bytes
+     * @throws IOException
+     *             if the file cannot be opened or read
+     */
+    public static byte[] calculateMd5sum(final File file) throws IOException {
+        try (final FileInputStream in = new FileInputStream(file)) {
+            // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient for this use case
+            final MessageDigest digest = MessageDigest.getInstance("md5");
+            final byte[] chunk = new byte[1024];
+
+            // Feed the file to the digest chunk by chunk until read() signals end of stream.
+            for (int count = in.read(chunk); count > -1; count = in.read(chunk)) {
+                digest.update(chunk, 0, count);
+            }
+
+            return digest.digest();
+        } catch (NoSuchAlgorithmException nsae) {
+            throw new IllegalArgumentException(nsae);
+        }
+    }
+
     public static void ensureDirectoryExistAndCanReadAndWrite(final File dir) 
throws IOException {
         if (dir.exists() && !dir.isDirectory()) {
             throw new IOException(dir.getAbsolutePath() + " is not a 
directory");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index ef802674b42..2ea35f64180 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -25,7 +25,6 @@ package org.apache.pulsar.common.nar;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -35,8 +34,6 @@ import java.nio.channels.FileLock;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.Base64;
 import java.util.Enumeration;
 import java.util.concurrent.ConcurrentHashMap;
@@ -77,7 +74,7 @@ public class NarUnpacker {
                 throw new IOException("Cannot create " + parentDirectory);
             }
         }
-        String md5Sum = 
Base64.getUrlEncoder().withoutPadding().encodeToString(calculateMd5sum(nar));
+        String md5Sum = 
Base64.getUrlEncoder().withoutPadding().encodeToString(FileUtils.calculateMd5sum(nar));
         // ensure that one process can extract the files
         File lockFile = new File(parentDirectory, "." + md5Sum + ".lock");
         // prevent OverlappingFileLockException by ensuring that one thread 
tries to create a lock in this JVM
@@ -171,32 +168,4 @@ public class NarUnpacker {
             }
         }
     }
-
-    /**
-     * Calculates an md5 sum of the specified file.
-     *
-     * @param file
-     *            to calculate the md5sum of
-     * @return the md5sum bytes
-     * @throws IOException
-     *             if cannot read file
-     */
-    protected static byte[] calculateMd5sum(final File file) throws 
IOException {
-        try (final FileInputStream inputStream = new FileInputStream(file)) {
-            // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient 
for this use case
-            final MessageDigest md5 = MessageDigest.getInstance("md5");
-
-            final byte[] buffer = new byte[1024];
-            int read = inputStream.read(buffer);
-
-            while (read > -1) {
-                md5.update(buffer, 0, read);
-                read = inputStream.read(buffer);
-            }
-
-            return md5.digest();
-        } catch (NoSuchAlgorithmException nsae) {
-            throw new IllegalArgumentException(nsae);
-        }
-    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 19d31d0f63b..a8ee6f3ce20 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.utils.io.Connector;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.ReloadConnectorsResult;
 
 @Slf4j
 public class ConnectorsManager implements AutoCloseable {
@@ -48,12 +50,16 @@ public class ConnectorsManager implements AutoCloseable {
     }
 
     private static TreeMap<String, Connector> createConnectors(WorkerConfig 
workerConfig) throws IOException {
-        boolean enableClassloading = 
workerConfig.getEnableClassloadingOfBuiltinFiles()
-                || 
ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+        boolean enableClassloading = isEnableClassloading(workerConfig);
         return 
ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(),
                 workerConfig.getNarExtractionDirectory(), enableClassloading);
     }
 
+    /**
+     * Tells whether built-in connector archives should be class-loaded: either the worker configuration asks
+     * for it explicitly, or the configured function runtime factory is {@link ThreadRuntimeFactory}.
+     */
+    private static boolean isEnableClassloading(WorkerConfig workerConfig) {
+        if (workerConfig.getEnableClassloadingOfBuiltinFiles()) {
+            return true;
+        }
+        String threadRuntimeFactoryName = ThreadRuntimeFactory.class.getName();
+        return threadRuntimeFactoryName.equals(workerConfig.getFunctionRuntimeFactoryClassName());
+    }
+
     @VisibleForTesting
     public void addConnector(String connectorType, Connector connector) {
         connectors.put(connectorType, connector);
@@ -89,9 +95,13 @@ public class ConnectorsManager implements AutoCloseable {
     }
 
     public void reloadConnectors(WorkerConfig workerConfig) throws IOException 
{
-        TreeMap<String, Connector> oldConnectors = connectors;
-        this.connectors = createConnectors(workerConfig);
-        closeConnectors(oldConnectors);
+        // Incremental reload: the result carries the next connector map plus the connectors evicted from it.
+        ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
+                this.connectors,
+                workerConfig.getConnectorsDirectory(),
+                workerConfig.getNarExtractionDirectory(),
+                isEnableClassloading(workerConfig));
+        // Swap in the new map first, then close only the evicted connectors.
+        this.connectors = reload.connectors();
+        closeConnectors(reload.connectorsToClose());
     }
 
     @Override
@@ -99,14 +109,18 @@ public class ConnectorsManager implements AutoCloseable {
         closeConnectors(connectors);
     }
 
-    private void closeConnectors(TreeMap<String, Connector> connectorMap) {
-        connectorMap.values().forEach(connector -> {
+    /** Closes each given connector; a failure is logged as a warning and does not stop the remaining closes. */
+    private void closeConnectors(Collection<Connector> connectors) {
+        connectors.forEach(connector -> {
             try {
                 connector.close();
             } catch (Exception e) {
                 log.warn("Failed to close connector", e);
             }
         });
+    }
+
+    /** Closes every connector held in {@code connectorMap} and then empties the map. */
+    private void closeConnectors(TreeMap<String, Connector> connectorMap) {
+        closeConnectors(connectorMap.values());
         connectorMap.clear();
     }
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java
new file mode 100644
index 00000000000..82c280f97ad
--- /dev/null
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static org.testng.Assert.assertSame;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.utils.io.Connector;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link ConnectorsManager#reloadConnectors(WorkerConfig)} for incremental reload behavior,
+ * ensuring unchanged connectors are reused instead of being recreated.
+ */
+public class ConnectorsManagerReloadConnectorsTest {
+
+    /**
+     * Writes a zip archive at {@code narPath} whose only entry is {@code def} serialized as YAML under
+     * {@code META-INF/services/pulsar-io.yaml}. The file is registered for deletion at JVM exit.
+     */
+    private static void writeMinimalNar(Path narPath, ConnectorDefinition def) throws IOException {
+        // Do not leave test archives behind in the system temp directory.
+        narPath.toFile().deleteOnExit();
+        byte[] yaml = ObjectMapperFactory.getYamlMapper().getObjectMapper().writeValueAsBytes(def);
+        try (OutputStream os = Files.newOutputStream(narPath);
+                ZipOutputStream zos = new ZipOutputStream(os)) {
+            ZipEntry entry = new ZipEntry("META-INF/services/pulsar-io.yaml");
+            zos.putNextEntry(entry);
+            zos.write(yaml);
+            zos.closeEntry();
+        }
+    }
+
+    /** Builds a connector definition with the given name and placeholder sink/source class names. */
+    private static ConnectorDefinition sampleDefinition(String name) {
+        ConnectorDefinition def = new ConnectorDefinition();
+        def.setName(name);
+        def.setSinkClass("org.example.Sink");
+        def.setSourceClass("org.example.Source");
+        return def;
+    }
+
+    @Test
+    public void reloadWhenNarUnchangedReusesSameConnectorInstance() throws Exception {
+        Path dir = Files.createTempDirectory("mgr-conn-reload-");
+        // Registered before the archive: deleteOnExit removes entries in reverse registration order, so the
+        // archive is deleted first and the then-empty directory can be removed.
+        dir.toFile().deleteOnExit();
+        Path nar = dir.resolve("c1.nar");
+        writeMinimalNar(nar, sampleDefinition("c-one"));
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setConnectorsDirectory(dir.toString());
+        workerConfig.setNarExtractionDirectory(NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
+        workerConfig.setEnableClassloadingOfBuiltinFiles(false);
+
+        try (ConnectorsManager manager = new ConnectorsManager(workerConfig)) {
+            Connector before = manager.getConnector("c-one");
+            before.getConnectorFunctionPackage();
+
+            manager.reloadConnectors(workerConfig);
+
+            Connector after = manager.getConnector("c-one");
+            assertSame(after, before);
+            // The reused instance must still be usable, i.e. it was not closed by the reload.
+            before.getConnectorFunctionPackage();
+        }
+    }
+
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
index 5fcc22747c5..bff477b40cd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
@@ -28,6 +28,8 @@ import 
org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
 
 public class Connector implements AutoCloseable {
     private final Path archivePath;
+    /** MD5 hex of archive file contents; empty when {@link #archivePath} is 
null (test doubles). */
+    private final String archiveMd5Hex;
     private final String narExtractionDirectory;
     private final boolean enableClassloading;
     private ValidatableFunctionPackage connectorFunctionPackage;
@@ -38,16 +40,40 @@ public class Connector implements AutoCloseable {
 
     public Connector(Path archivePath, ConnectorDefinition 
connectorDefinition, String narExtractionDirectory,
                      boolean enableClassloading) {
+        this(archivePath, connectorDefinition, narExtractionDirectory, 
enableClassloading, null);
+    }
+
+    /**
+     * Creates a connector and records the MD5 hex of its archive.
+     *
+     * @param archivePath              path of the connector archive; when null the recorded MD5 hex is the
+     *                                 empty string
+     * @param connectorDefinition      definition stored on this connector
+     * @param narExtractionDirectory   NAR extraction directory stored on this connector
+     * @param enableClassloading       class-loading flag stored on this connector
+     * @param precomputedArchiveMd5Hex MD5 hex of {@code archivePath} contents, used as given without being
+     *                                 re-checked; if null and the path is non-null, the hash is computed once
+     *                                 at construction time
+     * @throws java.io.UncheckedIOException if the hash has to be computed and reading the archive fails
+     */
+    public Connector(Path archivePath, ConnectorDefinition 
connectorDefinition, String narExtractionDirectory,
+                     boolean enableClassloading, String 
precomputedArchiveMd5Hex) {
         this.archivePath = archivePath;
         this.connectorDefinition = connectorDefinition;
         this.narExtractionDirectory = narExtractionDirectory;
         this.enableClassloading = enableClassloading;
+        if (archivePath != null) {
+            try {
+                // Prefer the caller-supplied hash so the archive is not read a second time.
+                this.archiveMd5Hex = precomputedArchiveMd5Hex != null
+                        ? precomputedArchiveMd5Hex
+                        : ConnectorUtils.computeArchiveMd5Hex(archivePath);
+            } catch (java.io.IOException e) {
+                throw new java.io.UncheckedIOException(e);
+            }
+        } else {
+            // No archive to hash.
+            this.archiveMd5Hex = "";
+        }
     }
 
     public Path getArchivePath() {
         return archivePath;
     }
 
+    /**
+     * Returns the MD5 hex recorded for the archive at construction time; the empty string when this connector
+     * was created with a null archive path.
+     */
+    public String getArchiveMd5Hex() {
+        return archiveMd5Hex;
+    }
+
     public synchronized ValidatableFunctionPackage 
getConnectorFunctionPackage() {
         checkState();
         if (connectorFunctionPackage == null) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 71cab749ba0..d08a4942884 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -24,7 +24,9 @@ import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HexFormat;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +41,7 @@ import net.bytebuddy.description.type.TypeDefinition;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.io.ConfigFieldDefinition;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.FileUtils;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.utils.Exceptions;
 import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
@@ -52,7 +55,16 @@ import org.apache.pulsar.io.core.annotations.FieldDoc;
 @Slf4j
 public class ConnectorUtils {
 
-    private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";
+    /**
+     * Returns the MD5 digest of the file at {@code path} as lower-case hex. The value serves as the identity
+     * of a connector archive when connectors are reloaded.
+     */
+    public static String computeArchiveMd5Hex(Path path) throws IOException {
+        Path normalized = path.toAbsolutePath().normalize();
+        return calculateMd5Hex(normalized.toFile());
+    }
+
+    /** Hashes {@code file} with MD5 and formats the digest bytes as a hex string. */
+    private static String calculateMd5Hex(File file) throws IOException {
+        byte[] digest = FileUtils.calculateMd5sum(file);
+        return HexFormat.of().formatHex(digest);
+    }
 
     /**
      * Extract the Pulsar IO Source class from a connector archive.
@@ -171,7 +183,8 @@ public class ConnectorUtils {
                 try {
                     ConnectorDefinition cntDef = 
ConnectorUtils.getConnectorDefinition(archive.toFile());
                     log.info("Found connector {} from {}", cntDef, archive);
-                    Connector connector = new Connector(archive, cntDef, 
narExtractionDirectory, enableClassloading);
+                    Connector connector = new Connector(archive, cntDef, 
narExtractionDirectory,
+                            enableClassloading);
                     connectors.put(cntDef.getName(), connector);
                 } catch (Throwable t) {
                     log.warn("Failed to load connector from {}", archive, t);
@@ -180,4 +193,65 @@ public class ConnectorUtils {
         }
         return connectors;
     }
+
+    /**
+     * Rescans the connectors directory against {@code previous}, reusing {@link Connector} instances when
+     * both the archive path and the archive MD5 are unchanged (keeps class loaders open). New or changed
+     * archives get new instances.
+     * <p>
+     * {@link ReloadConnectorsResult#connectorsToClose()} lists every connector evicted from the active set:
+     * replaced by a changed archive, no longer present on disk, or displaced because a later archive in the
+     * same scan declares the same connector name. The caller must {@link Connector#close()} each of them
+     * (typically via {@code ConnectorsManager}).
+     * <p>
+     * An archive that fails to load is logged and skipped; a connector from {@code previous} that no
+     * successfully loaded archive matches is evicted.
+     *
+     * @param previous                 connectors from the previous scan (may be empty, never null)
+     * @param connectorsDirectory      same semantics as {@link #searchForConnectors}
+     * @param narExtractionDirectory   same semantics as {@link #searchForConnectors}
+     * @param enableClassloading       same semantics as {@link #searchForConnectors}
+     * @return new map keyed by connector name (reused values are identical instances from {@code previous})
+     *         and connectors the caller should close
+     * @throws IOException if the connectors directory cannot be listed
+     */
+    public static ReloadConnectorsResult reloadConnectors(
+            TreeMap<String, Connector> previous,
+            String connectorsDirectory,
+            String narExtractionDirectory,
+            boolean enableClassloading) throws IOException {
+
+        // Connectors not yet matched to an archive; whatever is left after the scan is evicted.
+        TreeMap<String, Connector> remaining = new TreeMap<>(previous);
+        TreeMap<String, Connector> next = new TreeMap<>();
+        List<Connector> toClose = new ArrayList<>();
+
+        Path dir = Paths.get(connectorsDirectory).toAbsolutePath().normalize();
+        if (!dir.toFile().exists()) {
+            // No directory means no connectors: everything previously loaded is evicted.
+            toClose.addAll(remaining.values());
+            return new ReloadConnectorsResult(next, toClose);
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.nar")) {
+            for (Path archive : stream) {
+                try {
+                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toFile());
+                    String name = cntDef.getName();
+                    String md5Hex = computeArchiveMd5Hex(archive);
+                    Connector prev = remaining.remove(name);
+                    Connector current;
+                    if (prev != null
+                            && prev.getArchivePath() != null
+                            && archive.equals(prev.getArchivePath())
+                            && md5Hex.equals(prev.getArchiveMd5Hex())) {
+                        current = prev;
+                    } else {
+                        if (prev != null) {
+                            log.info("Reloading changed connector name={} archive={} previousArchive={}", name, archive,
+                                    prev.getArchivePath());
+                            toClose.add(prev);
+                        }
+                        current = new Connector(archive, cntDef, narExtractionDirectory, enableClassloading,
+                                md5Hex);
+                    }
+                    // A second archive declaring the same name replaces the first one (last one wins, as in
+                    // searchForConnectors). The displaced connector must be handed back for closing, otherwise
+                    // it would be neither active nor closed.
+                    Connector displaced = next.put(name, current);
+                    if (displaced != null) {
+                        log.warn("Connector name={} is declared by more than one archive; archive={} replaces "
+                                + "archive={}", name, archive, displaced.getArchivePath());
+                        toClose.add(displaced);
+                    }
+                } catch (Throwable t) {
+                    log.warn("Failed to load connector archive={}", archive, t);
+                }
+            }
+        }
+        toClose.addAll(remaining.values());
+        return new ReloadConnectorsResult(next, toClose);
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
new file mode 100644
index 00000000000..cdfab35692c
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.io;
+
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * Result of {@link ConnectorUtils#reloadConnectors}: the new connector map and the connectors evicted from
+ * the active set that the caller must close.
+ *
+ * @param connectors        the new active connectors, keyed by connector name
+ * @param connectorsToClose connectors that are no longer in {@code connectors} and must be closed by the caller
+ */
+public record ReloadConnectorsResult(TreeMap<String, Connector> connectors, 
List<Connector> connectorsToClose) {
+}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
new file mode 100644
index 00000000000..07b6459369a
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.TreeMap;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link ConnectorUtils#reloadConnectors}: unchanged archives keep their {@link Connector} instance,
+ * while changed or removed archives have their connector evicted and replaced or dropped.
+ */
+@Test
+public class ConnectorUtilsReloadTest {
+
+    /** Closes every connector the reload reported as evicted, as a real caller is required to do. */
+    private static void closeEvicted(ReloadConnectorsResult reload) throws Exception {
+        for (Connector c : reload.connectorsToClose()) {
+            c.close();
+        }
+    }
+
+    /**
+     * Creates a fresh temporary connectors directory and registers it for deletion at JVM exit. It is
+     * registered before any archive written into it: deleteOnExit removes entries in reverse registration
+     * order, so the archives go first and the then-empty directory can be removed.
+     */
+    private static Path createConnectorsDir() throws IOException {
+        Path dir = Files.createTempDirectory("conn-reload-");
+        dir.toFile().deleteOnExit();
+        return dir;
+    }
+
+    /**
+     * Writes a zip archive at {@code narPath} whose only entry is {@code def} serialized as YAML under
+     * {@code META-INF/services/pulsar-io.yaml}. The file is registered for deletion at JVM exit.
+     */
+    private static void writeMinimalNar(Path narPath, ConnectorDefinition def) throws IOException {
+        // Do not leave test archives behind in the system temp directory.
+        narPath.toFile().deleteOnExit();
+        byte[] yaml = ObjectMapperFactory.getYamlMapper().getObjectMapper().writeValueAsBytes(def);
+        try (OutputStream os = Files.newOutputStream(narPath);
+                ZipOutputStream zos = new ZipOutputStream(os)) {
+            ZipEntry entry = new ZipEntry("META-INF/services/pulsar-io.yaml");
+            zos.putNextEntry(entry);
+            zos.write(yaml);
+            zos.closeEntry();
+        }
+    }
+
+    /** Builds a connector definition with the given name and placeholder sink/source class names. */
+    private static ConnectorDefinition sampleDefinition(String name) {
+        ConnectorDefinition def = new ConnectorDefinition();
+        def.setName(name);
+        def.setSinkClass("org.example.Sink");
+        def.setSourceClass("org.example.Source");
+        return def;
+    }
+
+    /**
+     * Historical {@code ConnectorsManager} reload replaced the whole map and closed every prior
+     * {@link Connector}, even when NAR files were unchanged. A caller keeping a reference to the
+     * pre-reload connector would then hit {@link IllegalStateException} on lazy use.
+     * <p>
+     * Incremental reload must evict nothing, reuse the same instance, and leave that instance usable
+     * after the caller closes only {@link ReloadConnectorsResult#connectorsToClose()}.
+     */
+    @Test
+    public void reloadUnchangedNarEvictsNothingAndKeepsSameConnectorUsable() throws Exception {
+        Path dir = createConnectorsDir();
+        Path nar = dir.resolve("c1.nar");
+        writeMinimalNar(nar, sampleDefinition("c-one"));
+
+        TreeMap<String, Connector> first =
+                ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+        Connector c1 = first.get("c-one");
+        c1.getConnectorFunctionPackage();
+
+        ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
+                first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+        assertTrue(reload.connectorsToClose().isEmpty());
+        closeEvicted(reload);
+        TreeMap<String, Connector> second = reload.connectors();
+
+        assertSame(second.get("c-one"), c1);
+        // Still usable: the reused instance was not closed.
+        c1.getConnectorFunctionPackage();
+    }
+
+    /** Rewriting the archive with different content must yield a new instance and evict the old one. */
+    @Test
+    public void reloadReopensConnectorWhenNarContentChanges() throws Exception {
+        Path dir = createConnectorsDir();
+        Path nar = dir.resolve("c1.nar");
+        writeMinimalNar(nar, sampleDefinition("c-one"));
+
+        TreeMap<String, Connector> first =
+                ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+        Connector before = first.get("c-one");
+
+        ConnectorDefinition updated = sampleDefinition("c-one");
+        updated.setDescription("changed");
+        writeMinimalNar(nar, updated);
+
+        ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
+                first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+        closeEvicted(reload);
+        TreeMap<String, Connector> second = reload.connectors();
+
+        assertNotSame(second.get("c-one"), before);
+        assertThrows(IllegalStateException.class, before::getConnectorFunctionPackage);
+    }
+
+    /** Deleting an archive must drop its connector from the new map and evict it, leaving the other untouched. */
+    @Test
+    public void reloadClosesConnectorsRemovedFromDirectory() throws Exception {
+        Path dir = createConnectorsDir();
+        Path nar1 = dir.resolve("a.nar");
+        Path nar2 = dir.resolve("b.nar");
+        writeMinimalNar(nar1, sampleDefinition("conn-a"));
+        writeMinimalNar(nar2, sampleDefinition("conn-b"));
+
+        TreeMap<String, Connector> first =
+                ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+        Connector removed = first.get("conn-b");
+        Files.delete(nar2);
+
+        ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
+                first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+        closeEvicted(reload);
+        TreeMap<String, Connector> second = reload.connectors();
+
+        assertEquals(second.size(), 1);
+        assertSame(second.get("conn-a"), first.get("conn-a"));
+        assertThrows(IllegalStateException.class, removed::getConnectorFunctionPackage);
+    }
+
+}

Reply via email to