This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
The following commit(s) were added to refs/heads/master by this push: new a00d663 ARIES-1944 - Make sure closed ImportRegistrations are removed (#40) a00d663 is described below commit a00d663c9586990eaea35e066cdc11390adfe913 Author: Christian Schneider <ch...@die-schneider.net> AuthorDate: Wed Nov 20 13:30:03 2019 +0100 ARIES-1944 - Make sure closed ImportRegistrations are removed (#40) --- .../rsa/topologymanager/importer/ImportDiff.java | 19 +++++++++++----- .../importer/TopologyManagerImport.java | 25 +++++++++++----------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java index 4f7dc72..468d235 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java @@ -37,19 +37,28 @@ public class ImportDiff { this.imported = imported; } - public Stream<ImportReference> getRemoved() { + public Stream<ImportRegistration> getRemoved() { return imported.stream() - .map(ImportRegistration::getImportReference) - .filter(Objects::nonNull) - .filter(ir -> !possible.contains(ir.getImportedEndpoint())); + .filter(this::toRemove); } - + public Stream<EndpointDescription> getAdded() { Set<EndpointDescription> importedEndpoints = importedEndpoints(); return possible.stream() .filter(not(importedEndpoints::contains)); } + /** + * Checks if the import registration is not possible anymore or closed + * + * @param ireg registration to check + * @return + */ + private boolean toRemove(ImportRegistration ireg) { + ImportReference iref = ireg != null ? ireg.getImportReference() : null; + return iref == null || !possible.contains(iref.getImportedEndpoint()); + } + private Set<EndpointDescription> importedEndpoints() { return imported.stream() .map(ImportRegistration::getImportReference).filter(Objects::nonNull) diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java index 2543da9..5e4baf8 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java @@ -85,7 +85,7 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi } // close all imports importPossibilities.clear(); - importedServices.allValues().forEach(ir -> unimportService(ir.getImportReference())); + importedServices.allValues().forEach(this::unimportRegistration); } public void add(RemoteServiceAdmin rsa) { @@ -99,8 +99,11 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi @Override public void remoteAdminEvent(RemoteServiceAdminEvent event) { - if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) { - unimportService(event.getImportReference()); + ImportReference ref = event.getImportReference(); + if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION && ref != null) { + importedServices.allValues().stream() + .filter(ir -> ref.equals(ir.getImportReference())) + .forEach(this::unimportRegistration); } } @@ -125,7 +128,7 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi try { ImportDiff diff = new ImportDiff(importPossibilities.get(filter), importedServices.get(filter)); diff.getRemoved() - .forEach(this::unimportService); + .forEach(this::unimportRegistration); diff.getAdded() .flatMap(this::importService) .forEach(ir -> importedServices.put(filter, ir)); @@ -156,16 +159,12 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi } return Stream.empty(); } - - private void unimportService(ImportReference ref) { - importedServices.allValues().stream() - .filter(ir -> ref != null && ref.equals(ir.getImportReference())) - .forEach(ir -> { - importedServices.remove(ir); - ir.close(); - }); + + private void unimportRegistration(ImportRegistration reg) { + importedServices.remove(reg); + reg.close(); } - + @Override public void endpointChanged(EndpointEvent event, String filter) { if (stopped) {