lhotari commented on code in PR #25773:
URL: https://github.com/apache/pulsar/pull/25773#discussion_r3246742126
##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -186,4 +199,66 @@ public static TreeMap<String, Connector>
searchForConnectors(String connectorsDi
}
return connectors;
}
+
+ /**
+ * Reloads connectors from disk against {@code previous}, reusing {@link
Connector} instances when path and
+ * archive MD5 are unchanged (keeps class loaders open). New or changed
archives get new instances.
+ * <p>
+ * {@link ReloadConnectorsResult#connectorsToClose()} lists connectors
evicted from the active set (replaced or
+ * no longer present on disk); the caller must {@link Connector#close()}
each (typically via
+ * {@code ConnectorsManager}).
+ *
+ * @param previous connectors from the previous scan (may
be empty, never null)
+ * @param connectorsDirectory same semantics as {@link
#searchForConnectors}
+ * @param narExtractionDirectory same semantics as {@link
#searchForConnectors}
+ * @param enableClassloading same semantics as {@link
#searchForConnectors}
+ * @return new map keyed by connector name (reused values are identical
instances from {@code previous}) and
+ * connectors the caller should close
+ */
+ public static ReloadConnectorsResult reloadConnectors(
+ TreeMap<String, Connector> previous,
+ String connectorsDirectory,
+ String narExtractionDirectory,
+ boolean enableClassloading) throws IOException {
+
+ TreeMap<String, Connector> remaining = new TreeMap<>(previous);
+ TreeMap<String, Connector> next = new TreeMap<>();
+ List<Connector> toClose = new ArrayList<>();
+
+ Path dir = Paths.get(connectorsDirectory).toAbsolutePath().normalize();
+ if (!dir.toFile().exists()) {
+ toClose.addAll(remaining.values());
+ return new ReloadConnectorsResult(next, toClose);
+ }
+
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir,
"*.nar")) {
+ for (Path archive : stream) {
+ try {
+ ConnectorDefinition cntDef =
ConnectorUtils.getConnectorDefinition(archive.toFile());
+ String name = cntDef.getName();
+ String md5Hex = computeArchiveMd5Hex(archive);
+ Connector prev = remaining.remove(name);
+ if (prev != null
+ && prev.getArchivePath() != null
+ && archive.equals(prev.getArchivePath())
+ && md5Hex.equals(prev.getArchiveMd5Hex())) {
+ next.put(name, prev);
Review Comment:
Add logging when a new connector is found (to be consistent with the existing
searchForConnectors).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]