This is an automated email from the ASF dual-hosted git repository. amichair pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
commit 50fa0de23d47acc29d0e23bab3729f22be2919e4 Author: Amichai Rothman <[email protected]> AuthorDate: Sun Mar 29 19:55:32 2026 +0300 ARIES-2219 Fix imported services not reflecting updated service properties --- .../importer/TopologyManagerImport.java | 49 ++++++++++++++++++---- .../importer/TopologyManagerImportTest.java | 41 +++++++++++++++--- 2 files changed, 76 insertions(+), 14 deletions(-) diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java index b57d494b..873f6415 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java @@ -19,12 +19,16 @@ package org.apache.aries.rsa.topologymanager.importer; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; @@ -132,27 +136,42 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi try { // we have a set of all current imports, and a set of all possible imports (with overlap) Set<ImportRegistration> imported = importedServices.get(filter); - Set<EndpointDescription> possible = importPossibilities.get(filter); - // first we iterate over all current imports, and split them into two groups: + Set<EndpointDescription> possibleSet = importPossibilities.get(filter); + Map<EndpointDescription, EndpointDescription> possible = possibleSet.stream() + .collect(Collectors.toMap(e -> e, e -> e)); // convert to map for getting the value + // first we iterate over all current imports, and split them into three groups: // - still valid (no null references) and possible (in possible set) + // - still valid and possible but with changed properties // - invalid (contain null references) or no longer possible (not in possible set) // note that this part should be concurrency-safe (get every reference only once and don't modify anything) Set<EndpointDescription> valid = new HashSet<>(); // imports that are still valid and possible Set<ImportRegistration> invalid = new LinkedHashSet<>(); // imports that are no longer valid/possible + Map<ImportRegistration, EndpointDescription> updated = new LinkedHashMap<>(); // valid with changed props for (ImportRegistration reg : imported) { ImportReference ref = reg.getImportReference(); EndpointDescription endpoint = ref == null ? null : ref.getImportedEndpoint(); // check if the currently imported endpoint is still valid and possible - if (endpoint != null && possible.contains(endpoint)) { - valid.add(endpoint); // valid and possible + EndpointDescription pe = possible.get(endpoint); // get the new (maybe modified) possible endpoint + if (pe != null) { + if (getChangedProps(endpoint.getProperties(), pe.getProperties()).isEmpty()) + valid.add(endpoint); // valid and possible + else + updated.put(reg, pe); // valid and possible and changed properties } else { invalid.add(reg); // invalid (reg or ref or endpoint is null) or no longer possible } } - // now that we figured out what needs to be done, apply the changes + // now that we figured out what needs to be done, apply the changes to each group invalid.forEach(this::unimportRegistration); // remove invalid/non-possible imports - possible.forEach(endpoint -> { // import all possible endpoints that are not already imported - if (!valid.contains(endpoint)) { + updated.forEach((reg, e) -> { // update modified properties for existing imports + try { + reg.update(e); + } catch (IllegalStateException ise) { + LOG.warn("can't update closed endpoint {}", e, ise); + } + }); + possible.keySet().forEach(endpoint -> { // import all possible endpoints that are not already imported + if (!valid.contains(endpoint) && !updated.containsValue(endpoint)) { importService(filter, endpoint); } }); @@ -188,7 +207,21 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi importedServices.remove(reg); reg.close(); } - + + private static Set<String> getChangedProps(Map<String, Object> p1, Map<String, Object> p2) { + Set<String> changed = new LinkedHashSet<>(); + for (Map.Entry<String, Object> entry : p1.entrySet()) { + Object v = p2.get(entry.getKey()); + if (!Objects.deepEquals(entry.getValue(), v) || v == null && !p2.containsKey(entry.getKey())) + changed.add(entry.getKey()); + } + for (String k : p2.keySet()) { + if (!p1.containsKey(k)) + changed.add(k); + } + return changed; + } + @Override public void endpointChanged(EndpointEvent event, String filter) { if (stopped) { diff --git a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java index 857729a9..af29da5c 100644 --- a/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java +++ b/topology-manager/src/test/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImportTest.java @@ -22,6 +22,7 @@ import static org.easymock.EasyMock.*; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -31,11 +32,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; import org.osgi.framework.ServiceRegistration; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; import org.osgi.service.remoteserviceadmin.ImportReference; import org.osgi.service.remoteserviceadmin.ImportRegistration; +import org.osgi.service.remoteserviceadmin.RemoteConstants; import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; import org.osgi.service.remoteserviceadmin.RemoteServiceAdminListener; @@ -52,19 +55,26 @@ public class TopologyManagerImportTest { return bc; } - private ImportRegistration mockImportRegistration(IMocksControl c, EndpointDescription endpoint) { + private ImportRegistration mockImportRegistration(IMocksControl c, EndpointDescription endpoint, boolean expectUpdate) { final ImportRegistration ireg = c.createMock(ImportRegistration.class); expect(ireg.getException()).andReturn(null).anyTimes(); - expect(ireg.update(anyObject())).andReturn(true).anyTimes(); + if (expectUpdate) { + expect(ireg.update(anyObject())).andReturn(true); + } ImportReference iref = c.createMock(ImportReference.class); expect(ireg.getImportReference()).andReturn(iref).anyTimes(); expect(iref.getImportedEndpoint()).andReturn(endpoint).anyTimes(); return ireg; } - private EndpointDescription createEndpoint() { - EndpointDescription endpoint = c.createMock(EndpointDescription.class); - final ImportRegistration ir = mockImportRegistration(c, endpoint); + private EndpointDescription createEndpoint(boolean expectUpdate, String id) { + HashMap<String, Object> props = new HashMap<>(); + props.put(RemoteConstants.ENDPOINT_ID, id); + props.put(Constants.OBJECTCLASS, new String[]{String.class.getName()}); + props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "config1"); + EndpointDescription endpoint = new EndpointDescription(props); + + final ImportRegistration ir = mockImportRegistration(c, endpoint, expectUpdate); ir.close(); // must be closed expectLastCall().andAnswer(() -> { endpoints.get(endpoint).decrementAndGet(); @@ -78,6 +88,10 @@ public class TopologyManagerImportTest { return endpoint; } + private EndpointDescription createEndpoint() { + return createEndpoint(false, "id1"); + } + IMocksControl c; BundleContext bc; RemoteServiceAdmin rsa; @@ -144,7 +158,7 @@ public class TopologyManagerImportTest { tm.endpointChanged(event, "myFilter"); assertImports(endpoint, 1); tm.endpointChanged(event, "myFilter"); - assertImports(endpoint, 1); // still one one import + assertImports(endpoint, 1); // still one import } @Test @@ -158,4 +172,19 @@ public class TopologyManagerImportTest { tm.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint), "myFilter"); assertImports(endpoint, 0); } + + @Test + public void testModifyEndpoint() throws InterruptedException { + EndpointDescription endpoint = createEndpoint(true, "id1"); + start(); + + tm.add(rsa); + tm.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), "myFilter"); + assertImports(endpoint, 1); + Map<String, Object> props = new HashMap<>(endpoint.getProperties()); + props.put("newProp", "newValue"); + endpoint = new EndpointDescription(props); + tm.endpointChanged(new EndpointEvent(EndpointEvent.MODIFIED, endpoint), "myFilter"); + assertImports(endpoint, 1); + } }
