This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch refactor_tm in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
The following commit(s) were added to refs/heads/refactor_tm by this push: new 603832b Factor out diffing of imported and possible 603832b is described below commit 603832bc31dc8c9d2468313695c1fcf3cf535f14 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Fri Nov 15 13:34:46 2019 +0100 Factor out diffing of imported and possible --- .../rsa/topologymanager/importer/ImportDiff.java | 63 ++++++++++++++++++++++ .../importer/TopologyManagerImport.java | 53 +++++------------- 2 files changed, 77 insertions(+), 39 deletions(-) diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java new file mode 100644 index 0000000..05d8e90 --- /dev/null +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.aries.rsa.topologymanager.importer; + +import java.util.Objects; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.ImportReference; +import org.osgi.service.remoteserviceadmin.ImportRegistration; + +public class ImportDiff { + private Set<EndpointDescription> possible; + private Set<ImportRegistration> imported; + + public ImportDiff(Set<EndpointDescription> possible, Set<ImportRegistration> imported) { + this.possible = possible; + this.imported = imported; + } + + public Stream<ImportReference> getRemoved() { + return imported.stream() + .map(ImportRegistration::getImportReference) + .filter(Objects::nonNull) + .filter(ir -> !possible.contains(ir.getImportedEndpoint())); + } + + private Set<EndpointDescription> importedEndpoints() { + return imported.stream() + .map(ImportRegistration::getImportReference).filter(Objects::nonNull) + .map(ImportReference::getImportedEndpoint).filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + + public Stream<EndpointDescription> getAdded() { + Set<EndpointDescription> importedEndpoints = importedEndpoints(); + return possible.stream() + .filter(not(importedEndpoints::contains)); + } + + public static <T> Predicate<T> not(Predicate<T> t) { + return t.negate(); + } +} diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java index 95d3aee..2543da9 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService; import 
java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.apache.aries.rsa.topologymanager.NamedThreadFactory; import org.osgi.framework.BundleContext; @@ -114,17 +115,20 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi * Synchronizes the actual imports with the possible imports for the given filter, * i.e. unimports previously imported endpoints that are no longer possible, * and imports new possible endpoints that are not already imported. + * + * TODO but optional: if the service is already imported and the endpoint is still + * in the list of possible imports check if a "better" endpoint is now in the list * * @param filter the filter whose endpoints are synchronized */ private void synchronizeImports(final String filter) { try { - // unimport endpoints that are no longer possible - unimportRemovedServices(filter); - // import new endpoints - importAddedServices(filter); - // TODO but optional: if the service is already imported and the endpoint is still - // in the list of possible imports check if a "better" endpoint is now in the list + ImportDiff diff = new ImportDiff(importPossibilities.get(filter), importedServices.get(filter)); + diff.getRemoved() + .forEach(this::unimportService); + diff.getAdded() + .flatMap(this::importService) + .forEach(ir -> importedServices.put(filter, ir)); } catch (Exception e) { LOG.error(e.getMessage(), e); } @@ -132,54 +136,25 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi } /** - * Checks if an endpoint is included in a set of import registrations. 
- * - * @param imported the import registrations that may include the endpoint - * @param endpoint the endpoint to check - * @return whether the given endpoint is imported - */ - private boolean isImported(Set<ImportRegistration> imported, EndpointDescription endpoint) { - return imported.stream() - .map(ImportRegistration::getImportReference) - .anyMatch(ir -> ir != null && endpoint.equals(ir.getImportedEndpoint())); - } - - private void importAddedServices(String filter) { - Set<EndpointDescription> possible = importPossibilities.get(filter); - Set<ImportRegistration> imported = importedServices.get(filter); - possible.stream() - .filter(endpoint -> !isImported(imported, endpoint)) // filter out already imported - .forEach(endpoint -> importService(filter, endpoint)); // import the new endpoints - } - - private void unimportRemovedServices(String filter) { - Set<EndpointDescription> possible = importPossibilities.get(filter); - Set<ImportRegistration> imported = importedServices.get(filter); - imported.stream() - .map(ImportRegistration::getImportReference) - .filter(ir -> ir != null && !possible.contains(ir.getImportedEndpoint())) // filter out possibles - .forEach(this::unimportService); // unimport the non-possibles - } - - /** * Tries to import the service with each rsa until one import is successful. 
* * @param filter the filter that matched the endpoint * @param endpoint endpoint to import + * @return a stream holding the first successful import registration, or an empty stream if no RSA imported the endpoint */ - private void importService(String filter, EndpointDescription endpoint) { + private Stream<ImportRegistration> importService(EndpointDescription endpoint) { for (RemoteServiceAdmin rsa : rsaSet) { ImportRegistration ir = rsa.importService(endpoint); if (ir != null) { if (ir.getException() == null) { LOG.debug("Service import was successful {}", ir); - importedServices.put(filter, ir); - return; + return Stream.of(ir); } else { LOG.info("Error importing service " + endpoint, ir.getException()); } } } + return Stream.empty(); } private void unimportService(ImportReference ref) {