http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/cxf-dsw/src/test/resources/test-resources/sd.xml ---------------------------------------------------------------------- diff --git a/cxf-dsw/src/test/resources/test-resources/sd.xml b/cxf-dsw/src/test/resources/test-resources/sd.xml new file mode 100644 index 0000000..c7cebfb --- /dev/null +++ b/cxf-dsw/src/test/resources/test-resources/sd.xml @@ -0,0 +1,8 @@ +<service-decorations xmlns="http://cxf.apache.org/xmlns/service-decoration/1.0.0"> + <service-decoration> + <match interface="org.acme.foo.*"> + <match-property name="test.prop" value="xyz"/> + <add-property name="test.too" value="ahaha" type="java.lang.String"/> + </match> + </service-decoration> +</service-decorations> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/cxf-dsw/src/test/resources/test-resources/sd0.xml ---------------------------------------------------------------------- diff --git a/cxf-dsw/src/test/resources/test-resources/sd0.xml b/cxf-dsw/src/test/resources/test-resources/sd0.xml new file mode 100644 index 0000000..0ad0ad1 --- /dev/null +++ b/cxf-dsw/src/test/resources/test-resources/sd0.xml @@ -0,0 +1,2 @@ +<service-decorations xmlns="http://cxf.apache.org/xmlns/service-decoration/1.0.0"> +</service-decorations> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/cxf-dsw/src/test/resources/test-resources/sd1.xml ---------------------------------------------------------------------- diff --git a/cxf-dsw/src/test/resources/test-resources/sd1.xml b/cxf-dsw/src/test/resources/test-resources/sd1.xml new file mode 100644 index 0000000..6a5e811 --- /dev/null +++ b/cxf-dsw/src/test/resources/test-resources/sd1.xml @@ -0,0 +1,8 @@ +<service-decorations xmlns="http://cxf.apache.org/xmlns/service-decoration/1.0.0"> + <service-decoration> + <match interface="org.test.A"> + <add-property name="A" value="B"/> + <add-property name="C" value="2" type="java.lang.Integer"/> + </match> + </service-decoration> +</service-decorations> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/cxf-dsw/src/test/resources/test-resources/sd2.xml ---------------------------------------------------------------------- diff --git a/cxf-dsw/src/test/resources/test-resources/sd2.xml b/cxf-dsw/src/test/resources/test-resources/sd2.xml new file mode 100644 index 0000000..fb6a93a --- /dev/null +++ b/cxf-dsw/src/test/resources/test-resources/sd2.xml @@ -0,0 +1,14 @@ +<service-decorations xmlns="http://cxf.apache.org/xmlns/service-decoration/1.0.0"> + <service-decoration> + <match interface="org.test.(B|C)"> + <match-property name="x" value="y"/> + <add-property name="bool" value="true" type="java.lang.Boolean"/> + </match> + </service-decoration> + <service-decoration> + <match interface="org.test.(B|C)"> + <match-property name="x" value="z"/> + <add-property name="bool" value="false" type="java.lang.Boolean"/> + </match> + </service-decoration> +</service-decorations> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/pom.xml ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/pom.xml b/discovery/distributed/cxf-discovery/pom.xml deleted file mode 100644 index b34a1db..0000000 --- a/discovery/distributed/cxf-discovery/pom.xml +++ /dev/null @@ -1,120 +0,0 @@ -<?xml version='1.0' encoding='UTF-8' ?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - <artifactId>cxf-dosgi-ri-discovery-distributed</artifactId> - <packaging>bundle</packaging> - <name>CXF DOSGi ZooKeeper-based Discovery Service Bundle</name> - <description>An implementation of the Distributed OSGi Discovery Service using ZooKeeper</description> - - <parent> - <groupId>org.apache.cxf.dosgi</groupId> - <artifactId>cxf-dosgi-ri-parent</artifactId> - <version>1.8-SNAPSHOT</version> - <relativePath>../../../parent/pom.xml</relativePath> - </parent> - - <properties> - <topDirectoryLocation>../../..</topDirectoryLocation> - </properties> - - <dependencies> - <dependency> - <groupId>org.osgi</groupId> - <artifactId>org.osgi.core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.osgi</groupId> - <artifactId>org.osgi.compendium</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>${zookeeper.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- We need the newer log4j as the one from zookeeper has some ugly dependencies --> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>${log4j.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.cxf.dosgi</groupId> - <artifactId>cxf-dosgi-ri-discovery-local</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymockclassextension</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <configuration> - <instructions> - <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName> - <Bundle-Activator>org.apache.cxf.dosgi.discovery.zookeeper.Activator</Bundle-Activator> - <Export-Package> - !* - </Export-Package> - </instructions> - </configuration> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java deleted file mode 100644 index cbbea58..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/Activator.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper; - -import java.util.Dictionary; -import java.util.Hashtable; - -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; -import org.osgi.service.cm.ManagedService; - -public class Activator implements BundleActivator { - - private ZooKeeperDiscovery zkd; - - public synchronized void start(BundleContext bc) throws Exception { - zkd = new ZooKeeperDiscovery(bc); - Dictionary<String, String> props = new Hashtable<String, String>(); - props.put(Constants.SERVICE_PID, "org.apache.cxf.dosgi.discovery.zookeeper"); - bc.registerService(ManagedService.class.getName(), zkd, props); - } - - public synchronized void stop(BundleContext bc) throws Exception { - zkd.stop(true); - } -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java deleted file mode 100644 index 33e2da4..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper; - -import java.io.IOException; -import java.util.Dictionary; - -import org.apache.cxf.dosgi.discovery.zookeeper.publish.PublishingEndpointListenerFactory; -import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.EndpointListenerTracker; -import org.apache.cxf.dosgi.discovery.zookeeper.subscribe.InterfaceMonitorManager; -import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.BundleContext; -import org.osgi.service.cm.ConfigurationException; -import org.osgi.service.cm.ManagedService; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.util.tracker.ServiceTracker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ZooKeeperDiscovery implements Watcher, ManagedService { - - public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper"; - - private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class); - - private final BundleContext bctx; - - private PublishingEndpointListenerFactory endpointListenerFactory; - private ServiceTracker<EndpointListener, EndpointListener> endpointListenerTracker; - private InterfaceMonitorManager imManager; - private ZooKeeper zk; - private boolean closed; - private boolean started; - - private Dictionary<String, ?> curConfiguration; - - public ZooKeeperDiscovery(BundleContext bctx) { - this.bctx = bctx; - } - - private void setDefaults(Dictionary<String, String> configuration) { - Utils.setDefault(configuration, "zookeeper.host", "localhost"); - Utils.setDefault(configuration, "zookeeper.port", "2181"); - Utils.setDefault(configuration, "zookeeper.timeout", "3000"); - } - - @SuppressWarnings("unchecked") - public synchronized void updated(Dictionary<String, ?> configuration) throws ConfigurationException { - LOG.debug("Received configuration update for Zookeeper Discovery: {}", configuration); - if (configuration != null) { - setDefaults((Dictionary<String, String>)configuration); - } - // make changes only if config actually changed, to prevent unnecessary ZooKeeper reconnections - if (!Utils.toMap(configuration).equals(Utils.toMap(curConfiguration))) { - stop(false); - curConfiguration = configuration; - // config is null if it doesn't exist, is being deleted or has not yet been loaded - // in which case we just stop running - if (configuration != null) { - createZooKeeper(configuration); - } - } - } - - private synchronized void start() { - if (closed) { - return; - } - if (started) { - // we must be re-entrant, i.e. can be called when already started - LOG.debug("ZookeeperDiscovery already started"); - return; - } - LOG.debug("starting ZookeeperDiscovery"); - endpointListenerFactory = new PublishingEndpointListenerFactory(zk, bctx); - endpointListenerFactory.start(); - imManager = new InterfaceMonitorManager(bctx, zk); - endpointListenerTracker = new EndpointListenerTracker(bctx, imManager); - endpointListenerTracker.open(); - started = true; - } - - public synchronized void stop(boolean close) { - if (started) { - LOG.debug("stopping ZookeeperDiscovery"); - } - started = false; - closed |= close; - if (endpointListenerFactory != null) { - endpointListenerFactory.stop(); - } - if (endpointListenerTracker != null) { - endpointListenerTracker.close(); - } - if (imManager != null) { - imManager.close(); - } - if (zk != null) { - try { - zk.close(); - } catch (InterruptedException e) { - LOG.error("Error closing ZooKeeper", e); - } - } - } - - private synchronized void createZooKeeper(Dictionary<String, ?> configuration) { - if (closed) { - return; - } - String host = configuration.get("zookeeper.host").toString(); - String port = configuration.get("zookeeper.port").toString(); - int timeout = Integer.parseInt(configuration.get("zookeeper.timeout").toString()); - LOG.debug("ZooKeeper configuration: connecting to {}:{} with timeout {}", - new Object[]{host, port, timeout}); - try { - zk = new ZooKeeper(host + ":" + port, timeout, this); - } catch (IOException e) { - LOG.error("Failed to start the ZooKeeper Discovery component.", e); - } - } - - /* Callback for ZooKeeper */ - public void process(WatchedEvent event) { - LOG.debug("got ZooKeeper event " + event); - switch (event.getState()) { - case SyncConnected: - LOG.info("Connection to ZooKeeper established"); - // this event can be triggered more than once in a row (e.g. after Disconnected event), - // so we must be re-entrant here - start(); - break; - - case Expired: - LOG.info("Connection to ZooKeeper expired. Trying to create a new connection"); - stop(false); - createZooKeeper(curConfiguration); - break; - - default: - // ignore other events - break; - } - } -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java deleted file mode 100644 index 5d46585..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/DiscoveryPlugin.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.publish; - -import java.util.Map; - -/** - * This interface allows transformation of service registration information before it is pushed into the ZooKeeper - * discovery system. - * It can be useful for situations where a host name or port number needs to be changed in cases where the host running - * the service is known differently from the outside to what the local Java process thinks it is. - * Extra service properties can also be added to the registration which can be useful to refine the remote service - * lookup process. <p/> - * - * DiscoveryPlugins use the OSGi WhiteBoard pattern. To add one to the system, register an instance under this interface - * with the OSGi Service Registry. All registered DiscoveryPlugin instances are visited and given a chance to - * process the information before it is pushed into ZooKeeper. <p/> - * - * Note that the changes made using this plugin do not modify the local service registration. - * - */ -public interface DiscoveryPlugin { - - /** - * Process service registration information. Plugins can change this information before it is published into the - * ZooKeeper discovery system. - * - * @param mutableProperties A map of service registration properties. The map is mutable and any changes to the map - * will be reflected in the ZooKeeper registration. - * @param endpointKey The key under which the service is registered in ZooKeeper. This key typically has the - * following format: hostname#port##context. While the actual value of this key is not actually used by the - * system (people can use it as a hint to understand where the service is located), the value <i>must</i> be - * unique for all services of a given type. - * @return The <tt>endpointKey</tt> value to be used. If there is no need to change this simply return the value - * of the <tt>endpointKey</tt> parameter. - */ - String process(Map<String, Object> mutableProperties, String endpointKey); -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java deleted file mode 100644 index c703b9f..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListener.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.publish; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; -import org.apache.cxf.dosgi.endpointdesc.EndpointDescriptionParser; -import org.apache.cxf.dosgi.endpointdesc.PropertiesMapper; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.BundleContext; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.util.tracker.ServiceTracker; -import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType; -import org.osgi.xmlns.rsa.v1_0.PropertyType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Listens for local Endpoints and publishes them to ZooKeeper. - */ -public class PublishingEndpointListener implements EndpointListener { - - private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class); - - private final ZooKeeper zk; - private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker; - private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>(); - private boolean closed; - - private final EndpointDescriptionParser endpointDescriptionParser; - - public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) { - this.zk = zk; - discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx, - DiscoveryPlugin.class, null); - discoveryPluginTracker.open(); - endpointDescriptionParser = new EndpointDescriptionParser(); - } - - public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { - LOG.info("Local EndpointDescription added: {}", endpoint); - - synchronized (endpoints) { - if (closed) { - return; - } - if (endpoints.contains(endpoint)) { - // TODO -> Should the published endpoint be updated here? - return; - } - - try { - addEndpoint(endpoint); - endpoints.add(endpoint); - } catch (Exception ex) { - LOG.error("Exception while processing the addition of an endpoint.", ex); - } - } - } - - private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException, - InterruptedException, IOException { - Collection<String> interfaces = endpoint.getInterfaces(); - String endpointKey = getKey(endpoint.getId()); - Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties()); - - // process plugins - Object[] plugins = discoveryPluginTracker.getServices(); - if (plugins != null) { - for (Object plugin : plugins) { - if (plugin instanceof DiscoveryPlugin) { - endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey); - } - } - } - - for (String name : interfaces) { - String path = Utils.getZooKeeperPath(name); - String fullPath = path + '/' + endpointKey; - LOG.debug("Creating ZooKeeper node: {}", fullPath); - ensurePath(path, zk); - List<PropertyType> propsOut = new PropertiesMapper().fromProps(props); - EndpointDescriptionType epd = new EndpointDescriptionType(); - epd.getProperty().addAll(propsOut); - byte[] epData = endpointDescriptionParser.getData(epd); - createEphemeralNode(fullPath, epData); - } - } - - private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException { - try { - zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } catch (NodeExistsException nee) { - // this sometimes happens after a ZooKeeper node dies and the ephemeral node - // that belonged to the old session was not yet deleted. We need to make our - // session the owner of the node so it won't get deleted automatically - - // we do this by deleting and recreating it ourselves. - LOG.info("node for endpoint already exists, recreating: {}", fullPath); - try { - zk.delete(fullPath, -1); - } catch (NoNodeException nne) { - // it's a race condition, but as long as it got deleted - it's ok - } - zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } - } - - public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { - LOG.info("Local EndpointDescription removed: {}", endpoint); - - synchronized (endpoints) { - if (closed) { - return; - } - if (!endpoints.contains(endpoint)) { - return; - } - - try { - removeEndpoint(endpoint); - endpoints.remove(endpoint); - } catch (Exception ex) { - LOG.error("Exception while processing the removal of an endpoint", ex); - } - } - } - - private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException { - Collection<String> interfaces = endpoint.getInterfaces(); - String endpointKey = getKey(endpoint.getId()); - - for (String name : interfaces) { - String path = Utils.getZooKeeperPath(name); - String fullPath = path + '/' + endpointKey; - LOG.debug("Removing ZooKeeper node: {}", fullPath); - try { - zk.delete(fullPath, -1); - } catch (Exception ex) { - LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired - } - } - } - - private static void ensurePath(String path, ZooKeeper zk) throws KeeperException, InterruptedException { - StringBuilder current = new StringBuilder(); - String[] parts = Utils.removeEmpty(path.split("/")); - for (String part : parts) { - current.append('/'); - current.append(part); - try { - zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (NodeExistsException nee) { - // it's not the first node with this path to ever exist - that's normal - } - } - } - - static String getKey(String endpoint) throws URISyntaxException { - URI uri = new URI(endpoint); - - StringBuilder sb = new StringBuilder(); - sb.append(uri.getHost()); - sb.append("#"); - sb.append(uri.getPort()); - sb.append("#"); - sb.append(uri.getPath().replace('/', '#')); - return sb.toString(); - } - - public void close() { - LOG.debug("closing - removing all endpoints"); - synchronized (endpoints) { - closed = true; - for (EndpointDescription endpoint : endpoints) { - try { - removeEndpoint(endpoint); - } catch (Exception ex) { - LOG.error("Exception while removing endpoint during close", ex); - } - } - endpoints.clear(); - } - discoveryPluginTracker.close(); - } -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java deleted file mode 100644 index c505bb4..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/publish/PublishingEndpointListenerFactory.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.publish; - -import java.util.ArrayList; -import java.util.Dictionary; -import java.util.Hashtable; -import java.util.List; - -import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery; -import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; -import org.osgi.framework.ServiceFactory; -import org.osgi.framework.ServiceRegistration; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.service.remoteserviceadmin.RemoteConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Creates local EndpointListeners that publish to ZooKeeper. - */ -public class PublishingEndpointListenerFactory implements ServiceFactory<PublishingEndpointListener> { - - private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class); - - private final BundleContext bctx; - private final ZooKeeper zk; - private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>(); - private ServiceRegistration serviceRegistration; - - public PublishingEndpointListenerFactory(ZooKeeper zk, BundleContext bctx) { - this.bctx = bctx; - this.zk = zk; - } - - public PublishingEndpointListener getService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr) { - LOG.debug("new EndpointListener from factory"); - synchronized (listeners) { - PublishingEndpointListener pel = new PublishingEndpointListener(zk, bctx); - listeners.add(pel); - return pel; - } - } - - public void ungetService(Bundle b, ServiceRegistration<PublishingEndpointListener> sr, - PublishingEndpointListener pel) { - LOG.debug("remove EndpointListener"); - synchronized (listeners) { - if (listeners.remove(pel)) { - pel.close(); - } - } - } - - public synchronized void start() { - Dictionary<String, String> props = new Hashtable<String, String>(); - props.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, - "(&(" + Constants.OBJECTCLASS + "=*)(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID - + "=" + Utils.getUUID(bctx) + "))"); - props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true"); - serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this, props); - } - - public synchronized void stop() { - if (serviceRegistration != null) { - serviceRegistration.unregister(); - serviceRegistration = null; - } - synchronized (listeners) { - for (PublishingEndpointListener pel : listeners) { - pel.close(); - } - listeners.clear(); - } - } - - /** - * Only for the test case! - */ - protected List<PublishingEndpointListener> getListeners() { - synchronized (listeners) { - return listeners; - } - } -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java deleted file mode 100644 index 4d0a25f..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/EndpointListenerTracker.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.subscribe; - -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.util.tracker.ServiceTracker; - -/** - * Tracks interest in EndpointListeners. Delegates to InterfaceMonitorManager to manage - * interest in the scopes of each EndpointListener. - */ -public class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> { - private final InterfaceMonitorManager imManager; - - public EndpointListenerTracker(BundleContext bctx, InterfaceMonitorManager imManager) { - super(bctx, EndpointListener.class, null); - this.imManager = imManager; - } - - @Override - public EndpointListener addingService(ServiceReference<EndpointListener> endpointListener) { - imManager.addInterest(endpointListener); - return null; - } - - @Override - public void modifiedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) { - // called when an EndpointListener updates its service properties, - // e.g. when its interest scope is expanded/reduced - imManager.addInterest(endpointListener); - } - - @Override - public void removedService(ServiceReference<EndpointListener> endpointListener, EndpointListener service) { - imManager.removeInterest(endpointListener); - } - -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java deleted file mode 100644 index 95277d3..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitor.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.subscribe; - -import java.io.ByteArrayInputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; -import org.apache.cxf.dosgi.endpointdesc.EndpointDescriptionParser; -import org.apache.cxf.dosgi.endpointdesc.PropertiesMapper; -import org.apache.zookeeper.AsyncCallback.StatCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Monitors ZooKeeper for changes in published endpoints. - * <p> - * Specifically, it monitors the node path associated with a given interface class, - * whose data is a serialized version of an EndpointDescription, and notifies an - * EndpointListener when changes are detected (which can then propagate the - * notification to other EndpointListeners with a matching scope). - * <p> - * Note that the EndpointListener is used here as a decoupling interface for - * convenience, and is not necessarily used according to its documented contract. - */ -public class InterfaceMonitor implements Watcher, StatCallback { - - private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class); - - private final String znode; - private final ZooKeeper zk; - private final EndpointListener endpointListener; - private final boolean recursive; - private volatile boolean closed; - - // This map reference changes, so don't synchronize on it - private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>(); - - private EndpointDescriptionParser parser; - - public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) { - this.zk = zk; - this.znode = Utils.getZooKeeperPath(objClass); - this.recursive = objClass == null || objClass.isEmpty(); - this.endpointListener = endpointListener; - this.parser = new EndpointDescriptionParser(); - LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]", - new Object[] {recursive ? "(recursive)" : "", scope, objClass}); - } - - /** - * Returns all endpoints that are currently known to this monitor. - * - * @return all endpoints that are currently known to this monitor - */ - public synchronized List<EndpointDescription> getEndpoints() { - return new ArrayList<EndpointDescription>(nodes.values()); - } - - public void start() { - watch(); - } - - private void watch() { - LOG.debug("registering a ZooKeeper.exists({}) callback", znode); - zk.exists(znode, this, this, null); - } - - /** - * Zookeeper Watcher interface callback. - */ - public void process(WatchedEvent event) { - LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event); - processDelta(); - } - - /** - * Zookeeper StatCallback interface callback. - */ - @SuppressWarnings("deprecation") - public void processResult(int rc, String path, Object ctx, Stat stat) { - LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc); - - switch (rc) { - case Code.Ok: - case Code.NoNode: - processDelta(); - return; - - case Code.SessionExpired: - case Code.NoAuth: - case Code.ConnectionLoss: - return; - - default: - watch(); - } - } - - private void processDelta() { - if (closed) { - return; - } - - if (zk.getState() != ZooKeeper.States.CONNECTED) { - LOG.debug("ZooKeeper connection was already closed! Not processing changed event."); - return; - } - - try { - if (zk.exists(znode, false) != null) { - zk.getChildren(znode, this); - refreshNodes(); - } else { - LOG.debug("znode {} doesn't exist -> not processing any changes", znode); - } - } catch (Exception e) { - if (zk.getState() != ZooKeeper.States.CONNECTED) { - LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery - } else { - LOG.error("Error getting ZooKeeper data.", e); - } - } - } - - public synchronized void close() { - closed = true; - for (EndpointDescription endpoint : nodes.values()) { - endpointListener.endpointRemoved(endpoint, null); - } - nodes.clear(); - } - - private synchronized void refreshNodes() { - if (closed) { - return; - } - LOG.info("Processing change on node: {}", znode); - - Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>(); - Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes); - processChildren(znode, newNodes, prevNodes); - - // whatever is left in prevNodes now has been removed from Discovery - LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values()); - for (EndpointDescription endpoint : prevNodes.values()) { - endpointListener.endpointRemoved(endpoint, null); - } - nodes = newNodes; - } - - /** - * Iterates through all child nodes of the given node and tries to find - * endpoints. If the recursive flag is set it also traverses into the child - * nodes. - * - * @return true if an endpoint was found and if the node therefore needs to - * be monitored for changes - */ - private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes, - Map<String, EndpointDescription> prevNodes) { - List<String> children; - try { - LOG.debug("Processing the children of {}", zn); - children = zk.getChildren(zn, false); - - boolean foundANode = false; - for (String child : children) { - String childZNode = zn + '/' + child; - EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode); - if (endpoint != null) { - EndpointDescription prevEndpoint = prevNodes.get(child); - LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: " - + endpoint.getProperties().values()); - newNodes.put(child, endpoint); - prevNodes.remove(child); - foundANode = true; - LOG.debug("Properties: {}", endpoint.getProperties()); - if (prevEndpoint == null) { - // This guy is new - endpointListener.endpointAdded(endpoint, null); - } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) { - // TODO - } - } - if (recursive && processChildren(childZNode, newNodes, prevNodes)) { - zk.getChildren(childZNode, this); - } - } - - return foundANode; - } catch (KeeperException e) { - LOG.error("Problem processing ZooKeeper node", e); - } catch (InterruptedException e) { - LOG.error("Problem processing ZooKeeper node", e); - } - return false; - } - - /** - * Retrieves data from the given node and parses it into an EndpointDescription. - * - * @param node a node path - * @return endpoint found in the node or null if no endpoint was found - */ - private EndpointDescription getEndpointDescriptionFromNode(String node) { - try { - Stat stat = zk.exists(node, false); - if (stat == null || stat.getDataLength() <= 0) { - return null; - } - byte[] data = zk.getData(node, false, null); - LOG.debug("Got data for node: {}", node); - - EndpointDescription endpoint = getFirstEnpointDescription(data); - if (endpoint != null) { - return endpoint; - } - LOG.warn("No Discovery information found for node: {}", node); - } catch (Exception e) { - LOG.error("Problem getting EndpointDescription from node " + node, e); - } - return null; - } - - public EndpointDescription getFirstEnpointDescription(byte[] data) { - List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data)); - if (elements.isEmpty()) { - return null; - } - Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty()); - return new EndpointDescription(props); - } -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java deleted file mode 100644 index 240e5ea..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/subscribe/InterfaceMonitorManager.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.subscribe; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery; -import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.EndpointListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.cxf.dosgi.discovery.local.util.Utils.matchFilter; - -/** - * Manages the EndpointListeners and the scopes they are interested in. - * For each scope with interested EndpointListeners an InterfaceMonitor is created. - * The InterfaceMonitor calls back when it detects added or removed external Endpoints. - * These events are then forwarded to all interested EndpointListeners. - */ -public class InterfaceMonitorManager { - - private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class); - - private final BundleContext bctx; - private final ZooKeeper zk; - // map of EndpointListeners and the scopes they are interested in - private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes = - new HashMap<ServiceReference<EndpointListener>, List<String>>(); - // map of scopes and their interest data - private final Map<String, Interest> interests = new HashMap<String, Interest>(); - - protected static class Interest { - List<ServiceReference<EndpointListener>> endpointListeners = - new CopyOnWriteArrayList<ServiceReference<EndpointListener>>(); - InterfaceMonitor monitor; - } - - public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) { - this.bctx = bctx; - this.zk = zk; - } - - public void addInterest(ServiceReference<EndpointListener> endpointListener) { - if (isOurOwnEndpointListener(endpointListener)) { - LOG.debug("Skipping our own EndpointListener"); - return; - } - - LOG.info("updating EndpointListener interests: {}", endpointListener); - if (LOG.isDebugEnabled()) { - LOG.debug("updated EndpointListener properties: {}", Utils.getProperties(endpointListener)); - } - for (String scope : Utils.getScopes(endpointListener)) { - String objClass = Utils.getObjectClass(scope); - LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass); - addInterest(endpointListener, scope, objClass); - } - } - - private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) { - return Boolean.parseBoolean(String.valueOf( - endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID))); - } - - @SuppressWarnings("unchecked") - public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener, - String scope, String objClass) { - // get or create interest for given scope and add listener to it - Interest interest = interests.get(scope); - if (interest == null) { - // create interest, add listener and start monitor - interest = new Interest(); - interests.put(scope, interest); - interest.endpointListeners.add(endpointListener); // add it before monitor starts so we don't miss events - interest.monitor = createInterfaceMonitor(scope, objClass, interest); - interest.monitor.start(); - } else { - // interest already exists, so just add listener to it - if (!interest.endpointListeners.contains(endpointListener)) { - interest.endpointListeners.add(endpointListener); - } - // notify listener of all known endpoints for given scope - // (as EndpointListener contract requires of all added/modified listeners) - for (EndpointDescription endpoint : interest.monitor.getEndpoints()) { - notifyListeners(endpoint, scope, true, Arrays.asList(endpointListener)); - } - } - - // add scope to listener's scopes list - List<String> scopes = endpointListenerScopes.get(endpointListener); - if (scopes == null) { - scopes = new ArrayList<String>(1); - endpointListenerScopes.put(endpointListener, scopes); - } - if (!scopes.contains(scope)) { - scopes.add(scope); - } - } - - public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) { - LOG.info("removing EndpointListener interests: {}", endpointListener); - List<String> scopes = endpointListenerScopes.get(endpointListener); - if (scopes == null) { - return; - } - - for (String scope : scopes) { - Interest interest = interests.get(scope); - if (interest != null) { - interest.endpointListeners.remove(endpointListener); - if (interest.endpointListeners.isEmpty()) { - interest.monitor.close(); - interests.remove(scope); - } - } - } - endpointListenerScopes.remove(endpointListener); - } - - protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) { - // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor - EndpointListener endpointListener = new EndpointListener() { - - public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { - notifyListeners(endpoint, scope, false, interest.endpointListeners); - } - - public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { - notifyListeners(endpoint, scope, true, interest.endpointListeners); - } - }; - return new InterfaceMonitor(zk, objClass, endpointListener, scope); - } - - private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded, - List<ServiceReference<EndpointListener>> endpointListeners) { - for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) { - EndpointListener service = bctx.getService(endpointListenerRef); - try { - EndpointListener endpointListener = (EndpointListener)service; - LOG.trace("matching {} against {}", endpoint, currentScope); - if (matchFilter(bctx, currentScope, endpoint)) { - LOG.debug("Matched {} against {}", endpoint, currentScope); - notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(), - endpointListener); - } - } finally { - if (service != null) { - bctx.ungetService(endpointListenerRef); - } - } - } - } - - private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded, - Bundle endpointListenerBundle, EndpointListener endpointListener) { - if (endpointListenerBundle == null) { - LOG.info("listening service was unregistered, ignoring"); - } else if (isAdded) { - LOG.info("calling EndpointListener.endpointAdded: " + endpointListener + " from bundle " - + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint); - endpointListener.endpointAdded(endpoint, currentScope); - } else { - LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle " - + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint); - endpointListener.endpointRemoved(endpoint, currentScope); - } - } - - public synchronized void close() { - for (Interest interest : interests.values()) { - interest.monitor.close(); - } - interests.clear(); - endpointListenerScopes.clear(); - } - - /** - * Only for test case! - */ - protected synchronized Map<String, Interest> getInterests() { - return interests; - } - - /** - * Only for test case! - */ - protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() { - return endpointListenerScopes; - } -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java b/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java deleted file mode 100644 index 5fcb111..0000000 --- a/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/util/Utils.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper.util; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Dictionary; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; -import org.osgi.service.remoteserviceadmin.EndpointListener; - -public final class Utils { - - static final String PATH_PREFIX = "/osgi/service_registry"; - static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*"); - - private Utils() { - // never constructed - } - - public static String getZooKeeperPath(String name) { - return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/'); - } - - /** - * Returns the value of a "string+" property as an array of strings. - * <p> - * A "string+" property can have a value which is either a string, - * an array of strings, or a collection of strings. - * <p> - * If the given value is not of one of the valid types, or is null, - * an empty array is returned. - * - * @param property a "string+" property value - * @return the property value as an array of strings, or an empty array - */ - public static String[] getStringPlusProperty(Object property) { - if (property instanceof String) { - return new String[] {(String)property}; - } else if (property instanceof String[]) { - return (String[])property; - } else if (property instanceof Collection) { - try { - @SuppressWarnings("unchecked") - Collection<String> strings = (Collection<String>)property; - return strings.toArray(new String[strings.size()]); - } catch (ArrayStoreException ase) { - // ignore collections with wrong type - } - } - return new String[0]; - } - - /** - * Removes nulls and empty strings from the given string array. - * - * @param strings an array of strings - * @return a new array containing the non-null and non-empty - * elements of the original array in the same order - */ - public static String[] removeEmpty(String[] strings) { - String[] result = new String[strings.length]; - int copied = 0; - for (String s : strings) { - if (s != null && !s.isEmpty()) { - result[copied++] = s; - } - } - return copied == result.length ? result : Arrays.copyOf(result, copied); - } - - public static String[] getScopes(ServiceReference<?> sref) { - return removeEmpty(getStringPlusProperty(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE))); - } - - // copied from the DSW OSGiUtils class - public static String getUUID(BundleContext bc) { - synchronized ("org.osgi.framework.uuid") { - String uuid = bc.getProperty("org.osgi.framework.uuid"); - if (uuid == null) { - uuid = UUID.randomUUID().toString(); - System.setProperty("org.osgi.framework.uuid", uuid); - } - return uuid; - } - } - - /** - * Puts the given key-value pair in the given dictionary if the key does not - * already exist in it or if its existing value is null. - * - * @param dict a dictionary - * @param key the key - * @param value the default value to set - */ - public static void setDefault(Dictionary<String, String> dict, String key, String value) { - if (dict.get(key) == null) { - dict.put(key, value); - } - } - - /** - * Converts the given Dictionary to a Map. - * - * @param dict a dictionary - * @param <K> the key type - * @param <V> the value type - * @return the converted map, or an empty map if the given dictionary is null - */ - public static <K, V> Map<K, V> toMap(Dictionary<K, V> dict) { - Map<K, V> map = new HashMap<K, V>(); - if (dict != null) { - Enumeration<K> keys = dict.keys(); - while (keys.hasMoreElements()) { - K key = keys.nextElement(); - map.put(key, dict.get(key)); - } - } - return map; - } - - public static String getObjectClass(String scope) { - Matcher m = OBJECTCLASS_PATTERN.matcher(scope); - return m.matches() ? m.group(1) : null; - } - - /** - * Returns a service's properties as a map. - * - * @param serviceReference a service reference - * @return the service's properties as a map - */ - public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) { - String[] keys = serviceReference.getPropertyKeys(); - Map<String, Object> props = new HashMap<String, Object>(keys.length); - for (String key : keys) { - Object val = serviceReference.getProperty(key); - props.put(key, val); - } - return props; - } -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/DiscoveryDriverTest.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/DiscoveryDriverTest.java b/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/DiscoveryDriverTest.java deleted file mode 100644 index 84470c2..0000000 --- a/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/DiscoveryDriverTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper; - -import junit.framework.TestCase; - -public class DiscoveryDriverTest extends TestCase { - - public void testDUMMY() { - assertTrue(true); - } - -// public void testDiscoveryDriver() throws Exception { -// BundleContext bc = getDefaultBundleContext(); -// Dictionary<String, String> props = getDefaultProps(); -// -// final StringBuilder sb = new StringBuilder(); -// DiscoveryDriver dd = new DiscoveryDriver(bc, props) { -// @Override -// ZooKeeper createZooKeeper() throws IOException { -// sb.append(zkHost + ":" + zkPort); -// ZooKeeper zk = EasyMock.createMock(ZooKeeper.class); -// EasyMock.replay(zk); -// return zk; -// } -// }; -// EasyMock.verify(bc); -// assertEquals("somehost:1910", sb.toString()); -// -// EasyMock.verify(dd.zooKeeper); -// EasyMock.reset(dd.zooKeeper); -// dd.zooKeeper.close(); -// EasyMock.expectLastCall(); -// EasyMock.replay(dd.zooKeeper); -// -// ServiceTracker st1 = EasyMock.createMock(ServiceTracker.class); -// st1.close(); -// EasyMock.expectLastCall(); -// EasyMock.replay(st1); -// ServiceTracker st2 = EasyMock.createMock(ServiceTracker.class); -// st2.close(); -// EasyMock.expectLastCall(); -// EasyMock.replay(st2); -// -// dd.lookupTracker = st1; -// dd.publicationTracker = st2; -// -// dd.destroy(); -// } -// -// private void expectServiceTrackerCalls(BundleContext bc, String objectClass) -// throws InvalidSyntaxException { -// Filter filter = EasyMock.createNiceMock(Filter.class); -// EasyMock.replay(filter); -// -// EasyMock.expect(bc.createFilter("(objectClass=" + objectClass + ")")) -// .andReturn(filter).anyTimes(); -// bc.addServiceListener((ServiceListener) EasyMock.anyObject(), -// EasyMock.eq("(objectClass=" + objectClass + ")")); -// EasyMock.expectLastCall().anyTimes(); -// EasyMock.expect(bc.getServiceReferences(objectClass, null)) -// .andReturn(new ServiceReference [0]).anyTimes(); -// } -// -// public void testProcessEvent() throws Exception { -// DiscoveryDriver db = new DiscoveryDriver(getDefaultBundleContext(), getDefaultProps()) { -// @Override -// ZooKeeper createZooKeeper() throws IOException { -// return null; -// } -// }; -// -// FindInZooKeeperCustomizer fc = new FindInZooKeeperCustomizer(null, null); -// List<InterfaceMonitor> l1 = new ArrayList<InterfaceMonitor>(); -// InterfaceMonitor dm1a = EasyMock.createMock(InterfaceMonitor.class); -// dm1a.process(); -// EasyMock.expectLastCall(); -// EasyMock.replay(dm1a); -// InterfaceMonitor dm1b = EasyMock.createMock(InterfaceMonitor.class); -// dm1b.process(); -// EasyMock.expectLastCall(); -// EasyMock.replay(dm1b); -// l1.add(dm1a); -// l1.add(dm1b); -// -// List<InterfaceMonitor> l2 = new ArrayList<InterfaceMonitor>(); -// InterfaceMonitor dm2 = EasyMock.createMock(InterfaceMonitor.class); -// dm2.process(); -// EasyMock.expectLastCall(); -// EasyMock.replay(dm2); -// l2.add(dm2); -// -// fc.watchers.put(EasyMock.createMock(DiscoveredServiceTracker.class), l1); -// fc.watchers.put(EasyMock.createMock(DiscoveredServiceTracker.class), l2); -// -// db.finderCustomizer = fc; -// db.process(null); -// -// EasyMock.verify(dm1a); -// EasyMock.verify(dm1b); -// EasyMock.verify(dm2); -// } -// -// private BundleContext getDefaultBundleContext() throws InvalidSyntaxException { -// BundleContext bc = EasyMock.createMock(BundleContext.class); -// expectServiceTrackerCalls(bc, ServicePublication.class.getName()); -// expectServiceTrackerCalls(bc, DiscoveredServiceTracker.class.getName()); -// EasyMock.replay(bc); -// return bc; -// } -// -// private Dictionary<String, String> getDefaultProps() { -// Dictionary<String, String> props = new Hashtable<String, String>(); -// props.put("zookeeper.host", "somehost"); -// props.put("zookeeper.port", "1910"); -// props.put("zookeeper.timeout", "1500"); -// return props; -// } -} http://git-wip-us.apache.org/repos/asf/cxf-dosgi/blob/1425743f/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/FindInZooKeeperCustomizerTest.java ---------------------------------------------------------------------- diff --git a/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/FindInZooKeeperCustomizerTest.java b/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/FindInZooKeeperCustomizerTest.java deleted file mode 100644 index cb2180b..0000000 --- a/discovery/distributed/cxf-discovery/src/test/java/org/apache/cxf/dosgi/discovery/zookeeper/FindInZooKeeperCustomizerTest.java +++ /dev/null @@ -1,301 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.dosgi.discovery.zookeeper; - -import junit.framework.TestCase; - -public class FindInZooKeeperCustomizerTest extends TestCase { - - public void testDUMMY() { - assertTrue(true); - } - -// public void testAddingServiceInterface() { -// DiscoveredServiceTracker dst = new DiscoveredServiceTracker() { -// public void serviceChanged(DiscoveredServiceNotification dsn) {} -// }; -// -// ServiceReference sr = EasyMock.createMock(ServiceReference.class); -// EasyMock.expect(sr.getProperty(DiscoveredServiceTracker.INTERFACE_MATCH_CRITERIA)) -// .andReturn(Collections.singleton(String.class.getName())); -// EasyMock.expect(sr.getProperty(DiscoveredServiceTracker.FILTER_MATCH_CRITERIA)) -// .andReturn(null); -// EasyMock.replay(sr); -// -// DiscoveredServiceTracker dst2 = new DiscoveredServiceTracker() { -// public void serviceChanged(DiscoveredServiceNotification dsn) {} -// }; -// -// ServiceReference sr2 = EasyMock.createMock(ServiceReference.class); -// EasyMock.expect(sr2.getProperty(DiscoveredServiceTracker.INTERFACE_MATCH_CRITERIA)) -// .andReturn(Arrays.asList(Integer.class.getName(), Comparable.class.getName())); -// EasyMock.expect(sr2.getProperty(DiscoveredServiceTracker.FILTER_MATCH_CRITERIA)) -// .andReturn(null); -// EasyMock.replay(sr2); -// -// BundleContext bc = EasyMock.createMock(BundleContext.class); -// EasyMock.expect(bc.getService(sr)).andReturn(dst); -// EasyMock.expect(bc.getService(sr2)).andReturn(dst2); -// EasyMock.replay(bc); -// -// ZooKeeper zk = EasyMock.createMock(ZooKeeper.class); -// zkExpectExists(zk, String.class.getName()); -// zkExpectExists(zk, Integer.class.getName()); -// zkExpectExists(zk, Comparable.class.getName()); -// EasyMock.expectLastCall(); -// EasyMock.replay(zk); -// -// FindInZooKeeperCustomizer fc = new FindInZooKeeperCustomizer(bc, zk); -// -// // --------------------------------------------------------------- -// // Test the addingService APIs -// // --------------------------------------------------------------- -// -// assertEquals("Precondition failed", 0, fc.watchers.size()); -// fc.addingService(sr); -// assertEquals(1, fc.watchers.size()); -// -// DiscoveredServiceTracker key = fc.watchers.keySet().iterator().next(); -// assertSame(dst, key); -// List<InterfaceMonitor> dmList = fc.watchers.get(key); -// assertEquals(1, dmList.size()); -// InterfaceMonitor dm = dmList.iterator().next(); -// assertNotNull(dm.listener); -// assertSame(zk, dm.zookeeper); -// assertEquals(Utils.getZooKeeperPath(String.class.getName()), dm.znode); -// -// assertEquals("Precondition failed", 1, fc.watchers.size()); -// fc.addingService(sr2); -// assertEquals(2, fc.watchers.size()); -// -// assertTrue(fc.watchers.containsKey(dst)); -// assertTrue(fc.watchers.containsKey(dst2)); -// assertEquals(dmList, fc.watchers.get(dst)); -// List<InterfaceMonitor> dmList2 = fc.watchers.get(dst2); -// assertEquals(2, dmList2.size()); -// -// Set<String> actual = new HashSet<String>(); -// for (InterfaceMonitor im : dmList2) { -// actual.add(im.znode); -// } -// Set<String> expected = new HashSet<String>(Arrays.asList( -// Utils.getZooKeeperPath(Integer.class.getName()), -// Utils.getZooKeeperPath(Comparable.class.getName()))); -// assertEquals(expected, actual); -// -// EasyMock.verify(zk); -// -// // --------------------------------------------------------------- -// // Test the modifiedService APIs -// // --------------------------------------------------------------- -// EasyMock.reset(zk); -// zkExpectExists(zk, List.class.getName()); -// EasyMock.replay(zk); -// -// EasyMock.reset(sr); -// EasyMock.expect(sr.getProperty(DiscoveredServiceTracker.INTERFACE_MATCH_CRITERIA)) -// .andReturn(Collections.singleton(List.class.getName())); -// EasyMock.expect(sr.getProperty(DiscoveredServiceTracker.FILTER_MATCH_CRITERIA)) -// .andReturn(null); -// EasyMock.replay(sr); -// -// assertEquals("Precondition failed", 2, fc.watchers.size()); -// fc.modifiedService(sr, dst); -// assertEquals("Precondition failed", 2, fc.watchers.size()); -// -// assertTrue(fc.watchers.containsKey(dst)); -// assertTrue(fc.watchers.containsKey(dst2)); -// assertEquals(dmList2, fc.watchers.get(dst2)); -// List<InterfaceMonitor> dmList3 = fc.watchers.get(dst); -// assertEquals(1, dmList3.size()); -// assertEquals(Utils.getZooKeeperPath(List.class.getName()), dmList3.iterator().next().znode); -// -// EasyMock.verify(zk); -// -// // --------------------------------------------------------------- -// // Test the removedService APIs -// // --------------------------------------------------------------- -// EasyMock.reset(zk); -// EasyMock.replay(zk); -// -// assertEquals("Precondition failed", 2, fc.watchers.size()); -// fc.removedService(sr2, dst2); -// assertEquals("Precondition failed", 1, fc.watchers.size()); -// -// assertEquals(dmList3, fc.watchers.get(dst)); -// assertNull(fc.watchers.get(dst2)); -// -// EasyMock.verify(zk); -// } -// -// public void testAddingServiceFilter() { -// DiscoveredServiceTracker dst = new DiscoveredServiceTracker() { -// public void serviceChanged(DiscoveredServiceNotification dsn) {} -// }; -// -// ServiceReference sr = EasyMock.createMock(ServiceReference.class); -// EasyMock.expect(sr.getProperty(DiscoveredServiceTracker.INTERFACE_MATCH_CRITERIA)) -// .andReturn(null); -// Set<String> stringFilter = Collections.singleton("(objectClass=java.lang.String)"); -// EasyMock.expect(sr.getProperty(DiscoveredServiceTracker.FILTER_MATCH_CRITERIA)) -// .andReturn(stringFilter); -// EasyMock.replay(sr); -// -// DiscoveredServiceTracker dst2 = new DiscoveredServiceTracker() { -// public void serviceChanged(DiscoveredServiceNotification dsn) {} -// }; -// -// ServiceReference sr2 = EasyMock.createMock(ServiceReference.class); -// EasyMock.expect(sr2.getProperty(DiscoveredServiceTracker.INTERFACE_MATCH_CRITERIA)) -// .andReturn(null); -// List<String> combinedFilter = -// Arrays.asList("(objectClass=java.lang.Integer)", "(objectClass=java.lang.Comparable)"); -// EasyMock.expect(sr2.getProperty(DiscoveredServiceTracker.FILTER_MATCH_CRITERIA)) -// .andReturn(combinedFilter); -// EasyMock.replay(sr2); -// -// BundleContext bc = EasyMock.createMock(BundleContext.class); -// EasyMock.expect(bc.getService(sr)).andReturn(dst); -// EasyMock.expect(bc.getService(sr2)).andReturn(dst2); -// EasyMock.replay(bc); -// -// ZooKeeper zk = EasyMock.createMock(ZooKeeper.class); -// zkExpectExists(zk, String.class.getName()); -// zkExpectExists(zk, Integer.class.getName()); -// zkExpectExists(zk, Comparable.class.getName()); -// EasyMock.expectLastCall(); -// EasyMock.replay(zk); -// -// FindInZooKeeperCustomizer fc = new FindInZooKeeperCustomizer(bc, zk); -// -// // --------------------------------------------------------------- -// // Test the addingService APIs -// // --------------------------------------------------------------- -// -// assertEquals("Precondition failed", 0, fc.watchers.size()); -// fc.addingService(sr); -// assertEquals(1, fc.watchers.size()); -// -// DiscoveredServiceTracker key = fc.watchers.keySet().iterator().next(); -// assertSame(dst, key); -// List<InterfaceMonitor> dmList = fc.watchers.get(key); -// assertEquals(1, dmList.size()); -// InterfaceMonitor dm = dmList.iterator().next(); -// assertNotNull(dm.listener); -// assertSame(zk, dm.zookeeper); -// assertEquals(Utils.getZooKeeperPath(String.class.getName()), dm.znode); -// -// assertEquals("Precondition failed", 1, fc.watchers.size()); -// fc.addingService(sr2); -// assertEquals(2, fc.watchers.size()); -// -// assertTrue(fc.watchers.containsKey(dst)); -// assertTrue(fc.watchers.containsKey(dst2)); -// assertEquals(dmList, fc.watchers.get(dst)); -// List<InterfaceMonitor> dmList2 = fc.watchers.get(dst2); -// assertEquals(2, dmList2.size()); -// Set<String> actual = new HashSet<String>(); -// for (InterfaceMonitor im : dmList2) { -// actual.add(im.znode); -// } -// Set<String> expected = new HashSet<String>(Arrays.asList( -// Utils.getZooKeeperPath(Integer.class.getName()), -// Utils.getZooKeeperPath(Comparable.class.getName()))); -// assertEquals(expected, actual); -// -// EasyMock.verify(zk); -// -// // --------------------------------------------------------------- -// // Test the modifiedService APIs -// // --------------------------------------------------------------- -// EasyMock.reset(zk); -// zkExpectExists(zk, List.class.getName()); -// EasyMock.replay(zk); -// -// EasyMock.reset(sr); -// Set<String> listFilter = Collections.singleton("(objectClass=java.util.List)"); -// EasyMock.expect(sr.getProperty(DiscoveredServiceTracker.INTERFACE_MATCH_CRITERIA)) -// .andReturn(null); -// EasyMock.expect(sr.getProperty(DiscoveredServiceTracker.FILTER_MATCH_CRITERIA)) -// .andReturn(listFilter); -// EasyMock.replay(sr); -// -// assertEquals("Precondition failed", 2, fc.watchers.size()); -// fc.modifiedService(sr, dst); -// assertEquals("Precondition failed", 2, fc.watchers.size()); -// -// assertTrue(fc.watchers.containsKey(dst)); -// assertTrue(fc.watchers.containsKey(dst2)); -// assertEquals(dmList2, fc.watchers.get(dst2)); -// List<InterfaceMonitor> dmList3 = fc.watchers.get(dst); -// assertEquals(1, dmList3.size()); -// assertEquals(Utils.getZooKeeperPath(List.class.getName()), dmList3.iterator().next().znode); -// -// EasyMock.verify(zk); -// -// // --------------------------------------------------------------- -// // Test the removedService APIs -// // --------------------------------------------------------------- -// EasyMock.reset(zk); -// EasyMock.replay(zk); -// -// assertEquals("Precondition failed", 2, fc.watchers.size()); -// fc.removedService(sr2, dst2); -// assertEquals("Precondition failed", 1, fc.watchers.size()); -// -// assertEquals(dmList3, fc.watchers.get(dst)); -// assertNull(fc.watchers.get(dst2)); -// -// EasyMock.verify(zk); -// } -// -// public void testGetInterfacesFromFilter() { -// testGetInterfacesFromFilter("objectClass=org.apache_2.Some$FunnyClass", -// "org.apache_2.Some$FunnyClass"); -// testGetInterfacesFromFilter("(&(a=b)(objectClass = org.acme.Q)", -// "org.acme.Q"); -// testGetInterfacesFromFilter("(&(objectClassIdentifier=b)(objectClass = org.acme.Q)", -// "org.acme.Q"); -// testGetInterfacesFromFilter("(|(OBJECTCLASS= X )(objectclass = Y)", -// "X", "Y"); -// testGetInterfacesFromFilter(new String[] {"(objectClass=X)", "(objectClass=Y)"}, -// "X", "Y"); -// } -// -// private void testGetInterfacesFromFilter(String filter, String ... interfaces) { -// testGetInterfacesFromFilter(new String[] {filter}, interfaces); -// } -// -// private void testGetInterfacesFromFilter(String[] filters, String ... interfaces) { -// FindInZooKeeperCustomizer.getInterfacesFromFilter(Arrays.asList(filters)); -// } -// -// private void zkExpectExists(ZooKeeper zk, String className) { -// zk.exists(EasyMock.eq(Utils.getZooKeeperPath(className)), -// (Watcher) EasyMock.anyObject(), -// (StatCallback) EasyMock.anyObject(), EasyMock.isNull()); -// EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() { -// public Object answer() throws Throwable { -// assertEquals(EasyMock.getCurrentArguments()[1], -// EasyMock.getCurrentArguments()[2]); -// return null; -// } -// }); -// } -}