This is an automated email from the ASF dual-hosted git repository. amichair pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
commit 5d51c4c60e5e51e5f8ad2a84aef40c8482a5f90b Author: Amichai Rothman <[email protected]> AuthorDate: Fri Mar 20 00:34:03 2026 +0200 ARIES-2202 Add TCP Discovery Provider --- discovery/pom.xml | 7 +- discovery/tcp/Readme.md | 38 ++++ discovery/tcp/pom.xml | 62 ++++++ .../apache/aries/rsa/discovery/tcp/Interest.java | 65 ++++++ .../aries/rsa/discovery/tcp/InterestManager.java | 88 ++++++++ .../aries/rsa/discovery/tcp/TcpConnection.java | 121 +++++++++++ .../rsa/discovery/tcp/TcpConnectionManager.java | 242 +++++++++++++++++++++ .../aries/rsa/discovery/tcp/TcpDiscovery.java | 205 +++++++++++++++++ .../apache/aries/rsa/discovery/tcp/TcpMessage.java | 82 +++++++ features/src/main/resources/features.xml | 5 + itests/felix/pom.xml | 5 + .../apache/aries/rsa/itests/felix/RsaTestBase.java | 12 + 12 files changed, 929 insertions(+), 3 deletions(-) diff --git a/discovery/pom.xml b/discovery/pom.xml index 31a7de5e..8a5e642d 100644 --- a/discovery/pom.xml +++ b/discovery/pom.xml @@ -32,10 +32,11 @@ <name>Aries Remote Service Admin Discovery</name> <modules> - <module>local</module> - <module>zookeeper</module> - <module>config</module> <module>command</module> + <module>config</module> + <module>local</module> <module>mdns</module> + <module>tcp</module> + <module>zookeeper</module> </modules> </project> diff --git a/discovery/tcp/Readme.md b/discovery/tcp/Readme.md new file mode 100644 index 00000000..96f220b7 --- /dev/null +++ b/discovery/tcp/Readme.md @@ -0,0 +1,38 @@ +# TCP Discovery + +Discovers remote endpoints using direct TCP connections between all peers. +This is useful for small clusters of local peers such as a few hosts on a LAN +or even a few framework instances on the same host (e.g. two-container integration tests). + +* Every peer opens a server socket on its configured bind address and port. +* Every peer is configured with a fixed list of peer addresses. +* Every peer opens a TCP connection to every other known peer (one per pair). 
+* Discovery messages are sent directly between the peers. +* All messages are serialized using Java serialization, so there are no external dependencies. +* Broken connections are retried in a loop with a configurable delay. +* An optional basic gossip protocol is supported: every connection handshake includes + the addresses of all peers known to each side, and new connections are opened to any previously + unknown ones. The configured addresses are thus considered 'seed' addresses to join the peer group. + +## Discovery Configuration + +PID: org.apache.aries.rsa.discovery.tcp + +| Key | Default | Description | +|----------------|-----------------------------|------------------------------------------------------------------------------------| +| address | localhost:7667 | Address and port to publish to peers<br/>If port is not specified, the default port is used. | +| bindAddress | 0.0.0.0<br>(all interfaces) | The address to bind the server socket to | +| peers | | Comma-separated list of peer addresses (host:port) | +| reconnectDelay | 5000 | Delay (in millis) between failed connection retries | +| gossip | true | Enable the basic gossip protocol | + +In order to facilitate configuration in test environments, configuration properties that +are not defined via Configuration Admin (with the above PID) fall back to framework +properties, and then to system properties. These framework/system properties are specified using the PID +as the property name prefix. + +For example, in an integration test setup with two frameworks, each instance can +specify its own distinct port using a property such as +`org.apache.aries.rsa.discovery.tcp.address=localhost:7668` +and specify each other as peers using +`org.apache.aries.rsa.discovery.tcp.peers=localhost:7669`. 
diff --git a/discovery/tcp/pom.xml b/discovery/tcp/pom.xml new file mode 100644 index 00000000..149bb9c4 --- /dev/null +++ b/discovery/tcp/pom.xml @@ -0,0 +1,62 @@ +<?xml version='1.0' encoding='UTF-8' ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.aries.rsa</groupId> + <artifactId>org.apache.aries.rsa.parent</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../parent/pom.xml</relativePath> + </parent> + + <groupId>org.apache.aries.rsa.discovery</groupId> + <artifactId>org.apache.aries.rsa.discovery.tcp</artifactId> + <packaging>jar</packaging> + <name>Aries Remote Service Admin Discovery TCP</name> + + <properties> + <topDirectoryLocation>../..</topDirectoryLocation> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.aries.rsa</groupId> + <artifactId>org.apache.aries.rsa.spi</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> 
+ <version>5.18.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.service.remoteserviceadmin</artifactId> + <version>1.1.0</version> + <scope>compile</scope> + </dependency> + </dependencies> + +</project> diff --git a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/Interest.java b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/Interest.java new file mode 100644 index 00000000..e96fa09e --- /dev/null +++ b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/Interest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.aries.rsa.discovery.tcp; + +import org.apache.aries.rsa.util.StringPlus; +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.osgi.service.remoteserviceadmin.EndpointEventListener.ENDPOINT_LISTENER_SCOPE; + +/** + * An interest is a combination of an {@link EndpointEventListener} and its + * published scope (i.e. the filters defining what endpoints it is interested in). + * <p> + * The {@code Interest} class acts as a gatekeeper for an {@code EndpointEventListener} - + * it keeps track of its scopes, and when notified of endpoint events, + * it forwards to the listener only those that match what it is interested in. + */ +public class Interest { + private static final Logger LOG = LoggerFactory.getLogger(Interest.class); + + private final List<String> scopes; + private final EndpointEventListener listener; + + public Interest(ServiceReference<?> sref, EndpointEventListener listener) { + this.scopes = StringPlus.normalize(sref.getProperty(ENDPOINT_LISTENER_SCOPE)); + this.listener = listener; + } + + public void notifyListener(EndpointEvent event) { + EndpointDescription endpoint = event.getEndpoint(); + scopes.stream().filter(endpoint::matches).findFirst().ifPresent(scope -> { // notify with first scope + LOG.info("Calling endpointChanged on {} for filter {}, type {}, endpoint {}", + listener, scope, event.getType(), endpoint); + listener.endpointChanged(event, scope); + }); + } + + @Override + public String toString() { + return "Interest [scopes=" + scopes + ", listener=" + listener.getClass() + "]"; + } +} diff --git a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/InterestManager.java 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/InterestManager.java new file mode 100644 index 00000000..4f9c2829 --- /dev/null +++ b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/InterestManager.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.tcp; + +import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.osgi.service.remoteserviceadmin.EndpointEvent.*; + +/** + * Manages bookkeeping of all known local {@link EndpointEventListener}s along with + * their interests (scopes), as well as all known remote endpoints, and notifies + * the former about the latter. 
+ */ +public class InterestManager { + // listener to its interest + private final Map<ServiceReference<EndpointEventListener>, Interest> interests = new ConcurrentHashMap<>(); + // peer framework UUID to endpointId to endpoint + private final Map<String , Map<String, EndpointDescription>> remoteEndpoints = new ConcurrentHashMap<>(); + + public void addListener(ServiceReference<EndpointEventListener> sref, + EndpointEventListener listener, boolean isNew) { + // a new listener must be notified immediately of all previously known remote endpoints. + // an existing listener is not notified, but we do need to update its scopes (interest) + Interest interest = new Interest(sref, listener); + interests.put(sref, interest); // if it already exists - replace it with the new interest (scopes) + if (isNew) { + // notify new listener of all known remote endpoints + remoteEndpoints.values().stream() + .flatMap(endpoints -> endpoints.values().stream()) + .forEach(endpoint -> interest.notifyListener(new EndpointEvent(ADDED, endpoint))); + } + } + + public void removeListener(ServiceReference<EndpointEventListener> sref) { + interests.remove(sref); + } + + private void notifyAllListeners(int type, EndpointDescription endpoint) { + EndpointEvent event = new EndpointEvent(type, endpoint); + interests.values().forEach(interest -> interest.notifyListener(event)); + } + + public void addEndpoint(EndpointDescription endpoint) { + Map<String, EndpointDescription> endpoints = remoteEndpoints + .computeIfAbsent(endpoint.getFrameworkUUID(), s -> new ConcurrentHashMap<>()); + boolean exists = endpoints.put(endpoint.getId(), endpoint) != null; + notifyAllListeners(exists ? 
MODIFIED : ADDED, endpoint); + } + + public void removeEndpoint(String peerUuid, String endpointId) { + Map<String, EndpointDescription> endpoints = remoteEndpoints.get(peerUuid); + if (endpoints != null) { + EndpointDescription endpoint = endpoints.remove(endpointId); + if (endpoint != null) { + notifyAllListeners(REMOVED, endpoint); + } + } + } + + public void removePeer(String peerUuid) { + Map<String, EndpointDescription> endpoints = remoteEndpoints.remove(peerUuid); + if (endpoints != null) { + endpoints.values().forEach(endpoint -> notifyAllListeners(REMOVED, endpoint)); + } + } +} diff --git a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnection.java b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnection.java new file mode 100644 index 00000000..6a0e23e3 --- /dev/null +++ b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnection.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.aries.rsa.discovery.tcp; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.Socket; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +/** + * A TCP connection between two TCP discovery peers. + * <p> + * This class handles the low-level network reading/writing/serialization, + * while leaving the higher-level logic (including received message handling + * and connection close handling) to the {@link TcpConnectionManager}. + */ +public class TcpConnection { + private static final Logger LOG = LoggerFactory.getLogger(TcpConnection.class); + + private final Socket socket; + private final ObjectInputStream in; + private final ObjectOutputStream out; + private final Thread readThread; + private final boolean isOutbound; + + private final BiConsumer<TcpConnection, TcpMessage> onMessage; + private final Consumer<TcpConnection> onClose; + + private volatile String peerAddress; // the peer host:port, for incoming connections it is set only after handshake + private volatile String peerUuid; // set after handshake + + public TcpConnection(Socket socket, String peerAddress, + BiConsumer<TcpConnection, TcpMessage> onMessage, Consumer<TcpConnection> onClose) throws IOException { + this.socket = socket; + this.isOutbound = peerAddress != null; // inbound only gets peerAddress after handshake + this.peerAddress = peerAddress; + this.onMessage = onMessage; + this.onClose = onClose; + this.out = new ObjectOutputStream(socket.getOutputStream()); // output must be initialized before input + this.out.flush(); // so we don't deadlock on reading object stream header + this.in = new ObjectInputStream(socket.getInputStream()); + readThread = new Thread(null, this::readLoop, getClass().getSimpleName() + "-Reader-" + this); + readThread.start(); + } + + public boolean isOutbound() { + return isOutbound; + } + + 
public String getPeerAddress() { + return peerAddress; + } + + public void setPeerAddress(String address) { + this.peerAddress = address; + } + + public String getPeerUuid() { + return peerUuid; + } + + public void setPeerUuid(String peerUuid) { + this.peerUuid = peerUuid; + } + + public void send(TcpMessage message) { + try { + synchronized (out) { + out.writeObject(message); + out.flush(); + } + } catch (IOException ioe) { + LOG.error("Error sending TCP message", ioe); + close(); + } + } + + public void close() { + try { + socket.close(); // read thread will get SocketException, fire onClose and die + } catch (IOException ioe) { + LOG.error("Error closing socket", ioe); + } + } + + private void readLoop() { + while (true) { + try { + TcpMessage message = (TcpMessage)in.readObject(); + onMessage.accept(this, message); + } catch (Throwable t) { + if (!(t instanceof IOException)) { + LOG.error("Unexpected error in read loop", t); + } + onClose.accept(this); // invoked only here, when the socket dies for any reason + return; // thread's dead + } + } + } +} diff --git a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnectionManager.java b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnectionManager.java new file mode 100644 index 00000000..8b635327 --- /dev/null +++ b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnectionManager.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.tcp; + +import org.apache.aries.rsa.discovery.tcp.TcpMessage.*; +import org.osgi.service.remoteserviceadmin.EndpointDescription; +import org.osgi.service.remoteserviceadmin.EndpointEvent; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.aries.rsa.discovery.tcp.TcpDiscovery.toURI; + +/** + * Manages all TCP connections for this provider, as well as + * the higher-level TCP discovery protocol logic. + * <p> + * This includes accepting incoming connections on a server socket, + * initiating outgoing connections to configured (or discovered) peers, + * connection retry logic after unexpected disconnection, handling + * incoming messages from connections and handling gossip-discovered peers. + * <p> + * In addition to the protocol-level functionality, it keeps track of + * all known local endpoints and notifies all remote peers about them. 
+ */ +public class TcpConnectionManager implements EndpointEventListener { + private static final Logger LOG = LoggerFactory.getLogger(TcpConnectionManager.class); + + private final InterestManager interestManager; + private final String localAddress; + private final String localUuid; + private final long reconnectDelay; + private final boolean gossip; + + private final ExecutorService executor = Executors.newCachedThreadPool(); // for connect/accept threads + private final Set<TcpConnection> connections = ConcurrentHashMap.newKeySet(); // all connections, including before handshake + private final Map<String, TcpConnection> connectionsByUuid = new ConcurrentHashMap<>(); // connections after handshake (known uuid) + private final Map<String, EndpointDescription> localEndpoints = new ConcurrentHashMap<>(); + private final Set<String> peers = ConcurrentHashMap.newKeySet(); // all configured and discovered (gossip) peer addresses + + private ServerSocket serverSocket; + private volatile boolean closing; + + public TcpConnectionManager(InterestManager interestManager, String localAddress, + String localUuid, long reconnectDelay, boolean gossip) { + this.interestManager = interestManager; + this.localAddress = localAddress; + this.localUuid = localUuid; + this.reconnectDelay = reconnectDelay; + this.gossip = gossip; + } + + public void open(String bindAddress, int port, Collection<String> peers) throws IOException { + serverSocket = new ServerSocket(); + serverSocket.setReuseAddress(true); + serverSocket.bind(new InetSocketAddress(bindAddress, port)); + executor.submit(this::acceptLoop); + addPeers(peers); + } + + public void close() throws IOException { + closing = true; + serverSocket.close(); // acceptLoop will get SocketException + connections.forEach(TcpConnection::close); + executor.shutdownNow(); + } + + private void addPeers(Collection<String> peers) { + peers.stream() + .filter(peer -> !this.peers.contains(peer)) // only new ones + .filter(peer -> 
!localAddress.equals(peer)) // exclude ourself + .forEach(peer -> { + LOG.info("Adding peer {}", peer); + this.peers.add(peer); + executor.submit(() -> connectLoop(peer)); + }); + } + + private void acceptLoop() { + while (true) { + try { + Socket socket = serverSocket.accept(); + executor.submit(() -> { + try { + onConnected(socket, null); + } catch (IOException ioe) { + LOG.error("error initializing connection on accepted socket", ioe); + } + }); + } catch (IOException ioe) { + return; // socket closed + } catch (Throwable t) { + LOG.error("Unexpected error in accept loop - shutting down", t); + return; + } + } + } + + private void connectLoop(String address) { + URI uri = toURI(address); + while (!closing) { + try { + Socket socket = new Socket(uri.getHost(), uri.getPort()); + onConnected(socket, address); + return; // connection established; onConnectionClosed will restart this loop if needed + } catch (IOException ioe) { + LOG.debug("error connecting to {}, will retry soon", uri, ioe); + try { + Thread.sleep(reconnectDelay); + } catch (InterruptedException ie) { + // end loop (shutting down) + return; + } + } catch (Throwable t) { + LOG.error("Unexpected error in connect loop - aborting connection to {}", uri, t); + return; + } + } + } + + private void onConnected(Socket socket, String address) throws IOException { + try { + TcpConnection conn = new TcpConnection(socket, address, this::onMessage, this::onClosed); + connections.add(conn); + conn.send(new HandshakeMessage(localUuid, localAddress, new ArrayList<>(peers))); + // don't send known endpoints yet, only after receiving handshake + } catch (IOException ioe) { + try { + socket.close(); + } catch (IOException ioe2) { + ioe.addSuppressed(ioe2); + } + throw ioe; + } + } + + public void onClosed(TcpConnection conn) { + connections.remove(conn); + String peerUuid = conn.getPeerUuid(); + if (peerUuid != null) { // passed the handshake + boolean removed = connectionsByUuid.remove(peerUuid, conn); // remove if 
this is the active connection + if (removed && !closing) { + interestManager.removePeer(peerUuid); // no active connections with peer + } + } + // re-start the connect retry thread if necessary: + // only if we're the outbound peer, and not in the process of shutting down, + // and there isn't already an active (reverse?) connection with the same peer, + // or we don't know who the peer is yet (before handshake) + if (conn.isOutbound() && !closing && (peerUuid == null || !connectionsByUuid.containsKey(peerUuid))) { + executor.submit(() -> connectLoop(conn.getPeerAddress())); + } + } + + private void onMessage(TcpConnection conn, TcpMessage message) { + if (message instanceof HandshakeMessage) { + HandshakeMessage h = (HandshakeMessage) message; + // update the peer data + conn.setPeerAddress(h.getAddress()); + conn.setPeerUuid(h.getUuid()); + // if gossip is enabled, try adding all of this peer's peers, + // as well as the peer itself (in case it found us via its + // own gossip, and we haven't met before) + if (gossip) { + addPeers(h.getPeers()); + addPeers(Collections.singleton(h.getAddress())); + } + // if we already have another connection with this peer (e.g. reverse direction) + // then we keep the old one (which is already in use) and close the new one + boolean existing = connectionsByUuid.putIfAbsent(h.getUuid(), conn) != null; + if (existing) { + // both sides can check if a connection between them already exists, but + // there is a possible race condition where one peer receives the handshake + // and closes the connection before it even had a chance to send its handshake - + // so the other peer will never know the connection is intentionally closed + // and not experiencing connection errors. If it is the outbound side, it will + // keep retrying to connect. to solve this, only the outbound peer is the one + // that closes the connection. The inbound side just doesn't use it until then. 
+ if (conn.isOutbound()) { + conn.close(); + } + return; + } + // send all of our known local endpoints to the new peer + localEndpoints.values().forEach(endpoint -> conn.send(new UpdateMessage(endpoint.getProperties()))); + } else if (message instanceof UpdateMessage) { + UpdateMessage u = (UpdateMessage) message; + EndpointDescription endpoint = new EndpointDescription(u.getProperties()); + interestManager.addEndpoint(endpoint); + } else if (message instanceof RemoveMessage) { + RemoveMessage r = (RemoveMessage) message; + interestManager.removeEndpoint(conn.getPeerUuid(), r.getEndpointId()); + } else { + throw new IllegalArgumentException("unsupported message type: " + message); + } + } + + @Override + public void endpointChanged(EndpointEvent event, String filter) { + // notify all peers of the endpoint event + TcpMessage message; + EndpointDescription endpoint = event.getEndpoint(); + String endpointId = endpoint.getId(); + switch (event.getType()) { + case EndpointEvent.ADDED: + case EndpointEvent.MODIFIED: + localEndpoints.put(endpointId, endpoint); + message = new UpdateMessage(endpoint.getProperties()); + break; + case EndpointEvent.MODIFIED_ENDMATCH: + case EndpointEvent.REMOVED: + localEndpoints.remove(endpointId); + message = new RemoveMessage(endpointId); + break; + default: throw new RuntimeException("Unknown event type: " + event.getType()); + } + connectionsByUuid.values().forEach(c -> c.send(message)); + } +} diff --git a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpDiscovery.java b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpDiscovery.java new file mode 100644 index 00000000..9efdd7f9 --- /dev/null +++ b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpDiscovery.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.tcp; + +import org.apache.aries.rsa.annotations.RSADiscoveryProvider; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.remoteserviceadmin.EndpointEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.osgi.framework.Constants.FRAMEWORK_UUID; +import static org.osgi.service.component.annotations.ReferenceCardinality.MULTIPLE; +import static org.osgi.service.component.annotations.ReferencePolicy.DYNAMIC; +import static org.osgi.service.remoteserviceadmin.EndpointEventListener.ENDPOINT_LISTENER_SCOPE; +import static 
org.osgi.service.remoteserviceadmin.RemoteConstants.ENDPOINT_FRAMEWORK_UUID; + +/** + * The main TCP Discovery provider component. + * <p> + * It initializes the provider using config admin configuration, + * initializes the {@link InterestManager} and {@link TcpConnectionManager}, + * registers an {@link EndpointEventListener} to track locally exported + * endpoints, and listens for registrations of other + * EndpointEventListeners which are managed by the InterestManager. + */ +@RSADiscoveryProvider(protocols = "aries.tcp") +@Component(immediate = true, configurationPid = TcpDiscovery.DISCOVERY_TCP_PID) +public class TcpDiscovery { + private static final Logger LOG = LoggerFactory.getLogger(TcpDiscovery.class); + public static final String DISCOVERY_TCP_PID = "org.apache.aries.rsa.discovery.tcp"; + private static final String OWN_LISTENER_PROP = "aries.discovery.tcp"; + public static final int DEFAULT_PORT = 7667; + + @interface Config { + String address() default "localhost:" + DEFAULT_PORT; + String bindAddress() default "0.0.0.0"; + String[] peers() default {}; + long reconnectDelay() default 5000; + boolean gossip() default true; + } + + private InterestManager interestManager; + private TcpConnectionManager connectionManager; + private ServiceRegistration<?> listenerRegistration; + + public TcpDiscovery() { + // initialize in constructor before we start getting reference bind events + interestManager = new InterestManager(); + } + + public static URI toURI(String address) { + try { + URI uri = new URI("tcp://" + address); + if (uri.getPort() == -1) { + uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), + DEFAULT_PORT, uri.getPath(), uri.getQuery(), uri.getFragment()); + } + return uri; + } catch (URISyntaxException urise) { + LOG.error("failed to parse address " + address, urise); + throw new RuntimeException(urise); + } + } + + // merge config from ConfigAdmin, framework properties and system properties + @SuppressWarnings("unchecked") + 
private <T extends Annotation> T mergeConfig(BundleContext context, String prefix, T config) { + Class<T> cls = (Class<T>)config.annotationType(); + return (T)Proxy.newProxyInstance(cls.getClassLoader(), new Class[] { cls }, + (proxy, method, args) -> { + Object value = method.invoke(config, args); + Object defaultValue = method.getDefaultValue(); + if (method.getDeclaringClass() != Object.class && Objects.deepEquals(value, defaultValue)) { + String prop = prefix + method.getName(); + value = context.getProperty(prop); + if (value == null) { + value = System.getProperty(prop); + } + if (value == null) { + value = defaultValue; + } else if (method.getReturnType() == Boolean.TYPE) { + value = Boolean.valueOf(value.toString()); + } else if (method.getReturnType() == Integer.TYPE) { + value = Integer.valueOf(value.toString()); + } else if (method.getReturnType() == Long.TYPE) { + value = Long.valueOf(value.toString()); + } else if (method.getReturnType() == String[].class) { + value = ((String)value).split("\\s*,\\s*"); + } + } else if (method.getDeclaringClass() == Object.class && method.getName().equals("toString")) { + // add a nice toString that shows all merged config names and values (including arrays) + value = Arrays.stream(cls.getMethods()) + .filter(m -> !m.getDeclaringClass().equals(Annotation.class)) // not Object.class! + .collect(Collectors.<Method, String, Object>toMap(Method::getName, m -> { + try { + Object v = m.invoke(proxy); + return (v == null || v instanceof Object[]) ? 
Arrays.toString((Object[])v) : v.toString(); + } catch (Exception ignore) { + return "<ERROR>"; + } + })).toString(); + } + return value; + }); + } + + private void initConnectionManager(Config config, String uuid) throws IOException { + String address = config.address(); + String bindAddress = config.bindAddress(); + String[] peers = config.peers(); + if (address == null || address.isEmpty() || address.equals("0.0.0.0")) { + throw new IllegalArgumentException("invalid address: " + address); + } + if (peers.length == 0) { + // no peers - may be legitimate in a server with incoming connections + // only, or for single-host dev/itest systems where the bundle should + // start successfully even if it's not doing anything too useful + LOG.info("no peers configured - will wait for incoming connections"); + } + URI uri = toURI(address); + bindAddress = bindAddress == null ? uri.getHost() : toURI(bindAddress).getHost(); // just the host + connectionManager = new TcpConnectionManager( + interestManager, address, uuid, config.reconnectDelay(), config.gossip()); + connectionManager.open(bindAddress, uri.getPort(), Arrays.asList(peers)); + } + + private void registerListener(BundleContext context, String uuid) { + Dictionary<String, Object> props = new Hashtable<>(); + props.put(OWN_LISTENER_PROP, Boolean.TRUE); // mark our own listener for exclusion + String scope = "(&(objectClass=*)(" + ENDPOINT_FRAMEWORK_UUID + "=" + uuid + "))"; + props.put(ENDPOINT_LISTENER_SCOPE, scope); + listenerRegistration = context.registerService(EndpointEventListener.class, connectionManager, props); + } + + @Activate + void start(BundleContext context, Config config) { + String uuid = context.getProperty(FRAMEWORK_UUID); + config = mergeConfig(context, getClass().getPackageName() + ".", config); + LOG.info("Starting TCP discovery for framework {} with config {}", uuid, config); + try { + initConnectionManager(config, uuid); + // register ourselves to capture local endpoint exports + 
registerListener(context, uuid); + } catch (IOException ioe) { + LOG.error("failed to start TCP connection manager", ioe); + } + } + + @Deactivate + void stop() throws IOException { + if (listenerRegistration != null) { + listenerRegistration.unregister(); + } + if (connectionManager != null) { + connectionManager.close(); + } + } + + @Reference(cardinality = MULTIPLE, policy = DYNAMIC, target = "(!(" + OWN_LISTENER_PROP + "=*))") + void bindEndpointEventListener(ServiceReference<EndpointEventListener> sref, EndpointEventListener listener) { + interestManager.addListener(sref, listener, true); + } + + void updatedEndpointEventListener(ServiceReference<EndpointEventListener> sref, EndpointEventListener listener) { + interestManager.addListener(sref, listener, false); + } + + void unbindEndpointEventListener(ServiceReference<EndpointEventListener> sref) { + interestManager.removeListener(sref); + } +} diff --git a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpMessage.java b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpMessage.java new file mode 100644 index 00000000..093a941b --- /dev/null +++ b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpMessage.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.discovery.tcp; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A message sent on the TCP connection. + */ +public class TcpMessage implements Serializable { + + public static class HandshakeMessage extends TcpMessage { + + private final String uuid; + private final String address; + private final List<String> peers; + + public HandshakeMessage(String uuid, String address, List<String> peers) { + this.uuid = uuid; + this.address = address; + this.peers = peers; + } + + public String getUuid() { + return uuid; + } + + public String getAddress() { + return address; + } + + public List<String> getPeers() { + return peers; + } + } + + // there is no distinction between add and update, since it's + // not the sender's job to keep track of what each peer knows + public static class UpdateMessage extends TcpMessage { + + private final Map<String, Object> properties; + + public UpdateMessage(Map<String, Object> properties) { + this.properties = properties; + } + + public Map<String, Object> getProperties() { + return properties; + } + } + + public static class RemoveMessage extends TcpMessage { + + private final String endpointId; + + public RemoveMessage(String endpointId) { + this.endpointId = endpointId; + } + + public String getEndpointId() { + return endpointId; + } + } +} diff --git a/features/src/main/resources/features.xml b/features/src/main/resources/features.xml index d73f643f..b2abff4d 100644 --- a/features/src/main/resources/features.xml +++ b/features/src/main/resources/features.xml @@ -51,6 +51,11 @@ <bundle>mvn:org.apache.aries.rsa.discovery/org.apache.aries.rsa.discovery.config/${project.version}</bundle> </feature> + <feature name="aries-rsa-discovery-tcp" version="${project.version}"> + <feature>aries-rsa-core</feature> + 
<bundle>mvn:org.apache.aries.rsa.discovery/org.apache.aries.rsa.discovery.tcp/${project.version}</bundle> + </feature> + <feature name="aries-rsa-discovery-zookeeper" version="${project.version}"> <feature>aries-rsa-core</feature> <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.zookeeper/${zookeeper.version}</bundle> diff --git a/itests/felix/pom.xml b/itests/felix/pom.xml index 0bc28dbb..df3db818 100644 --- a/itests/felix/pom.xml +++ b/itests/felix/pom.xml @@ -144,6 +144,11 @@ <artifactId>org.apache.aries.rsa.discovery.zookeeper</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.aries.rsa.discovery</groupId> + <artifactId>org.apache.aries.rsa.discovery.tcp</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java index e5003f78..360e4432 100644 --- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java +++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java @@ -179,6 +179,11 @@ public class RsaTestBase { mvn("org.apache.aries.rsa.discovery", "org.apache.aries.rsa.discovery.zookeeper")); } + protected static Option rsaDiscoveryTcp() { + return composite( + mvn("org.apache.aries.rsa.discovery", "org.apache.aries.rsa.discovery.tcp")); + } + protected static Option rsaProviderTcp() { return mvn("org.apache.aries.rsa.provider", "org.apache.aries.rsa.provider.tcp"); } @@ -189,6 +194,13 @@ public class RsaTestBase { mvn("org.apache.aries.rsa.provider", "org.apache.aries.rsa.provider.fastbin")); } + protected static Option configTcpDiscovery(int instance, int peerInstance) { + return newConfiguration("org.apache.aries.rsa.discovery.tcp") // + .put("address", "127.0.0.1:" + 
(7667 + instance)) // + .put("peers", "127.0.0.1:" + (7667 + peerInstance)) // + .asOption(); + } + protected static Option configZKDiscovery() { return newConfiguration("org.apache.aries.rsa.discovery.zookeeper") // .put("zookeeper.host", "127.0.0.1") //
