This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-rsa.git


The following commit(s) were added to refs/heads/master by this push:
     new db530ba  ARIES-1780 - Improved design
db530ba is described below

commit db530ba5e43aabfd6e60328f9a05043fccbf32e8
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Sun Nov 10 16:32:21 2019 +0100

    ARIES-1780 - Improved design
---
 discovery/zookeeper/bnd.bnd                        |   2 +-
 .../aries/rsa/discovery/zookeeper/Interest.java    | 134 +++++++++++++++
 .../rsa/discovery/zookeeper/InterestManager.java   | 181 +++++----------------
 .../zookeeper/PublishingEndpointListener.java      |   8 +-
 .../zookeeper/{ => client}/ClientManager.java      |   6 +-
 .../{ => client}/ZookeeperEndpointListener.java    |  68 +++++---
 .../ZookeeperEndpointRepository.java}              |  22 ++-
 .../zookeeper/server/MyZooKeeperServerMain.java    |   2 +-
 .../discovery/zookeeper/InterestManagerTest.java   |  65 +++++---
 .../zookeeper/PublishingEndpointListenerTest.java  |   3 +-
 .../zookeeper/ZookeeperDiscoveryTest.java          |  29 +++-
 .../zookeeper/{ => client}/ClientManagerTest.java  |   5 +-
 .../ZookeeperEndpointRepositoryPathTest.java}      |  11 +-
 .../rsa/itests/felix/tcp/TestDiscoveryExport.java  |   6 +-
 .../rsa/itests/felix/tcp/TestDiscoveryImport.java  |   4 +-
 .../aries/rsa/itests/felix/tcp/TestFindHook.java   |  10 +-
 .../rsa/topologymanager/importer/MultiMap.java     |   2 +-
 17 files changed, 335 insertions(+), 223 deletions(-)

diff --git a/discovery/zookeeper/bnd.bnd b/discovery/zookeeper/bnd.bnd
index 4f06393..b7d484e 100644
--- a/discovery/zookeeper/bnd.bnd
+++ b/discovery/zookeeper/bnd.bnd
@@ -16,4 +16,4 @@
 #    under the License.
 Provide-Capability: osgi.remoteserviceadmin.discovery;\
        protocols:List<String>="zookeeper"; version:Version=1.1.0
-Export-Package: org.apache.aries.rsa.discovery.zookeeper
\ No newline at end of file
+Export-Package: org.apache.aries.rsa.discovery.zookeeper.client
\ No newline at end of file
diff --git 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Interest.java
 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Interest.java
new file mode 100644
index 0000000..a74324b
--- /dev/null
+++ 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Interest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import static 
org.osgi.service.remoteserviceadmin.EndpointEventListener.ENDPOINT_LISTENER_SCOPE;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.aries.rsa.util.StringPlus;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("deprecation")
+public class Interest {
+    private static final Logger LOG = LoggerFactory.getLogger(Interest.class);
+    
+    private final ServiceReference<?> sref;
+    private final List<String> scopes;
+    private final Object epListener;
+    
+    public Interest(ServiceReference<?> sref) {
+        this(sref, null);
+    }
+    
+    public Interest(ServiceReference<?> sref, Object epListener) {
+        this.sref = sref;
+        this.scopes = 
StringPlus.normalize(sref.getProperty(ENDPOINT_LISTENER_SCOPE));
+        this.epListener = epListener;
+    }
+    
+    public List<String> getScopes() {
+        return scopes;
+    }
+
+    public Object getEpListener() {
+        return epListener;
+    }
+    
+    public void notifyListener(EndpointEvent event) {
+        EndpointDescription endpoint = event.getEndpoint();
+        Optional<String> currentScope = getFirstMatch(endpoint);
+        if (currentScope.isPresent()) {
+            LOG.debug("Matched {} against {}", endpoint, currentScope);
+            Object service = getEpListener();
+            if (service instanceof EndpointEventListener) {
+                notifyEEListener(event, currentScope.get(), 
(EndpointEventListener)service);
+            } else if (service instanceof EndpointListener) {
+                notifyEListener(event, currentScope.get(), 
(EndpointListener)service);
+            }
+        }
+    }
+    
+    private Optional<String> getFirstMatch(EndpointDescription endpoint) {
+        return scopes.stream().filter(endpoint::matches).findFirst();
+    }
+
+    private void notifyEEListener(EndpointEvent event, String currentScope, 
EndpointEventListener listener) {
+        EndpointDescription endpoint = event.getEndpoint();
+        LOG.info("Calling endpointchanged on class {} for filter {}, type {}, 
endpoint {} ", listener, currentScope, endpoint);
+        listener.endpointChanged(event, currentScope);
+    }
+    
+    private void notifyEListener(EndpointEvent event, String currentScope, 
EndpointListener listener) {
+        EndpointDescription endpoint = event.getEndpoint();
+        LOG.info("Calling old listener on class {} for filter {}, type {}, 
endpoint {} ", listener, currentScope, endpoint);
+        switch (event.getType()) {
+        case EndpointEvent.ADDED:
+            listener.endpointAdded(endpoint, currentScope);
+            break;
+
+        case EndpointEvent.MODIFIED:
+            listener.endpointRemoved(endpoint, currentScope);
+            listener.endpointAdded(endpoint, currentScope);
+            break;
+
+        case EndpointEvent.REMOVED:
+            listener.endpointRemoved(endpoint, currentScope);
+            break;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((sref == null) ? 0 : sref.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Interest other = (Interest) obj;
+        if (sref == null) {
+            if (other.sref != null)
+                return false;
+        } else if (!sref.equals(other.sref))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "Interest [scopes=" + scopes + ", epListener=" + 
epListener.getClass() + "]";
+    }
+    
+}
\ No newline at end of file
diff --git 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
index 41bce86..d475c40 100644
--- 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
+++ 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
@@ -18,18 +18,19 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper;
 
-import java.util.List;
-import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.aries.rsa.util.StringPlus;
+import org.apache.aries.rsa.discovery.zookeeper.client.ClientManager;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointListener;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository;
 import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.component.annotations.ReferencePolicy;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
@@ -37,24 +38,38 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Manages the EndpointEventListeners and the scopes they are interested in.
- * Establishes a listener with the repository to be called back on all changes 
in the repo.
- * Events from repository are then forwarded to all interested 
EndpointEventListeners.
+ * Manages the {@link EndpointEventListener}s and the scopes they are 
interested in.
+ * Establishes a listener with the {@link ZookeeperEndpointRepository} to be 
called back on all changes in the repository.
+ * Events from repository are then forwarded to all interested {@link 
EndpointEventListener}s.
  */
-@SuppressWarnings({"deprecation", "rawtypes"})
-@Component(service = InterestManager.class)
+@SuppressWarnings("deprecation")
+@Component(immediate = true)
 public class InterestManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(InterestManager.class);
 
-    private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<>();
+    private Set<Interest> interests = ConcurrentHashMap.newKeySet();
     
-    private final Map<ServiceReference, Interest> interests = new 
ConcurrentHashMap<>();
+    @Reference
+    private ZookeeperEndpointRepository repository;
 
-    protected static class Interest {
-        List<String> scopes;
-        Object epListener;
+    private ZookeeperEndpointListener listener;
+    
+    public InterestManager() {
     }
-
+    
+    public InterestManager(ZookeeperEndpointRepository repository) {
+        this.repository = repository;
+    }
+    
+    @Activate
+    public void activate() {
+        this.listener = repository.createListener(this::onEndpointChanged);
+    }
+    
+    private void onEndpointChanged(EndpointEvent event, String filter) {
+        interests.forEach(interest -> interest.notifyListener(event));
+    }
+    
     @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = 
ReferencePolicy.DYNAMIC)
     public void 
bindEndpointEventListener(ServiceReference<EndpointEventListener> sref, 
EndpointEventListener epListener) {
         addInterest(sref, epListener);
@@ -65,7 +80,7 @@ public class InterestManager {
     }
     
     public void 
unbindEndpointEventListener(ServiceReference<EndpointEventListener> sref) {
-        removeInterest(sref);
+        interests.remove(new Interest(sref));
     }
 
     @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = 
ReferencePolicy.DYNAMIC)
@@ -78,25 +93,13 @@ public class InterestManager {
     }
     
     public void unbindEndpointListener(ServiceReference<EndpointListener> 
sref) {
-        removeInterest(sref);
+        interests.remove(new Interest(sref));
     }
 
-    private void removeInterest(ServiceReference<?> sref) {
-        if (interests.containsKey(sref)) {
-            List<String> scopes = getScopes(sref);
-            LOG.info("removing interests: {}", scopes);
-            interests.remove(sref);
-        }
-    }
-    
-    /**
-     * Read current endpoint stored at a znode
-     * 
-     * @param path
-     * @return
-     */
-    EndpointDescription read(String path) {
-        return nodes.get(path);
+    @Deactivate
+    public void close() {
+        this.listener.close();
+        interests.clear();
     }
 
     private void addInterest(ServiceReference<?> sref, Object epListener) {
@@ -104,29 +107,15 @@ public class InterestManager {
             LOG.debug("Skipping our own EndpointEventListener");
             return;
         }
-        List<String> scopes = getScopes(sref);
-        LOG.debug("adding Interests: {}", scopes);
-        
-        // get or create interest for given scope and add listener to it
-        Interest interest = interests.get(epListener);
-        if (interest == null) {
-            // create interest, add listener and start monitor
-            interest = new Interest();
-            interest.epListener = epListener;
-            interest.scopes = scopes;
-            interests.put(sref, interest);
-            sendExistingEndpoints(scopes, epListener);
-        } else {
-            interest.scopes = scopes;
-            sendExistingEndpoints(scopes, epListener);
-        }
+        Interest interest = new Interest(sref, epListener);
+        update(interest);
+        listener.sendExistingEndpoints(interest);
     }
 
-    private void sendExistingEndpoints(List<String> scopes, Object epListener) 
{
-        for (EndpointDescription endpoint : nodes.values()) {
-            EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, 
endpoint);
-            notifyListener(event, scopes, epListener);
-        }
+    private void update(Interest interest) {
+        boolean present = interests.remove(interest);
+        LOG.debug("{} Interest: {}", present ? "Adding" : "Updating", 
interest);
+        interests.add(interest);
     }
 
     private static boolean isOurOwnEndpointEventListener(ServiceReference<?> 
endpointEventListener) {
@@ -134,90 +123,8 @@ public class InterestManager {
                 
endpointEventListener.getProperty(ClientManager.DISCOVERY_ZOOKEEPER_ID)));
     }
     
-    public void handleRemoved(String path) {
-        EndpointDescription endpoint = nodes.remove(path);
-        if (endpoint != null) {
-            EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, 
endpoint);
-            endpointChanged(event);
-        }
-    }
-
-    public void handleChanged(String path, EndpointDescription endpoint) {
-        EndpointDescription old = nodes.put(path, endpoint);
-        int type = old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED;
-        EndpointEvent event = new EndpointEvent(type, endpoint);
-        endpointChanged(event);
-    }
-
-    private void endpointChanged(EndpointEvent event) {
-        for (Interest interest : interests.values()) {
-            notifyListener(event, interest.scopes, interest.epListener);
-        }
-    }
-
-    private void notifyListener(EndpointEvent event, List<String> scopes, 
Object service) {
-        EndpointDescription endpoint = event.getEndpoint();
-        String currentScope = getFirstMatch(scopes, endpoint);
-        if (currentScope == null) {
-            return;
-        }
-        LOG.debug("Matched {} against {}", endpoint, currentScope);
-        if (service instanceof EndpointEventListener) {
-            notifyEEListener(event, currentScope, 
(EndpointEventListener)service);
-        } else if (service instanceof EndpointListener) {
-            notifyEListener(event, currentScope, (EndpointListener)service);
-        }
-    }
-    
-    private String getFirstMatch(List<String> scopes, EndpointDescription 
endpoint) {
-        for (String scope : scopes) {
-            if (endpoint.matches(scope)) {
-                return scope;
-            }
-        }
-        return null;
-    }
-
-    private void notifyEEListener(EndpointEvent event, String currentScope, 
EndpointEventListener listener) {
-        EndpointDescription endpoint = event.getEndpoint();
-        LOG.info("Calling endpointchanged on class {} for filter {}, type {}, 
endpoint {} ", listener, currentScope, endpoint);
-        listener.endpointChanged(event, currentScope);
-    }
-    
-    private void notifyEListener(EndpointEvent event, String currentScope, 
EndpointListener listener) {
-        EndpointDescription endpoint = event.getEndpoint();
-        LOG.info("Calling old listener on class {} for filter {}, type {}, 
endpoint {} ", listener, currentScope, endpoint);
-        switch (event.getType()) {
-        case EndpointEvent.ADDED:
-            listener.endpointAdded(endpoint, currentScope);
-            break;
-
-        case EndpointEvent.MODIFIED:
-            listener.endpointAdded(endpoint, currentScope);
-            listener.endpointRemoved(endpoint, currentScope);
-            break;
-
-        case EndpointEvent.REMOVED:
-            listener.endpointRemoved(endpoint, currentScope);
-            break;
-        }
-    }
-
-    @Deactivate
-    public synchronized void close() {
-        nodes.clear();
-        interests.clear();
-    }
-
-    /**
-     * Only for test case!
-     */
-    protected synchronized Map<ServiceReference, Interest> getInterests() {
+    Set<Interest> getInterests() {
         return interests;
     }
 
-    protected List<String> getScopes(ServiceReference<?> sref) {
-        return 
StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
-    }
-
 }
diff --git 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java
 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java
index 89e1556..24df4a9 100644
--- 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java
+++ 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java
@@ -21,6 +21,8 @@ package org.apache.aries.rsa.discovery.zookeeper;
 import java.util.Dictionary;
 import java.util.Hashtable;
 
+import org.apache.aries.rsa.discovery.zookeeper.client.ClientManager;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
@@ -35,8 +37,8 @@ import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
 
 /**
- * Listens for local EndpointEvents using old and new style listeners and 
publishes changes to 
- * the ZooKeeperEndpointRepository
+ * Listens for local {@link EndpointEvent}s using {@link 
EndpointEventListener} and old style {@link EndpointListener} 
+ * and publishes changes to the {@link ZookeeperEndpointRepository}
  */
 @SuppressWarnings("deprecation")
 @Component(service = {}, immediate = true)
@@ -45,7 +47,7 @@ public class PublishingEndpointListener implements 
EndpointEventListener, Endpoi
     private ServiceRegistration<?> listenerReg;
     
     @Reference
-    private ZookeeperEndpointPublisher repository;
+    private ZookeeperEndpointRepository repository;
 
     @Activate
     public void start(BundleContext bctx) {
diff --git 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManager.java
similarity index 94%
rename from 
discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
rename to 
discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManager.java
index e6bcf6f..e614ef4 100644
--- 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
+++ 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManager.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.zookeeper;
+package org.apache.aries.rsa.discovery.zookeeper.client;
 
 import static java.util.concurrent.CompletableFuture.runAsync;
 
@@ -38,6 +38,10 @@ import 
org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Creates a Zookeeper client from a config. The Zookeeper service is 
published when the connection
+ * has been established and will be unpublished when the connection goes away.
+ */
 @Component(//
         service = ClientManager.class,
         immediate = true,
diff --git 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java
 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointListener.java
similarity index 62%
rename from 
discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java
rename to 
discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointListener.java
index 37fdada..3b7ddd4 100644
--- 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java
+++ 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointListener.java
@@ -16,11 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.zookeeper;
+package org.apache.aries.rsa.discovery.zookeeper.client;
 
 import java.io.ByteArrayInputStream;
+import java.io.Closeable;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.aries.rsa.discovery.zookeeper.Interest;
 import org.apache.aries.rsa.spi.EndpointDescriptionParser;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -29,44 +33,45 @@ import 
org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(immediate = true)
-public class ZookeeperEndpointListener {
+/**
+ * Listens to endpoint changes in Zookeeper and forwards changes in Endpoints 
to InterestManager. 
+ */
+public class ZookeeperEndpointListener implements Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperEndpointListener.class);
     
-    @Reference
+    private Map<String, EndpointDescription> endpoints = new 
ConcurrentHashMap<>();
+    
     private ZooKeeper zk;
     
-    @Reference
     private EndpointDescriptionParser parser;
     
-    @Reference
-    private InterestManager listener;
-    
-    @Reference
-    private ZookeeperEndpointPublisher publisher;
+    private EndpointEventListener listener;
     
-    public ZookeeperEndpointListener() {
-    }
-    
-    public ZookeeperEndpointListener(ZooKeeper zk, EndpointDescriptionParser 
parser, InterestManager listener) {
+    ZookeeperEndpointListener(ZooKeeper zk, EndpointDescriptionParser parser, 
EndpointEventListener listener) {
         this.zk = zk;
         this.parser = parser;
         this.listener = listener;
-        activate();
+        watchRecursive(ZookeeperEndpointRepository.PATH_PREFIX);
     }
     
-    @Activate
-    public void activate() {
-        watchRecursive(ZookeeperEndpointPublisher.PATH_PREFIX);
+    @Override
+    public void close() {
+        // TODO unregister watchers
+        endpoints.clear();
     }
     
+    public void sendExistingEndpoints(Interest interest) {
+        endpoints.values().stream()
+            .map(endpoint -> new EndpointEvent(EndpointEvent.ADDED, endpoint))
+            .forEach(interest::notifyListener);
+    }
+
     private void process(WatchedEvent event) {
         String path = event.getPath();
         LOG.info("Received event {}", event);
@@ -77,7 +82,7 @@ public class ZookeeperEndpointListener {
             watchRecursive(path);
             break;
         case NodeDeleted:
-            listener.handleRemoved(path);
+            onRemoved(path);
             break;
         default:
             break;
@@ -89,7 +94,7 @@ public class ZookeeperEndpointListener {
         try {
             EndpointDescription endpoint = read(path);
             if (endpoint != null) {
-                listener.handleChanged(path, endpoint);
+                onChanged(path, endpoint);
             }
             List<String> children = zk.getChildren(path, this::process);
             if (children == null) {
@@ -106,8 +111,23 @@ public class ZookeeperEndpointListener {
             LOG.info(e.getMessage(), e);
         }
     }
+    
+    private void onChanged(String path, EndpointDescription endpoint) {
+        EndpointDescription old = endpoints.put(path, endpoint);
+        int type = old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED;
+        EndpointEvent event = new EndpointEvent(type, endpoint);
+        listener.endpointChanged(event, null);
+    }
+
+    private void onRemoved(String path) {
+        EndpointDescription endpoint = endpoints.remove(path);
+        if (endpoint != null) {
+            EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, 
endpoint);
+            listener.endpointChanged(event, null);
+        }
+    }
 
-    EndpointDescription read(String path) throws KeeperException, 
InterruptedException {
+    private EndpointDescription read(String path) throws KeeperException, 
InterruptedException {
         Stat stat = new Stat();
         byte[] data = zk.getData(path, this::process, stat);
         if (data == null || data.length == 0) {
diff --git 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java
 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepository.java
similarity index 90%
rename from 
discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java
rename to 
discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepository.java
index 7757396..fdb046f 100644
--- 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java
+++ 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepository.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.zookeeper;
+package org.apache.aries.rsa.discovery.zookeeper.client;
 
 import static java.util.Arrays.asList;
 import static java.util.stream.Collectors.toList;
@@ -38,13 +38,17 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(service = ZookeeperEndpointPublisher.class)
-public class ZookeeperEndpointPublisher {
+/**
+ * Is called by PublishingEndpointListener with local Endpoint changes and 
forwards the changes to Zookeeper. 
+ */
+@Component(service = ZookeeperEndpointRepository.class)
+public class ZookeeperEndpointRepository {
     public static final String PATH_PREFIX = "/osgi/service_registry";
-    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperEndpointPublisher.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperEndpointRepository.class);
     private final Map<Integer, String> typeNames = new HashMap<>();
     
     @Reference
@@ -53,14 +57,14 @@ public class ZookeeperEndpointPublisher {
     @Reference
     private EndpointDescriptionParser parser;
     
-    public ZookeeperEndpointPublisher() {
+    public ZookeeperEndpointRepository() {
         typeNames.put(EndpointEvent.ADDED, "added");
         typeNames.put(EndpointEvent.MODIFIED, "modified");
         typeNames.put(EndpointEvent.MODIFIED_ENDMATCH, "modified");
         typeNames.put(EndpointEvent.REMOVED, "removed");
     }
     
-    public ZookeeperEndpointPublisher(ZooKeeper zk, EndpointDescriptionParser 
parser) {
+    public ZookeeperEndpointRepository(ZooKeeper zk, EndpointDescriptionParser 
parser) {
         this();
         this.zk = zk;
         this.parser = parser;
@@ -74,6 +78,10 @@ public class ZookeeperEndpointPublisher {
             throw new IllegalStateException("Unable to create base path");
         }
     }
+    
+    public ZookeeperEndpointListener createListener(EndpointEventListener 
listener) {
+        return new ZookeeperEndpointListener(zk, parser, listener);
+    }
 
     public void endpointChanged(EndpointEvent event) {
         try {
@@ -134,7 +142,7 @@ public class ZookeeperEndpointPublisher {
     }
 
     private void createBasePath() throws KeeperException, InterruptedException 
{
-        String path = ZookeeperEndpointPublisher.getZooKeeperPath("");
+        String path = ZookeeperEndpointRepository.getZooKeeperPath("");
         createPath(path);
     }
 
diff --git 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/MyZooKeeperServerMain.java
 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/MyZooKeeperServerMain.java
index 33097b1..bb7e5e3 100644
--- 
a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/MyZooKeeperServerMain.java
+++ 
b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/MyZooKeeperServerMain.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class MyZooKeeperServerMain extends ZooKeeperServerMain implements 
ZookeeperServer {
-       private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperStarter.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(MyZooKeeperServerMain.class);
 
     private QuorumPeerConfig config;
 
diff --git 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java
 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java
index 985f1b0..a701b50 100644
--- 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java
+++ 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java
@@ -18,57 +18,74 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper;
 
-import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
 
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointListener;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 
+@RunWith(MockitoJUnitRunner.class)
 public class InterestManagerTest {
+    
+    @Mock
+    private ZookeeperEndpointRepository repository;
+    
+    @Mock
+    private EndpointEventListener epListener1;
+    
+    @Mock
+    private EndpointEventListener epListener2;
 
+    @Mock
+    private ZookeeperEndpointListener listener;
+
+    @InjectMocks
+    private InterestManager im;
+    
     @Test
     public void testEndpointListenerTrackerCustomizer() {
-        IMocksControl c = EasyMock.createControl();
-        ServiceReference<EndpointEventListener> sref = createService(c, 
"(objectClass=mine)");
-        ServiceReference<EndpointEventListener> sref2 = createService(c, 
"(objectClass=mine)");
-        EndpointEventListener epListener1 = 
c.createMock(EndpointEventListener.class); 
-        EndpointEventListener epListener2 = 
c.createMock(EndpointEventListener.class); 
-
-        c.replay();
-
-        InterestManager im = new InterestManager();
+        when(repository.createListener(Mockito.any())).thenReturn(listener);
+        im.activate();
+        ServiceReference<EndpointEventListener> sref = 
createService("(objectClass=mine)");
+        ServiceReference<EndpointEventListener> sref2 = 
createService("(objectClass=mine)");
         // sref has no scope -> nothing should happen
-        assertEquals(0, im.getInterests().size());
+        assertNumInterests(0);
 
         im.bindEndpointEventListener(sref, epListener1);
-        assertEquals(1, im.getInterests().size());
+        assertNumInterests(1);
 
         im.bindEndpointEventListener(sref, epListener1);
-        assertEquals(1, im.getInterests().size());
+        assertNumInterests(1);
 
         im.bindEndpointEventListener(sref2, epListener2);
-        assertEquals(2, im.getInterests().size());
+        assertNumInterests(2);
 
         im.unbindEndpointEventListener(sref);
-        assertEquals(1, im.getInterests().size());
+        assertNumInterests(1);
 
         im.unbindEndpointEventListener(sref);
-        assertEquals(1, im.getInterests().size());
+        assertNumInterests(1);
 
         im.unbindEndpointEventListener(sref2);
-        assertEquals(0, im.getInterests().size());
+        assertNumInterests(0);
+    }
 
-        c.verify();
+    private void assertNumInterests(int expectedNum) {
+        assertEquals(expectedNum, im.getInterests().size());
     }
 
     @SuppressWarnings("unchecked")
-    private ServiceReference<EndpointEventListener> 
createService(IMocksControl c, String scope) {
-        ServiceReference<EndpointEventListener> sref = 
c.createMock(ServiceReference.class);
-        
expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce();
-        
expect(sref.getProperty(ClientManager.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce();
+    private ServiceReference<EndpointEventListener> createService(String 
scope) {
+        ServiceReference<EndpointEventListener> sref = 
Mockito.mock(ServiceReference.class);
+        
when(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).thenReturn(scope);
         return sref;
     }
 
diff --git 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java
 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java
index 815c42a..fa8244f 100644
--- 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java
+++ 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.verify;
 import java.util.HashMap;
 import java.util.Map;
 
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,7 +38,7 @@ import org.osgi.service.remoteserviceadmin.RemoteConstants;
 @RunWith(MockitoJUnitRunner.class)
 public class PublishingEndpointListenerTest {
     @Mock
-    ZookeeperEndpointPublisher repository;
+    ZookeeperEndpointRepository repository;
 
     @InjectMocks
     PublishingEndpointListener eli;
diff --git 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
index 02f6093..8edccb1 100644
--- 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
+++ 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -36,10 +37,12 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParserImpl;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -57,6 +60,10 @@ import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
 
+/**
+ * Tests a complete cycle of publishing an endpoint and getting notified
+ * including zookeeper.
+ */
 @RunWith(MockitoJUnitRunner.class)
 public class ZookeeperDiscoveryTest {
     final Semaphore semConnected = new Semaphore(0);
@@ -65,6 +72,8 @@ public class ZookeeperDiscoveryTest {
     private ZooKeeper zk;
     private ServerCnxnFactory factory;
     private List<EndpointEvent> events = new ArrayList<>();
+    private EndpointDescriptionParserImpl parser = new 
EndpointDescriptionParserImpl();
+    
     @Mock
     private ServiceReference<EndpointEventListener> sref;
 
@@ -83,15 +92,14 @@ public class ZookeeperDiscoveryTest {
 
     @Test
     public void test() throws IOException, URISyntaxException, 
KeeperException, InterruptedException {
-        EndpointDescriptionParserImpl parser = new 
EndpointDescriptionParserImpl();
-        ZookeeperEndpointPublisher repository = new 
ZookeeperEndpointPublisher(zk, parser);
+        ZookeeperEndpointRepository repository = new 
ZookeeperEndpointRepository(zk, parser);
         repository.activate();
-        InterestManager im = new InterestManager();
+        InterestManager im = new InterestManager(repository);
+        im.activate();
         
         String scope = "("+ Constants.OBJECTCLASS +"=*)";
         
Mockito.when(sref.getProperty(Mockito.eq(EndpointEventListener.ENDPOINT_LISTENER_SCOPE))).thenReturn(scope);
         im.bindEndpointEventListener(sref, this::onEndpointChanged);
-        ZookeeperEndpointListener zklistener = new 
ZookeeperEndpointListener(zk, parser, im);
         
         assertThat(semConnected.tryAcquire(1, SECONDS), equalTo(true));
         
@@ -101,7 +109,7 @@ public class ZookeeperDiscoveryTest {
         assertThat(sem.tryAcquire(100, SECONDS), equalTo(true));
     
         String path = "/osgi/service_registry/http:##test.de#service1";
-        EndpointDescription ep2 = zklistener.read(path);
+        EndpointDescription ep2 = read(path);
         assertNotNull(ep2);
 
         repository.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, 
endpoint));
@@ -111,6 +119,17 @@ public class ZookeeperDiscoveryTest {
         assertThat(events.get(1).getType(), equalTo(EndpointEvent.REMOVED));
         assertThat(events.get(0).getEndpoint(), equalTo(endpoint));
         assertThat(events.get(1).getEndpoint(), equalTo(endpoint));
+        im.close();
+    }
+    
+    private EndpointDescription read(String path) throws KeeperException, 
InterruptedException {
+        Stat stat = new Stat();
+        byte[] data = zk.getData(path, this::process, stat);
+        if (data == null || data.length == 0) {
+            return null;
+        } else {
+            return parser.readEndpoint(new ByteArrayInputStream(data));
+        }
     }
 
     private void process(WatchedEvent event) {
diff --git 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java
 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManagerTest.java
similarity index 93%
rename from 
discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java
rename to 
discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManagerTest.java
index 360bf88..9113d5e 100644
--- 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java
+++ 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManagerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.zookeeper;
+package org.apache.aries.rsa.discovery.zookeeper.client;
 
 import static org.junit.Assert.assertEquals;
 
@@ -25,7 +25,8 @@ import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.aries.rsa.discovery.zookeeper.ClientManager.DiscoveryConfig;
+import org.apache.aries.rsa.discovery.zookeeper.client.ClientManager;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ClientManager.DiscoveryConfig;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.Before;
 import org.junit.Rule;
diff --git 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepositoryPathTest.java
similarity index 65%
rename from 
discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
rename to 
discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepositoryPathTest.java
index 31441d3..ac50df7 100644
--- 
a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
+++ 
b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepositoryPathTest.java
@@ -16,20 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.zookeeper;
+package org.apache.aries.rsa.discovery.zookeeper.client;
 
 import static org.junit.Assert.assertEquals;
 
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository;
 import org.junit.Test;
 
-public class ZookeeperEndpointPublisherPathTest {
+public class ZookeeperEndpointRepositoryPathTest {
     
     @Test
     public void testGetZooKeeperPath() {
-        assertEquals(ZookeeperEndpointPublisher.PATH_PREFIX + '/' + 
"http:##org.example.Test",
-            
ZookeeperEndpointPublisher.getZooKeeperPath("http://org.example.Test"));
+        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + 
"http:##org.example.Test",
+            
ZookeeperEndpointRepository.getZooKeeperPath("http://org.example.Test"));
 
-        assertEquals(ZookeeperEndpointPublisher.PATH_PREFIX, 
ZookeeperEndpointPublisher.getZooKeeperPath(""));
+        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, 
ZookeeperEndpointRepository.getZooKeeperPath(""));
     }
     
 }
diff --git 
a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
 
b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
index a24b140..774ec80 100644
--- 
a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
+++ 
b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
@@ -24,7 +24,7 @@ import java.io.ByteArrayInputStream;
 
 import javax.inject.Inject;
 
-import org.apache.aries.rsa.discovery.zookeeper.ZookeeperEndpointPublisher;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.examples.echotcp.api.EchoService;
 import org.apache.aries.rsa.itests.felix.RsaTestBase;
 import org.apache.aries.rsa.spi.DistributionProvider;
@@ -75,7 +75,7 @@ public class TestDiscoveryExport extends RsaTestBase {
 
     private EndpointDescription getEndpoint() throws Exception {
         String endpointName = await("Node 
exists").until(this::getEndpointPath, Matchers.notNullValue());
-        return getEndpointDescription(zookeeper, 
ZookeeperEndpointPublisher.PATH_PREFIX + "/" + endpointName);
+        return getEndpointDescription(zookeeper, 
ZookeeperEndpointRepository.PATH_PREFIX + "/" + endpointName);
     }
 
     private EndpointDescription getEndpointDescription(ZooKeeper zk, String 
endpointPath)
@@ -86,7 +86,7 @@ public class TestDiscoveryExport extends RsaTestBase {
     }
 
     private String getEndpointPath() throws KeeperException, 
InterruptedException {
-        return zookeeper.getChildren(ZookeeperEndpointPublisher.PATH_PREFIX, 
false).stream()
+        return zookeeper.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, 
false).stream()
                 .findFirst()
                 .orElse(null);
     }
diff --git 
a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
 
b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
index 4aea83f..b819872 100644
--- 
a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
+++ 
b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 
-import org.apache.aries.rsa.discovery.zookeeper.ZookeeperEndpointPublisher;
+import 
org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.itests.felix.RsaTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,7 +48,7 @@ import org.osgi.service.remoteserviceadmin.RemoteConstants;
 @RunWith(PaxExam.class)
 public class TestDiscoveryImport extends RsaTestBase {
     @Inject
-    ZookeeperEndpointPublisher publisher;
+    ZookeeperEndpointRepository publisher;
     
     @Inject
     BundleContext context;
diff --git 
a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
 
b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
index 09aa6fd..2590041 100644
--- 
a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
+++ 
b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
@@ -20,9 +20,9 @@ package org.apache.aries.rsa.itests.felix.tcp;
 
 
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
 
 import java.io.IOException;
-import java.util.Collection;
 
 import javax.inject.Inject;
 
@@ -30,14 +30,12 @@ import 
org.apache.aries.rsa.examples.echotcp.api.EchoService;
 import org.apache.aries.rsa.itests.felix.RsaTestBase;
 import org.apache.aries.rsa.itests.felix.ServerConfiguration;
 import org.apache.aries.rsa.itests.felix.TwoContainerPaxExam;
-import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Configuration;
 import org.ops4j.pax.exam.Option;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
 
 @RunWith(TwoContainerPaxExam.class)
 public class TestFindHook extends RsaTestBase {
@@ -70,11 +68,11 @@ public class TestFindHook extends RsaTestBase {
 
     @Test
     public void testFind() throws Exception {
-        await().until(() -> getEchoServices().size(), Matchers.equalTo(1));
+        await().until(this::numEchoServices, equalTo(1));
     }
 
-    private Collection<ServiceReference<EchoService>> getEchoServices() throws 
InvalidSyntaxException {
-        return context.getServiceReferences(EchoService.class, null);
+    private int numEchoServices() throws InvalidSyntaxException {
+        return context.getServiceReferences(EchoService.class, null).size();
     }
 
 }
diff --git 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
index 37cbcf7..ffd8e8c 100644
--- 
a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
+++ 
b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
@@ -21,7 +21,7 @@ package org.apache.aries.rsa.topologymanager.importer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;;
+import java.util.Map;
 import java.util.Set;
 
 /**

Reply via email to