This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
The following commit(s) were added to refs/heads/master by this push: new db530ba ARIES-1780 - Improved design db530ba is described below commit db530ba5e43aabfd6e60328f9a05043fccbf32e8 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Sun Nov 10 16:32:21 2019 +0100 ARIES-1780 - Improved design --- discovery/zookeeper/bnd.bnd | 2 +- .../aries/rsa/discovery/zookeeper/Interest.java | 134 +++++++++++++++ .../rsa/discovery/zookeeper/InterestManager.java | 181 +++++---------------- .../zookeeper/PublishingEndpointListener.java | 8 +- .../zookeeper/{ => client}/ClientManager.java | 6 +- .../{ => client}/ZookeeperEndpointListener.java | 68 +++++--- .../ZookeeperEndpointRepository.java} | 22 ++- .../zookeeper/server/MyZooKeeperServerMain.java | 2 +- .../discovery/zookeeper/InterestManagerTest.java | 65 +++++--- .../zookeeper/PublishingEndpointListenerTest.java | 3 +- .../zookeeper/ZookeeperDiscoveryTest.java | 29 +++- .../zookeeper/{ => client}/ClientManagerTest.java | 5 +- .../ZookeeperEndpointRepositoryPathTest.java} | 11 +- .../rsa/itests/felix/tcp/TestDiscoveryExport.java | 6 +- .../rsa/itests/felix/tcp/TestDiscoveryImport.java | 4 +- .../aries/rsa/itests/felix/tcp/TestFindHook.java | 10 +- .../rsa/topologymanager/importer/MultiMap.java | 2 +- 17 files changed, 335 insertions(+), 223 deletions(-) diff --git a/discovery/zookeeper/bnd.bnd b/discovery/zookeeper/bnd.bnd index 4f06393..b7d484e 100644 --- a/discovery/zookeeper/bnd.bnd +++ b/discovery/zookeeper/bnd.bnd @@ -16,4 +16,4 @@ # under the License. Provide-Capability: osgi.remoteserviceadmin.discovery;\ protocols:List<String>="zookeeper"; version:Version=1.1.0 -Export-Package: org.apache.aries.rsa.discovery.zookeeper \ No newline at end of file +Export-Package: org.apache.aries.rsa.discovery.zookeeper.client \ No newline at end of file diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Interest.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Interest.java new file mode 100644 index 0000000..a74324b --- /dev/null +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/Interest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.zookeeper; + +import static org.osgi.service.remoteserviceadmin.EndpointEventListener.ENDPOINT_LISTENER_SCOPE; + +import java.util.List; +import java.util.Optional; + +import org.apache.aries.rsa.util.StringPlus; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.osgi.service.remoteserviceadmin.EndpointListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("deprecation") +public class Interest { + private static final Logger LOG = LoggerFactory.getLogger(Interest.class); + + private final ServiceReference<?> sref; + private final List<String> scopes; + private final Object epListener; + + public Interest(ServiceReference<?> sref) { + this(sref, null); + } + + public Interest(ServiceReference<?> sref, Object epListener) { + this.sref = sref; + this.scopes = StringPlus.normalize(sref.getProperty(ENDPOINT_LISTENER_SCOPE)); + this.epListener = epListener; + } + + public List<String> getScopes() { + return scopes; + } + + public Object getEpListener() { + return epListener; + } + + public void notifyListener(EndpointEvent event) { + EndpointDescription endpoint = event.getEndpoint(); + Optional<String> currentScope = getFirstMatch(endpoint); + if (currentScope.isPresent()) { + LOG.debug("Matched {} against {}", endpoint, currentScope); + Object service = getEpListener(); + if (service instanceof EndpointEventListener) { + notifyEEListener(event, currentScope.get(), (EndpointEventListener)service); + } else if (service instanceof EndpointListener) { + notifyEListener(event, currentScope.get(), (EndpointListener)service); + } + } + } + + private Optional<String> getFirstMatch(EndpointDescription endpoint) { + return scopes.stream().filter(endpoint::matches).findFirst(); + } + + private void notifyEEListener(EndpointEvent event, String currentScope, EndpointEventListener listener) { + EndpointDescription endpoint = event.getEndpoint(); + LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint); + listener.endpointChanged(event, currentScope); + } + + private void notifyEListener(EndpointEvent event, String currentScope, EndpointListener listener) { + EndpointDescription endpoint = event.getEndpoint(); + LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint); + switch (event.getType()) { + case EndpointEvent.ADDED: + listener.endpointAdded(endpoint, currentScope); + break; + + case EndpointEvent.MODIFIED: + listener.endpointRemoved(endpoint, currentScope); + listener.endpointAdded(endpoint, currentScope); + break; + + case EndpointEvent.REMOVED: + listener.endpointRemoved(endpoint, currentScope); + break; + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((sref == null) ? 0 : sref.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Interest other = (Interest) obj; + if (sref == null) { + if (other.sref != null) + return false; + } else if (!sref.equals(other.sref)) + return false; + return true; + } + + @Override + public String toString() { + return "Interest [scopes=" + scopes + ", epListener=" + epListener.getClass() + "]"; + } + +} \ No newline at end of file diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java index 41bce86..d475c40 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java @@ -18,18 +18,19 @@ */ package org.apache.aries.rsa.discovery.zookeeper; -import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.aries.rsa.util.StringPlus; +import org.apache.aries.rsa.discovery.zookeeper.client.ClientManager; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointListener; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository; import org.osgi.framework.ServiceReference; +import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; import org.osgi.service.component.annotations.ReferenceCardinality; import org.osgi.service.component.annotations.ReferencePolicy; -import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; import org.osgi.service.remoteserviceadmin.EndpointEventListener; import org.osgi.service.remoteserviceadmin.EndpointListener; @@ -37,24 +38,38 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Manages the EndpointEventListeners and the scopes they are interested in. - * Establishes a listener with the repository to be called back on all changes in the repo. - * Events from repository are then forwarded to all interested EndpointEventListeners. + * Manages the {@link EndpointEventListener}s and the scopes they are interested in. + * Establishes a listener with the {@link ZookeeperEndpointRepository} to be called back on all changes in the repository. + * Events from repository are then forwarded to all interested {@link EndpointEventListener}s. */ -@SuppressWarnings({"deprecation", "rawtypes"}) -@Component(service = InterestManager.class) +@SuppressWarnings("deprecation") +@Component(immediate = true) public class InterestManager { private static final Logger LOG = LoggerFactory.getLogger(InterestManager.class); - private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<>(); + private Set<Interest> interests = ConcurrentHashMap.newKeySet(); - private final Map<ServiceReference, Interest> interests = new ConcurrentHashMap<>(); + @Reference + private ZookeeperEndpointRepository repository; - protected static class Interest { - List<String> scopes; - Object epListener; + private ZookeeperEndpointListener listener; + + public InterestManager() { } - + + public InterestManager(ZookeeperEndpointRepository repository) { + this.repository = repository; + } + + @Activate + public void activate() { + this.listener = repository.createListener(this::onEndpointChanged); + } + + private void onEndpointChanged(EndpointEvent event, String filter) { + interests.forEach(interest -> interest.notifyListener(event)); + } + @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC) public void bindEndpointEventListener(ServiceReference<EndpointEventListener> sref, EndpointEventListener epListener) { addInterest(sref, epListener); @@ -65,7 +80,7 @@ public class InterestManager { } public void unbindEndpointEventListener(ServiceReference<EndpointEventListener> sref) { - removeInterest(sref); + interests.remove(new Interest(sref)); } @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC) @@ -78,25 +93,13 @@ public class InterestManager { } public void unbindEndpointListener(ServiceReference<EndpointListener> sref) { - removeInterest(sref); + interests.remove(new Interest(sref)); } - private void removeInterest(ServiceReference<?> sref) { - if (interests.containsKey(sref)) { - List<String> scopes = getScopes(sref); - LOG.info("removing interests: {}", scopes); - interests.remove(sref); - } - } - - /** - * Read current endpoint stored at a znode - * - * @param path - * @return - */ - EndpointDescription read(String path) { - return nodes.get(path); + @Deactivate + public void close() { + this.listener.close(); + interests.clear(); } private void addInterest(ServiceReference<?> sref, Object epListener) { @@ -104,29 +107,15 @@ public class InterestManager { LOG.debug("Skipping our own EndpointEventListener"); return; } - List<String> scopes = getScopes(sref); - LOG.debug("adding Interests: {}", scopes); - - // get or create interest for given scope and add listener to it - Interest interest = interests.get(epListener); - if (interest == null) { - // create interest, add listener and start monitor - interest = new Interest(); - interest.epListener = epListener; - interest.scopes = scopes; - interests.put(sref, interest); - sendExistingEndpoints(scopes, epListener); - } else { - interest.scopes = scopes; - sendExistingEndpoints(scopes, epListener); - } + Interest interest = new Interest(sref, epListener); + update(interest); + listener.sendExistingEndpoints(interest); } - private void sendExistingEndpoints(List<String> scopes, Object epListener) { - for (EndpointDescription endpoint : nodes.values()) { - EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint); - notifyListener(event, scopes, epListener); - } + private void update(Interest interest) { + boolean present = interests.remove(interest); + LOG.debug("{} Interest: {}", present ? "Adding" : "Updating", interest); + interests.add(interest); } private static boolean isOurOwnEndpointEventListener(ServiceReference<?> endpointEventListener) { @@ -134,90 +123,8 @@ public class InterestManager { endpointEventListener.getProperty(ClientManager.DISCOVERY_ZOOKEEPER_ID))); } - public void handleRemoved(String path) { - EndpointDescription endpoint = nodes.remove(path); - if (endpoint != null) { - EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint); - endpointChanged(event); - } - } - - public void handleChanged(String path, EndpointDescription endpoint) { - EndpointDescription old = nodes.put(path, endpoint); - int type = old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED; - EndpointEvent event = new EndpointEvent(type, endpoint); - endpointChanged(event); - } - - private void endpointChanged(EndpointEvent event) { - for (Interest interest : interests.values()) { - notifyListener(event, interest.scopes, interest.epListener); - } - } - - private void notifyListener(EndpointEvent event, List<String> scopes, Object service) { - EndpointDescription endpoint = event.getEndpoint(); - String currentScope = getFirstMatch(scopes, endpoint); - if (currentScope == null) { - return; - } - LOG.debug("Matched {} against {}", endpoint, currentScope); - if (service instanceof EndpointEventListener) { - notifyEEListener(event, currentScope, (EndpointEventListener)service); - } else if (service instanceof EndpointListener) { - notifyEListener(event, currentScope, (EndpointListener)service); - } - } - - private String getFirstMatch(List<String> scopes, EndpointDescription endpoint) { - for (String scope : scopes) { - if (endpoint.matches(scope)) { - return scope; - } - } - return null; - } - - private void notifyEEListener(EndpointEvent event, String currentScope, EndpointEventListener listener) { - EndpointDescription endpoint = event.getEndpoint(); - LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint); - listener.endpointChanged(event, currentScope); - } - - private void notifyEListener(EndpointEvent event, String currentScope, EndpointListener listener) { - EndpointDescription endpoint = event.getEndpoint(); - LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint); - switch (event.getType()) { - case EndpointEvent.ADDED: - listener.endpointAdded(endpoint, currentScope); - break; - - case EndpointEvent.MODIFIED: - listener.endpointAdded(endpoint, currentScope); - listener.endpointRemoved(endpoint, currentScope); - break; - - case EndpointEvent.REMOVED: - listener.endpointRemoved(endpoint, currentScope); - break; - } - } - - @Deactivate - public synchronized void close() { - nodes.clear(); - interests.clear(); - } - - /** - * Only for test case! - */ - protected synchronized Map<ServiceReference, Interest> getInterests() { + Set<Interest> getInterests() { return interests; } - protected List<String> getScopes(ServiceReference<?> sref) { - return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)); - } - } diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java index 89e1556..24df4a9 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java @@ -21,6 +21,8 @@ package org.apache.aries.rsa.discovery.zookeeper; import java.util.Dictionary; import java.util.Hashtable; +import org.apache.aries.rsa.discovery.zookeeper.client.ClientManager; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; import org.osgi.framework.ServiceRegistration; @@ -35,8 +37,8 @@ import org.osgi.service.remoteserviceadmin.EndpointListener; import org.osgi.service.remoteserviceadmin.RemoteConstants; /** - * Listens for local EndpointEvents using old and new style listeners and publishes changes to - * the ZooKeeperEndpointRepository + * Listens for local {@link EndpointEvent}s using {@link EndpointEventListener} and old style {@link EndpointListener} + * and publishes changes to the {@link ZooKeeperEndpointRepository} */ @SuppressWarnings("deprecation") @Component(service = {}, immediate = true) @@ -45,7 +47,7 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi private ServiceRegistration<?> listenerReg; @Reference - private ZookeeperEndpointPublisher repository; + private ZookeeperEndpointRepository repository; @Activate public void start(BundleContext bctx) { diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManager.java similarity index 94% rename from discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java rename to discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManager.java index e6bcf6f..e614ef4 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManager.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.aries.rsa.discovery.zookeeper; +package org.apache.aries.rsa.discovery.zookeeper.client; import static java.util.concurrent.CompletableFuture.runAsync; @@ -38,6 +38,10 @@ import org.osgi.service.metatype.annotations.ObjectClassDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Creates a Zookeeper client from a config. The Zookeeper service is published when the connection + * has been established and will be unpublished when the connection goes away. + */ @Component(// service = ClientManager.class, immediate = true, diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointListener.java similarity index 62% rename from discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java rename to discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointListener.java index 37fdada..3b7ddd4 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointListener.java @@ -16,11 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.aries.rsa.discovery.zookeeper; +package org.apache.aries.rsa.discovery.zookeeper.client; import java.io.ByteArrayInputStream; +import java.io.Closeable; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.aries.rsa.discovery.zookeeper.Interest; import org.apache.aries.rsa.spi.EndpointDescriptionParser; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; @@ -29,44 +33,45 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Reference; import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Component(immediate = true) -public class ZookeeperEndpointListener { +/** + * Listens to endpoint changes in Zookeeper and forwards changes in Endpoints to InterestManager. + */ +public class ZookeeperEndpointListener implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointListener.class); - @Reference + private Map<String, EndpointDescription> endpoints = new ConcurrentHashMap<>(); + private ZooKeeper zk; - @Reference private EndpointDescriptionParser parser; - @Reference - private InterestManager listener; - - @Reference - private ZookeeperEndpointPublisher publisher; + private EndpointEventListener listener; - public ZookeeperEndpointListener() { - } - - public ZookeeperEndpointListener(ZooKeeper zk, EndpointDescriptionParser parser, InterestManager listener) { + ZookeeperEndpointListener(ZooKeeper zk, EndpointDescriptionParser parser, EndpointEventListener listener) { this.zk = zk; this.parser = parser; this.listener = listener; - activate(); + watchRecursive(ZookeeperEndpointRepository.PATH_PREFIX); } - @Activate - public void activate() { - watchRecursive(ZookeeperEndpointPublisher.PATH_PREFIX); + @Override + public void close() { + // TODO unregister watchers + endpoints.clear(); } + public void sendExistingEndpoints(Interest interest) { + endpoints.values().stream() + .map(endpoint -> new EndpointEvent(EndpointEvent.ADDED, endpoint)) + .forEach(interest::notifyListener); + } + private void process(WatchedEvent event) { String path = event.getPath(); LOG.info("Received event {}", event); @@ -77,7 +82,7 @@ public class ZookeeperEndpointListener { watchRecursive(path); break; case NodeDeleted: - listener.handleRemoved(path); + onRemoved(path); break; default: break; @@ -89,7 +94,7 @@ public class ZookeeperEndpointListener { try { EndpointDescription endpoint = read(path); if (endpoint != null) { - listener.handleChanged(path, endpoint); + onChanged(path, endpoint); } List<String> children = zk.getChildren(path, this::process); if (children == null) { @@ -106,8 +111,23 @@ public class ZookeeperEndpointListener { LOG.info(e.getMessage(), e); } } + + private void onChanged(String path, EndpointDescription endpoint) { + EndpointDescription old = endpoints.put(path, endpoint); + int type = old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED; + EndpointEvent event = new EndpointEvent(type, endpoint); + listener.endpointChanged(event, null); + } + + private void onRemoved(String path) { + EndpointDescription endpoint = endpoints.remove(path); + if (endpoint != null) { + EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint); + listener.endpointChanged(event, null); + } + } - EndpointDescription read(String path) throws KeeperException, InterruptedException { + private EndpointDescription read(String path) throws KeeperException, InterruptedException { Stat stat = new Stat(); byte[] data = zk.getData(path, this::process, stat); if (data == null || data.length == 0) { diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepository.java similarity index 90% rename from discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java rename to discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepository.java index 7757396..fdb046f 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepository.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.aries.rsa.discovery.zookeeper; +package org.apache.aries.rsa.discovery.zookeeper.client; import static java.util.Arrays.asList; import static java.util.stream.Collectors.toList; @@ -38,13 +38,17 @@ import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Component(service = ZookeeperEndpointPublisher.class) -public class ZookeeperEndpointPublisher { +/** + * Is called by PublishingEndpointListener with local Endpoint changes and forward the changes to Zookeeper. + */ +@Component(service = ZookeeperEndpointRepository.class) +public class ZookeeperEndpointRepository { public static final String PATH_PREFIX = "/osgi/service_registry"; - private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointPublisher.class); + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointRepository.class); private final Map<Integer, String> typeNames = new HashMap<>(); @Reference @@ -53,14 +57,14 @@ public class ZookeeperEndpointPublisher { @Reference private EndpointDescriptionParser parser; - public ZookeeperEndpointPublisher() { + public ZookeeperEndpointRepository() { typeNames.put(EndpointEvent.ADDED, "added"); typeNames.put(EndpointEvent.MODIFIED, "modified"); typeNames.put(EndpointEvent.MODIFIED_ENDMATCH, "modified"); typeNames.put(EndpointEvent.REMOVED, "removed"); } - public ZookeeperEndpointPublisher(ZooKeeper zk, EndpointDescriptionParser parser) { + public ZookeeperEndpointRepository(ZooKeeper zk, EndpointDescriptionParser parser) { this(); this.zk = zk; this.parser = parser; @@ -74,6 +78,10 @@ public class ZookeeperEndpointPublisher { throw new IllegalStateException("Unable to create base path"); } } + + public ZookeeperEndpointListener createListener(EndpointEventListener listener) { + return new ZookeeperEndpointListener(zk, parser, listener); + } public void endpointChanged(EndpointEvent event) { try { @@ -134,7 +142,7 @@ public class ZookeeperEndpointPublisher { } private void createBasePath() throws KeeperException, InterruptedException { - String path = ZookeeperEndpointPublisher.getZooKeeperPath(""); + String path = ZookeeperEndpointRepository.getZooKeeperPath(""); createPath(path); } diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/MyZooKeeperServerMain.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/MyZooKeeperServerMain.java index 33097b1..bb7e5e3 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/MyZooKeeperServerMain.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/server/MyZooKeeperServerMain.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; class MyZooKeeperServerMain extends ZooKeeperServerMain implements ZookeeperServer { - private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStarter.class); + private static final Logger LOG = LoggerFactory.getLogger(MyZooKeeperServerMain.class); private QuorumPeerConfig config; diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java index 985f1b0..a701b50 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java @@ -18,57 +18,74 @@ */ package org.apache.aries.rsa.discovery.zookeeper; -import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; -import org.easymock.EasyMock; -import org.easymock.IMocksControl; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointListener; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; import org.osgi.framework.ServiceReference; import org.osgi.service.remoteserviceadmin.EndpointEventListener; +@RunWith(MockitoJUnitRunner.class) public class InterestManagerTest { + + @Mock + private ZookeeperEndpointRepository repository; + + @Mock + private EndpointEventListener epListener1; + + @Mock + private EndpointEventListener epListener2; + @Mock + private ZookeeperEndpointListener listener; + + @InjectMocks + private InterestManager im; + @Test public void testEndpointListenerTrackerCustomizer() { - IMocksControl c = EasyMock.createControl(); - ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)"); - ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)"); - EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class); - EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class); - - c.replay(); - - InterestManager im = new InterestManager(); + when(repository.createListener(Mockito.any())).thenReturn(listener); + im.activate(); + ServiceReference<EndpointEventListener> sref = createService("(objectClass=mine)"); + ServiceReference<EndpointEventListener> sref2 = createService("(objectClass=mine)"); // sref has no scope -> nothing should happen - assertEquals(0, im.getInterests().size()); + assertNumInterests(0); im.bindEndpointEventListener(sref, epListener1); - assertEquals(1, im.getInterests().size()); + assertNumInterests(1); im.bindEndpointEventListener(sref, epListener1); - assertEquals(1, im.getInterests().size()); + assertNumInterests(1); im.bindEndpointEventListener(sref2, epListener2); - assertEquals(2, im.getInterests().size()); + assertNumInterests(2); im.unbindEndpointEventListener(sref); - assertEquals(1, im.getInterests().size()); + assertNumInterests(1); im.unbindEndpointEventListener(sref); - assertEquals(1, im.getInterests().size()); + assertNumInterests(1); im.unbindEndpointEventListener(sref2); - assertEquals(0, im.getInterests().size()); + assertNumInterests(0); + } - c.verify(); + private void assertNumInterests(int expectedNum) { + assertEquals(expectedNum, im.getInterests().size()); } @SuppressWarnings("unchecked") - private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) { - ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class); - expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce(); - expect(sref.getProperty(ClientManager.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce(); + private ServiceReference<EndpointEventListener> createService(String scope) { + ServiceReference<EndpointEventListener> sref = Mockito.mock(ServiceReference.class); + when(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).thenReturn(scope); return sref; } diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java index 815c42a..fa8244f 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.verify; import java.util.HashMap; import java.util.Map; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository; import org.apache.zookeeper.KeeperException; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,7 +38,7 @@ import org.osgi.service.remoteserviceadmin.RemoteConstants; @RunWith(MockitoJUnitRunner.class) public class PublishingEndpointListenerTest { @Mock - ZookeeperEndpointPublisher repository; + ZookeeperEndpointRepository repository; @InjectMocks PublishingEndpointListener eli; diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java index 02f6093..8edccb1 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -36,10 +37,12 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParserImpl; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; @@ -57,6 +60,10 @@ import org.osgi.service.remoteserviceadmin.EndpointEvent; import org.osgi.service.remoteserviceadmin.EndpointEventListener; import org.osgi.service.remoteserviceadmin.RemoteConstants; +/** + * Tests a complete cycle of publishing an endpoint and getting notified + * including zookeeper. + */ @RunWith(MockitoJUnitRunner.class) public class ZookeeperDiscoveryTest { final Semaphore semConnected = new Semaphore(0); @@ -65,6 +72,8 @@ public class ZookeeperDiscoveryTest { private ZooKeeper zk; private ServerCnxnFactory factory; private List<EndpointEvent> events = new ArrayList<>(); + private EndpointDescriptionParserImpl parser = new EndpointDescriptionParserImpl(); + @Mock private ServiceReference<EndpointEventListener> sref; @@ -83,15 +92,14 @@ public class ZookeeperDiscoveryTest { @Test public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException { - EndpointDescriptionParserImpl parser = new EndpointDescriptionParserImpl(); - ZookeeperEndpointPublisher repository = new ZookeeperEndpointPublisher(zk, parser); + ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, parser); repository.activate(); - InterestManager im = new InterestManager(); + InterestManager im = new InterestManager(repository); + im.activate(); String scope = "("+ Constants.OBJECTCLASS +"=*)"; Mockito.when(sref.getProperty(Mockito.eq(EndpointEventListener.ENDPOINT_LISTENER_SCOPE))).thenReturn(scope); im.bindEndpointEventListener(sref, this::onEndpointChanged); - ZookeeperEndpointListener zklistener = new ZookeeperEndpointListener(zk, parser, im); assertThat(semConnected.tryAcquire(1, SECONDS), equalTo(true)); @@ -101,7 +109,7 @@ public class ZookeeperDiscoveryTest { assertThat(sem.tryAcquire(100, SECONDS), equalTo(true)); String path = "/osgi/service_registry/http:##test.de#service1"; - EndpointDescription ep2 = zklistener.read(path); + EndpointDescription ep2 = read(path); assertNotNull(ep2); repository.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint)); @@ -111,6 +119,17 @@ public class ZookeeperDiscoveryTest { assertThat(events.get(1).getType(), equalTo(EndpointEvent.REMOVED)); assertThat(events.get(0).getEndpoint(), equalTo(endpoint)); assertThat(events.get(1).getEndpoint(), equalTo(endpoint)); + im.close(); + } + + private EndpointDescription read(String path) throws KeeperException, InterruptedException { + Stat stat = new Stat(); + byte[] data = zk.getData(path, this::process, stat); + if (data == null || data.length == 0) { + return null; + } else { + return parser.readEndpoint(new ByteArrayInputStream(data)); + } } private void process(WatchedEvent event) { diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManagerTest.java similarity index 93% rename from discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java rename to discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManagerTest.java index 360bf88..9113d5e 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ClientManagerTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ClientManagerTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.aries.rsa.discovery.zookeeper; +package org.apache.aries.rsa.discovery.zookeeper.client; import static org.junit.Assert.assertEquals; @@ -25,7 +25,8 @@ import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import org.apache.aries.rsa.discovery.zookeeper.ClientManager.DiscoveryConfig; +import org.apache.aries.rsa.discovery.zookeeper.client.ClientManager; +import org.apache.aries.rsa.discovery.zookeeper.client.ClientManager.DiscoveryConfig; import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Rule; diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepositoryPathTest.java similarity index 65% rename from discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java rename to discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepositoryPathTest.java index 31441d3..ac50df7 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/client/ZookeeperEndpointRepositoryPathTest.java @@ -16,20 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.aries.rsa.discovery.zookeeper; +package org.apache.aries.rsa.discovery.zookeeper.client; import static org.junit.Assert.assertEquals; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository; import org.junit.Test; -public class ZookeeperEndpointPublisherPathTest { +public class ZookeeperEndpointRepositoryPathTest { @Test public void testGetZooKeeperPath() { - assertEquals(ZookeeperEndpointPublisher.PATH_PREFIX + '/' + "http:##org.example.Test", - ZookeeperEndpointPublisher.getZooKeeperPath("http://org.example.Test")); + assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + "http:##org.example.Test", + ZookeeperEndpointRepository.getZooKeeperPath("http://org.example.Test")); - assertEquals(ZookeeperEndpointPublisher.PATH_PREFIX, ZookeeperEndpointPublisher.getZooKeeperPath("")); + assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath("")); } } diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java index a24b140..774ec80 100644 --- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java +++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java @@ -24,7 +24,7 @@ import java.io.ByteArrayInputStream; import javax.inject.Inject; -import org.apache.aries.rsa.discovery.zookeeper.ZookeeperEndpointPublisher; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository; import org.apache.aries.rsa.examples.echotcp.api.EchoService; import org.apache.aries.rsa.itests.felix.RsaTestBase; import org.apache.aries.rsa.spi.DistributionProvider; @@ -75,7 +75,7 @@ public class TestDiscoveryExport extends RsaTestBase { private EndpointDescription getEndpoint() throws Exception { String endpointName = await("Node exists").until(this::getEndpointPath, Matchers.notNullValue()); - return getEndpointDescription(zookeeper, ZookeeperEndpointPublisher.PATH_PREFIX + "/" + endpointName); + return getEndpointDescription(zookeeper, ZookeeperEndpointRepository.PATH_PREFIX + "/" + endpointName); } private EndpointDescription getEndpointDescription(ZooKeeper zk, String endpointPath) @@ -86,7 +86,7 @@ public class TestDiscoveryExport extends RsaTestBase { } private String getEndpointPath() throws KeeperException, InterruptedException { - return zookeeper.getChildren(ZookeeperEndpointPublisher.PATH_PREFIX, false).stream() + return zookeeper.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, false).stream() .findFirst() .orElse(null); } diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java index 4aea83f..b819872 100644 --- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java +++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit; import javax.inject.Inject; -import org.apache.aries.rsa.discovery.zookeeper.ZookeeperEndpointPublisher; +import org.apache.aries.rsa.discovery.zookeeper.client.ZookeeperEndpointRepository; import org.apache.aries.rsa.itests.felix.RsaTestBase; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,7 +48,7 @@ import org.osgi.service.remoteserviceadmin.RemoteConstants; @RunWith(PaxExam.class) public class TestDiscoveryImport extends RsaTestBase { @Inject - ZookeeperEndpointPublisher publisher; + ZookeeperEndpointRepository publisher; @Inject BundleContext context; diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java index 09aa6fd..2590041 100644 --- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java +++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java @@ -20,9 +20,9 @@ package org.apache.aries.rsa.itests.felix.tcp; import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; import java.io.IOException; -import java.util.Collection; import javax.inject.Inject; @@ -30,14 +30,12 @@ import org.apache.aries.rsa.examples.echotcp.api.EchoService; import org.apache.aries.rsa.itests.felix.RsaTestBase; import org.apache.aries.rsa.itests.felix.ServerConfiguration; import org.apache.aries.rsa.itests.felix.TwoContainerPaxExam; -import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; import org.ops4j.pax.exam.Configuration; import org.ops4j.pax.exam.Option; import org.osgi.framework.BundleContext; import org.osgi.framework.InvalidSyntaxException; -import org.osgi.framework.ServiceReference; @RunWith(TwoContainerPaxExam.class) public class TestFindHook extends RsaTestBase { @@ -70,11 +68,11 @@ public class TestFindHook extends RsaTestBase { @Test public void testFind() throws Exception { - await().until(() -> getEchoServices().size(), Matchers.equalTo(1)); + await().until(this::numEchoServices, equalTo(1)); } - private Collection<ServiceReference<EchoService>> getEchoServices() throws InvalidSyntaxException { - return context.getServiceReferences(EchoService.class, null); + private int numEchoServices() throws InvalidSyntaxException { + return context.getServiceReferences(EchoService.class, null).size(); } } diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java index 37cbcf7..ffd8e8c 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java @@ -21,7 +21,7 @@ package org.apache.aries.rsa.topologymanager.importer; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Map;; +import java.util.Map; import java.util.Set; /**