This is an automated email from the ASF dual-hosted git repository. amichair pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
commit c27b38fa5b2a160bf0064c1634b34556bc15f53f Author: Amichai Rothman <[email protected]> AuthorDate: Sun Mar 22 00:47:32 2026 +0200 ARIES-2211 Add EventListenerBridge for legacy EndpointListener support --- .../java/org/apache/aries/rsa/core/Activator.java | 13 +- .../apache/aries/rsa/core/EventListenerBridge.java | 375 +++++++++++++++++++++ 2 files changed, 387 insertions(+), 1 deletion(-) diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/Activator.java b/rsa/src/main/java/org/apache/aries/rsa/core/Activator.java index d943873f..5ffd5408 100644 --- a/rsa/src/main/java/org/apache/aries/rsa/core/Activator.java +++ b/rsa/src/main/java/org/apache/aries/rsa/core/Activator.java @@ -33,14 +33,25 @@ import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin; public class Activator implements BundleActivator { private DistributionProviderTracker tracker; + private EventListenerBridge bridge; public void start(BundleContext bundlecontext) throws Exception { tracker = new DistributionProviderTracker(bundlecontext); tracker.open(); + // enable optional event listener bridge for backwards compatibility + if (Boolean.getBoolean("org.apache.aries.rsa.bridge")) { + bridge = new EventListenerBridge(bundlecontext); + bridge.start(); + } } public void stop(BundleContext context) throws Exception { - tracker.close(); + if (tracker != null) { + tracker.close(); + } + if (bridge != null) { + bridge.stop(); + } } } diff --git a/rsa/src/main/java/org/apache/aries/rsa/core/EventListenerBridge.java b/rsa/src/main/java/org/apache/aries/rsa/core/EventListenerBridge.java new file mode 100644 index 00000000..1030d0cf --- /dev/null +++ b/rsa/src/main/java/org/apache/aries/rsa/core/EventListenerBridge.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.core; + +import org.apache.aries.rsa.util.StringPlus; +import org.osgi.framework.*; +import org.osgi.framework.hooks.service.ListenerHook; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.osgi.service.remoteserviceadmin.EndpointListener; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; + +/** + * This service acts as a bridge between those consumers/producers of endpoint + * events that support only the deprecated {@link EndpointListener} interface and + * those that support only the newer {@link EndpointEventListener} interface. + * <p> + * According to the RSA specification, all actors must be backward-compatible + * with the old interface - they must listen to both and notify both, with the + * caveat that if a listener implements both interfaces, notifiers should only + * send it events using the newer API in order to prevent event duplication. + * <p> + * Components in the Aries RSA implementation now only support the newer + * interface natively, as inter-compatibility with components using the old + * interface are rarely needed. However, for spec compatibility, TCK (compliance) + * tests, and those rare occasions - this bridge fills in the gap when needed. + * <p> + * Consumer (listener) and producer (notifier) implementations may support + * either only the old interface, or only the new interface, or both. + * <p> + * If they support both, they can directly notify peers of all the three types, + * and, trivially, new-only or old-only components already work with equivalent + * peers. So the only cases that need to be bridged are a pair of new-only and + * old-only peers that need to communicate with each other. Handling any other + * case via the bridge could produce duplicate events and must be avoided. + * <p> + * The bridge is implemented using four mechanisms: + * <ol> + * <li>A {@link ServiceListener} keeps track of all registered services + * implementing {@code EndpointListener} (old consumers) and + * {@code EndpointEventListener} (new consumers), and ignores those + * that implement both. + * </li> + * <li>A {@link ListenerHook} keeps track of all registered + * ServiceListeners (including ServiceTrackers) that are looking + * for services implementing {@code EndpointListener} (old producers) + * or {@code EndpointEventListener} (new producers). + * </li> + * <li>A registered service that implements both interfaces (our own + * listener/consumer) that is backed by a {@link ServiceFactory} + * that gives each calling bundle (producer bundle) its own + * {@code Adapter} instance. + * </li> + * <li> + * An {@link Adapter Adapter} service object (per bundle) that + * implements both interfaces, and depending on the interface types + * supported by the producer bundle associated with it, either does + * nothing (if the producer supports both interfaces or neither + * of them) or converts invocations of the one interface supported + * by the producer into a notification sent to all consumers of the + * opposite type (old producer to all new consumers or new producer + * to all old consumers). + * </li> + * </ol> + * <p> + * Note that due to the limitations of {@code ListenerHook}, the adapter + * conversion policy is determined per bundle and not per individual producer + * within the bundle. This works as long as the (reasonable) assumption is + * made that all producers within the same bundle implement the same + * interface version (or both of them). + * + * @deprecated this class exists only for spec-required backwards compatibility + * with the {@code EndpointListener} interface, which is itself deprecated + * in the spec. It will be removed when {@code EndpointListener} is removed + * from the spec. Users of code or libraries that still use {@code EventListener} + * exclusively are urged to upgrade to use of {@code EndpointEventListener} + * rather than relying on this bridge moving forward. + */ +@SuppressWarnings("deprecation") +public class EventListenerBridge implements ServiceListener, ListenerHook { + + /** + * An adapter service that can convert invocation of one of the endpoint + * listener interfaces to the other, based on the producer interface types. + * <p> + * Note that it will only dispatch events if the producer supports exactly one + * type of listener (if it supports both or none, it must not be bridged). + */ + private class Adapter implements EndpointListener, EndpointEventListener { + + long bundleId; + + /** + * Creates an adapter service instance for the given bundle's producer types. + */ + Adapter(long bundleId) { + this.bundleId = bundleId; + } + + @Override + public void endpointChanged(EndpointEvent event, String filter) { + if (producers.get(bundleId) == ENDPOINT_EVENT_LISTENER) { + dispatchToOld(event); + } + } + + @Override + public void endpointAdded(EndpointDescription endpoint, String filter) { + if (producers.get(bundleId) == ENDPOINT_LISTENER) { + dispatchToNew(new EndpointEvent(EndpointEvent.ADDED, endpoint)); + } + } + + @Override + public void endpointRemoved(EndpointDescription endpoint, String filter) { + if (producers.get(bundleId) == ENDPOINT_LISTENER) { + dispatchToNew(new EndpointEvent(EndpointEvent.REMOVED, endpoint)); + } + } + } + + private class AdapterServiceFactory implements ServiceFactory<Adapter> { + @Override + public Adapter getService(Bundle bundle, ServiceRegistration<Adapter> reg) { + // provide a new adapter instance per producer bundle, according to its supported types + return new Adapter(bundle.getBundleId()); + } + + @Override + public void ungetService(Bundle bundle, ServiceRegistration<Adapter> reg, Adapter service) { + } + }; + + // bitmask of supported listener types + private static final int + ENDPOINT_LISTENER = 1, + ENDPOINT_EVENT_LISTENER = 2; + + private static final String + OBJECT_CLASS_FILTER_PREFIX = "(" + Constants.OBJECTCLASS + "=", + OWN_LISTENER_PROP = EventListenerBridge.class.getName(), + ENDPOINT_LISTENER_CLASS_NAME = EndpointListener.class.getName(), + ENDPOINT_EVENT_LISTENER_CLASS_NAME = EndpointEventListener.class.getName(), + SERVICE_LISTENER_FILTER = "(|" + getObjectClass(EndpointListener.class) + + getObjectClass(EndpointEventListener.class) + "(" + OWN_LISTENER_PROP + "=true))"; + + private final Set<ServiceReference<EndpointListener>> oldConsumers = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set<ServiceReference<EndpointEventListener>> newConsumers = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Map<Long, Integer> producers = new ConcurrentHashMap<>(); // bundleId to producer types bitmap + + private final BundleContext context; + private ServiceRegistration<ServiceFactory<Adapter>> factoryRegistration; + private ServiceRegistration<ListenerHook> hookRegistration; + + public EventListenerBridge(BundleContext context) { + this.context = context; + } + + private static String getObjectClass(Class<?> cls) { + return OBJECT_CLASS_FILTER_PREFIX + (cls == null ? "*" : cls.getName()) + ")"; + } + + private static Collection<String> getObjectClasses(String filter) { + if (filter == null) + return Collections.emptyList(); + Collection<String> classes = new ArrayList<>(2); + int i = 0; + while (true) { + i = filter.indexOf(OBJECT_CLASS_FILTER_PREFIX, i); + if (i < 0) { + return classes; + } + i += OBJECT_CLASS_FILTER_PREFIX.length(); + int j = filter.indexOf(')', i); + classes.add(filter.substring(i, j)); + i = j; + } + } + + private String match(ServiceReference<?> sref, EndpointDescription endpoint) { + List<String> filters = StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)); + for (String filter : filters) { + try { + Filter f = context.createFilter(filter); + if (f.matches(endpoint.getProperties())) + return filter; + } catch (InvalidSyntaxException e) { + throw new RuntimeException(e); + } + } + return null; + } + + @SuppressWarnings("unchecked") + public void start() throws InvalidSyntaxException { + // ServiceListener to track consumers + context.addServiceListener(this, SERVICE_LISTENER_FILTER); + // ListenerHook to track what producers are looking for + hookRegistration = context.registerService(ListenerHook.class, this, null); + // our listener, backed by a ServiceFactory, to be used by producers + factoryRegistration = (ServiceRegistration<ServiceFactory<Adapter>>) + context.registerService( + new String[] { ENDPOINT_LISTENER_CLASS_NAME, ENDPOINT_EVENT_LISTENER_CLASS_NAME }, + new AdapterServiceFactory(), getListenerProperties()); + } + + public void stop() { + if (factoryRegistration != null) { + factoryRegistration.unregister(); + } + if (hookRegistration != null) { + hookRegistration.unregister(); + } + context.removeServiceListener(this); + } + + private Hashtable<String, Object> getListenerProperties() { + // our listener's scope is the combined scope of all known consumers we may dispatch to + Hashtable<String, Object> props = new Hashtable<>(); + props.put(OWN_LISTENER_PROP, "true"); // mark our own listener for exclusion from consumers + String[] scopes = Stream.concat(oldConsumers.stream(), newConsumers.stream()) + .flatMap(s -> StringPlus.normalize(s.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).stream()) + .toArray(String[]::new); + props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, scopes); + return props; + } + + @Override // ServiceListener - track consumers that publish EL/EEL services + @SuppressWarnings("unchecked") + public void serviceChanged(ServiceEvent event) { + ServiceReference<?> sref = event.getServiceReference(); + boolean ignore = sref.getProperty(OWN_LISTENER_PROP) != null; // ignore our own listener + List<String> classes = Arrays.asList( + (String[])event.getServiceReference().getProperty(Constants.OBJECTCLASS)); + boolean el = classes.contains(ENDPOINT_LISTENER_CLASS_NAME); + boolean eel = classes.contains(ENDPOINT_EVENT_LISTENER_CLASS_NAME); + if (event.getType() == ServiceEvent.UNREGISTERING || ignore) + el = eel = false; // remove both + // update our consumer lists with relevant one-interface-only consumers + boolean modified = false; + if (!el) + modified |= oldConsumers.remove(sref); + else if (!eel) // only EL (not EEL) + modified |= oldConsumers.add((ServiceReference<EndpointListener>)sref); + if (!eel) + modified |= newConsumers.remove(sref); + else if (!el) // only EEL (not EL) + modified |= newConsumers.add((ServiceReference<EndpointEventListener>)sref); + // update our own listener's scopes accordingly + if (modified) { + factoryRegistration.setProperties(getListenerProperties()); + } + } + + /** + * Updates the supported endpoint listener types (EE/EEL) that producers + * are looking for, by tracking all registered service listeners + * (via ListenerHook) and checking their filters for our interface types. + * + * @param listeners the updated (added/removed) service listener's info + */ + private void updateProducerNeeds(Collection<ListenerInfo> listeners) { + for (ListenerInfo info : listeners) { + String filter = info.getFilter(); + if (filter != null) { + Collection<String> classes = getObjectClasses(filter); + int types = (classes.contains(ENDPOINT_LISTENER_CLASS_NAME) ? ENDPOINT_LISTENER : 0) + | (classes.contains(ENDPOINT_EVENT_LISTENER_CLASS_NAME) ? ENDPOINT_EVENT_LISTENER : 0); + if (types != 0) { + long bundleId = info.getBundleContext().getBundle().getBundleId(); + boolean remove = info.isRemoved(); + producers.compute(bundleId, (id, t) -> { + t = t == null ? 0 : t; + if (remove) + t &= ~types; // remove types + else + t |= types; // add types + return t == 0 ? null : t; // remove from map if no types + }); + } + } + } + } + + @Override // ListenerHook - track what listeners potential producers are looking for + public void added(Collection<ListenerInfo> listeners) { + // note: if a listener is registered again but with a different filter, + // the old one is automatically removed first before the new one is added + // also, the ListenerInfo instance passed to add and remove for the same + // listener service will be equal (if we want to keep closer track of them) + updateProducerNeeds(listeners); + } + + @Override // ListenerHook - track what listeners potential producers are looking for + public void removed(Collection<ListenerInfo> listeners) { + updateProducerNeeds(listeners); + } + + /** + * Dispatches an endpoint event from a new producer to all old consumers that match it. + * + * @param event the event to dispatch + */ + private void dispatchToOld(EndpointEvent event) { + EndpointDescription endpoint = event.getEndpoint(); + oldConsumers.forEach(sref -> { + String filter = match(sref, endpoint); + if (filter != null) { + EndpointListener listener = context.getService(sref); + if (listener != null) { + try { + switch (event.getType()) { + case EndpointEvent.ADDED: + listener.endpointAdded(endpoint, filter); + break; + case EndpointEvent.REMOVED: + listener.endpointRemoved(endpoint, filter); + break; + case EndpointEvent.MODIFIED: + case EndpointEvent.MODIFIED_ENDMATCH: + listener.endpointRemoved(endpoint, filter); + listener.endpointAdded(endpoint, filter); + break; + } + } finally { + context.ungetService(sref); + } + } + } + }); + } + + /** + * Dispatches an endpoint event from an old producer to all new consumers that match it. + * + * @param event the event to dispatch + */ + private void dispatchToNew(EndpointEvent event) { + newConsumers.forEach(sref -> { + String filter = match(sref, event.getEndpoint()); + if (filter != null) { + EndpointEventListener listener = context.getService(sref); + if (listener != null) { + try { + listener.endpointChanged(event, filter); + } finally { + context.ungetService(sref); + } + } + } + }); + } +}
