This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-rsa.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f48071  ARIES-1943 Changed TopologyManager importer logic to not 
assume all referenced ImportRegistrations are 'open'. (#36)
0f48071 is described below

commit 0f48071f60263d35e55dcf7f510f06dd75018da5
Author: Arnoud Glimmerveen <arn...@glimmerveen.org>
AuthorDate: Wed Nov 13 15:04:16 2019 +0100

    ARIES-1943 Changed TopologyManager importer logic to not assume all 
referenced ImportRegistrations are 'open'. (#36)
    
    * ARIES-1943 Changed TopologyManager importer logic to not assume all 
referenced ImportRegistrations are 'open'.
    
    In addition assigned functional thread names to the ExecutorServices used 
by Importer and Exporter logic.
    
    * Based threadName pattern on using class.
    
    * Changed approach to naming threads, as discussed in the comments of PR 
#36.
---
 .../aries/rsa/topologymanager/Activator.java       |  2 +-
 .../rsa/topologymanager/NamedThreadFactory.java    | 46 ++++++++++++++++++++++
 .../importer/TopologyManagerImport.java            | 34 +++++++++++++---
 3 files changed, 75 insertions(+), 7 deletions(-)

diff --git 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
index 3ff832f..352a294 100644
--- 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
+++ 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
@@ -119,7 +119,7 @@ public class Activator implements BundleActivator {
     public void doStart(final BundleContext bc, ExportPolicy policy) {
         LOG.debug("TopologyManager: start()");
         notifier = new EndpointListenerNotifier();
-        exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, 
new LinkedBlockingQueue<Runnable>());
+        exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, 
new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(TopologyManagerExport.class));
         exportManager = new TopologyManagerExport(notifier, exportExecutor, 
policy);
         epeListenerTracker = new EndpointEventListenerTracker(bc, 
exportManager);
         importManager = new TopologyManagerImport(bc);
diff --git 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/NamedThreadFactory.java
 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/NamedThreadFactory.java
new file mode 100644
index 0000000..724bede
--- /dev/null
+++ 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/NamedThreadFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NamedThreadFactory implements ThreadFactory {
+    /**
+     * Counter used when constructing unique Thread names
+     */
+    private final AtomicInteger count = new AtomicInteger(0);
+    /**
+     * Pattern used when constructing Thread names. This pattern must include 
the sequence
+     * %d, which will be replaced with the Thread count.
+     */
+    private final String namePattern;
+
+    /**
+     * @param context Class whose simple name will be used to name threads created by this 
ThreadFactory.
+     */
+    public NamedThreadFactory(Class<?> context) {
+        this.namePattern = context.getSimpleName() + "-%d";
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+        return new Thread(r, String.format(namePattern, 
count.getAndIncrement()));
+    }
+}
diff --git 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index e28089b..1225a6e 100644
--- 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.aries.rsa.topologymanager.NamedThreadFactory;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
@@ -66,7 +67,7 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
     public TopologyManagerImport(BundleContext bc) {
         this.rsaSet = new CopyOnWriteArraySet<>();
         bctx = bc;
-        execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>());
+        execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(getClass()));
     }
     
     public void start() {
@@ -80,6 +81,8 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
         try {
             execService.awaitTermination(10, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
+            LOG.info("Interrupted while waiting for {} to terminate", 
execService);
+            Thread.currentThread().interrupt();
         }
         closeAllImports();
     }
@@ -147,7 +150,12 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
 
     private boolean alreadyImported(EndpointDescription endpoint, 
Set<ImportRegistration> importRegistrations) {
         for (ImportRegistration ir : importRegistrations) {
-            if 
(endpoint.equals(ir.getImportReference().getImportedEndpoint())) {
+            final ImportReference importReference = ir.getImportReference();
+            if (importReference == null) {
+                LOG.debug("ImportRegistration {} already closed", ir);
+                continue;
+            }
+            if (endpoint.equals(importReference.getImportedEndpoint())) {
                 return true;
             }
         }
@@ -179,9 +187,18 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
         Set<ImportRegistration> importRegistrations = 
importedServices.get(filter);
         Set<EndpointDescription> endpoints = importPossibilities.get(filter);
         for (ImportRegistration ir : importRegistrations) {
-            EndpointDescription endpoint = 
ir.getImportReference().getImportedEndpoint();
+            final ImportReference importReference = ir.getImportReference();
+            if (importReference == null) {
+                LOG.debug("Unable to get ImportReference for 
ImportRegistration {}: already closed", ir);
+                continue;
+            }
+            EndpointDescription endpoint = 
importReference.getImportedEndpoint();
+            if (endpoint == null) {
+                LOG.debug("Unable to get EndpointDescription of 
ImportReference for ImportRegistration {}: already closed", ir);
+                continue;
+            }
             if (!endpoints.contains(endpoint)) {
-                unImport(ir.getImportReference());
+                unImport(importReference);
             }
         }
     }
@@ -191,7 +208,12 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
         Set<String> imported = importedServices.keySet();
         for (String key : imported) {
             for (ImportRegistration ir : importedServices.get(key)) {
-                if (ir.getImportReference().equals(ref)) {
+                final ImportReference importReference = 
ir.getImportReference();
+                if (importReference == null) {
+                    LOG.debug("Unable to get ImportReference for 
ImportRegistration {}: already closed", ir);
+                    continue;
+                }
+                if (importReference.equals(ref)) {
                     removed.add(ir);
                 }
             }
@@ -230,5 +252,5 @@ public class TopologyManagerImport implements 
EndpointEventListener, RemoteServi
         }
         triggerSynchronizeImports(filter);
     }
-    
+
 }

Reply via email to