This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
The following commit(s) were added to refs/heads/master by this push:
new 0f48071 ARIES-1943 Changed TopologyManager importer logic to not assume all referenced ImportRegistrations are 'open'. (#36)
0f48071 is described below

commit 0f48071f60263d35e55dcf7f510f06dd75018da5
Author: Arnoud Glimmerveen <arn...@glimmerveen.org>
AuthorDate: Wed Nov 13 15:04:16 2019 +0100

ARIES-1943 Changed TopologyManager importer logic to not assume all referenced ImportRegistrations are 'open'. (#36)

* ARIES-1943 Changed TopologyManager importer logic to not assume all referenced ImportRegistrations are 'open'. In addition assigned functional thread names to the ExecutorServices used by Importer and Exporter logic.
* Based threadName pattern on using class.
* Changed approach to naming threads, as discussed in the comments of PR #36.
---
 .../aries/rsa/topologymanager/Activator.java | 2 +-
 .../rsa/topologymanager/NamedThreadFactory.java | 46 ++++++++++++++++++++++
 .../importer/TopologyManagerImport.java | 34 +++++++++++++---
 3 files changed, 75 insertions(+), 7 deletions(-)

diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
index 3ff832f..352a294 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
@@ -119,7 +119,7 @@ public class Activator implements BundleActivator {
 public void doStart(final BundleContext bc, ExportPolicy policy) {
 LOG.debug("TopologyManager: start()");
 notifier = new EndpointListenerNotifier();
- exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(TopologyManagerExport.class));
 exportManager = new
TopologyManagerExport(notifier, exportExecutor, policy); epeListenerTracker = new EndpointEventListenerTracker(bc, exportManager); importManager = new TopologyManagerImport(bc); diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/NamedThreadFactory.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/NamedThreadFactory.java new file mode 100644 index 0000000..724bede --- /dev/null +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/NamedThreadFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.topologymanager; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { + /** + * Counter used when constructing unique Thread names + */ + private final AtomicInteger count = new AtomicInteger(0); + /** + * Pattern used when constructing Thread names. This pattern must include the sequence + * %d which will be used to replace with a Thread count. + */ + private final String namePattern; + + /** + * @param context Class name will be used to name threads created by this ThreadFactory. 
+ */ + public NamedThreadFactory(Class<?> context) { + this.namePattern = context.getSimpleName() + "-%d"; + } + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format(namePattern, count.getAndIncrement())); + } +} diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java index e28089b..1225a6e 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java @@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.aries.rsa.topologymanager.NamedThreadFactory; import org.osgi.framework.BundleContext; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; @@ -66,7 +67,7 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi public TopologyManagerImport(BundleContext bc) { this.rsaSet = new CopyOnWriteArraySet<>(); bctx = bc; - execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(getClass())); } public void start() { @@ -80,6 +81,8 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi try { execService.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for {} to terminate", execService); + Thread.currentThread().interrupt(); } closeAllImports(); } @@ -147,7 +150,12 @@ public class TopologyManagerImport implements EndpointEventListener, 
RemoteServi private boolean alreadyImported(EndpointDescription endpoint, Set<ImportRegistration> importRegistrations) { for (ImportRegistration ir : importRegistrations) { - if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) { + final ImportReference importReference = ir.getImportReference(); + if (importReference == null) { + LOG.debug("ImportRegistration {} already closed", ir); + continue; + } + if (endpoint.equals(importReference.getImportedEndpoint())) { return true; } } @@ -179,9 +187,18 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi Set<ImportRegistration> importRegistrations = importedServices.get(filter); Set<EndpointDescription> endpoints = importPossibilities.get(filter); for (ImportRegistration ir : importRegistrations) { - EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint(); + final ImportReference importReference = ir.getImportReference(); + if (importReference == null) { + LOG.debug("Unable to get ImportReference for ImportRegistration {}: already closed", ir); + continue; + } + EndpointDescription endpoint = importReference.getImportedEndpoint(); + if (endpoint == null) { + LOG.debug("Unable to get EndpointDescription of ImportReference for ImportRegistration {}: already closed", ir); + continue; + } if (!endpoints.contains(endpoint)) { - unImport(ir.getImportReference()); + unImport(importReference); } } } @@ -191,7 +208,12 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi Set<String> imported = importedServices.keySet(); for (String key : imported) { for (ImportRegistration ir : importedServices.get(key)) { - if (ir.getImportReference().equals(ref)) { + final ImportReference importReference = ir.getImportReference(); + if (importReference == null) { + LOG.debug("Unable to get ImportReference for ImportRegistration {}: already closed", ir); + continue; + } + if (importReference.equals(ref)) { removed.add(ir); } } @@ -230,5 +252,5 @@ 
public class TopologyManagerImport implements EndpointEventListener, RemoteServi } triggerSynchronizeImports(filter); } - + }