This is an automated email from the ASF dual-hosted git repository.

amichair pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-rsa.git

commit 5d51c4c60e5e51e5f8ad2a84aef40c8482a5f90b
Author: Amichai Rothman <[email protected]>
AuthorDate: Fri Mar 20 00:34:03 2026 +0200

    ARIES-2202 Add TCP Discovery Provider
---
 discovery/pom.xml                                  |   7 +-
 discovery/tcp/Readme.md                            |  38 ++++
 discovery/tcp/pom.xml                              |  62 ++++++
 .../apache/aries/rsa/discovery/tcp/Interest.java   |  65 ++++++
 .../aries/rsa/discovery/tcp/InterestManager.java   |  88 ++++++++
 .../aries/rsa/discovery/tcp/TcpConnection.java     | 121 +++++++++++
 .../rsa/discovery/tcp/TcpConnectionManager.java    | 242 +++++++++++++++++++++
 .../aries/rsa/discovery/tcp/TcpDiscovery.java      | 205 +++++++++++++++++
 .../apache/aries/rsa/discovery/tcp/TcpMessage.java |  82 +++++++
 features/src/main/resources/features.xml           |   5 +
 itests/felix/pom.xml                               |   5 +
 .../apache/aries/rsa/itests/felix/RsaTestBase.java |  12 +
 12 files changed, 929 insertions(+), 3 deletions(-)

diff --git a/discovery/pom.xml b/discovery/pom.xml
index 31a7de5e..8a5e642d 100644
--- a/discovery/pom.xml
+++ b/discovery/pom.xml
@@ -32,10 +32,11 @@
     <name>Aries Remote Service Admin Discovery</name>
 
     <modules>
-      <module>local</module>
-      <module>zookeeper</module>
-      <module>config</module>
       <module>command</module>
+      <module>config</module>
+      <module>local</module>
       <module>mdns</module>
+      <module>tcp</module>
+      <module>zookeeper</module>
     </modules>
 </project>
diff --git a/discovery/tcp/Readme.md b/discovery/tcp/Readme.md
new file mode 100644
index 00000000..96f220b7
--- /dev/null
+++ b/discovery/tcp/Readme.md
@@ -0,0 +1,38 @@
+# TCP Discovery
+
+Discovers remote endpoints using direct TCP connections between all peers.
+This is useful for small clusters of local peers such as a few hosts on a LAN
+or even a few framework instances on the same host (e.g. two-container 
integration tests).
+
+* Every peer opens a server socket on its configured bind address and port.
+* Every peer is configured with a fixed list of peer addresses.
+* Every peer opens a TCP connection to every other known peer (one per pair).
+* Discovery messages are sent directly between the peers.
+* All messages are serialized using Java serialization, so there are no 
external dependencies.
+* Broken connections are retried in a loop with a configurable delay.
+* An optional basic gossip protocol is supported: every connection handshake 
includes
+  the addresses of all peers known to each side, and new connections are 
opened to any previously
+  unknown ones. The configured addresses are thus considered 'seed' addresses 
to join the peer group.
+
+## Discovery Configuration
+
+PID: org.apache.aries.rsa.discovery.tcp
+
+| Key            | Default                     | Description                   
                                                     |
+|----------------|-----------------------------|------------------------------------------------------------------------------------|
+| address        | localhost:7667              | Address and port to publish to 
peers<br/>If port is not specified, default is used. |
+| bindAddress    | 0.0.0.0<br>(all interfaces) | The address to bind the 
server socket to                                           |
+| peers          |                             | Comma-separated list of peer 
addresses (host:port)                                 |
+| reconnectDelay | 5000                        | Delay (in millis) between 
failed connection retries                                |
+| gossip         | true                        | Enable the basic gossip 
protocol                                                   |
+
+In order to facilitate configuration in test environments, configuration 
properties that
+are not defined via Configuration Admin (with the above PID) fall back to 
framework
+properties, and then to system properties. These framework/system properties 
are specified using the PID
+as the property name prefix.
+
+For example, in an integration test setup with two frameworks, each instance 
can
+specify its own distinct port using a property such as
+`org.apache.aries.rsa.discovery.tcp.address=localhost:7668`
+and specify each other as peers using
+`org.apache.aries.rsa.discovery.tcp.peers=localhost:7669`.
diff --git a/discovery/tcp/pom.xml b/discovery/tcp/pom.xml
new file mode 100644
index 00000000..149bb9c4
--- /dev/null
+++ b/discovery/tcp/pom.xml
@@ -0,0 +1,62 @@
+<?xml version='1.0' encoding='UTF-8' ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.aries.rsa</groupId>
+        <artifactId>org.apache.aries.rsa.parent</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../parent/pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.aries.rsa.discovery</groupId>
+    <artifactId>org.apache.aries.rsa.discovery.tcp</artifactId>
+    <packaging>jar</packaging>
+    <name>Aries Remote Service Admin Discovery TCP</name>
+
+    <properties>
+        <topDirectoryLocation>../..</topDirectoryLocation>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.aries.rsa</groupId>
+            <artifactId>org.apache.aries.rsa.spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>5.18.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.remoteserviceadmin</artifactId>
+            <version>1.1.0</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/Interest.java 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/Interest.java
new file mode 100644
index 00000000..e96fa09e
--- /dev/null
+++ 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/Interest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.tcp;
+
+import org.apache.aries.rsa.util.StringPlus;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static 
org.osgi.service.remoteserviceadmin.EndpointEventListener.ENDPOINT_LISTENER_SCOPE;
+
+/**
+ * An interest is a combination of an {@link EndpointEventListener} and its
+ * published scope (i.e. the filters defining what endpoints it is interested 
in).
+ * <p>
+ * The {@code Interest} class acts as a gatekeeper for an {@code 
EndpointEventListener} -
+ * it keeps track of its scopes, and when notified of endpoint events,
+ * it forwards to the listener only those that match what it is interested in.
+ */
+public class Interest {
+    private static final Logger LOG = LoggerFactory.getLogger(Interest.class);
+
+    private final List<String> scopes;
+    private final EndpointEventListener listener;
+
+    public Interest(ServiceReference<?> sref, EndpointEventListener listener) {
+        this.scopes = 
StringPlus.normalize(sref.getProperty(ENDPOINT_LISTENER_SCOPE));
+        this.listener = listener;
+    }
+
+    public void notifyListener(EndpointEvent event) {
+        EndpointDescription endpoint = event.getEndpoint();
+        scopes.stream().filter(endpoint::matches).findFirst().ifPresent(scope 
-> { // notify with first scope
+            LOG.info("Calling endpointChanged on {} for filter {}, type {}, 
endpoint {}",
+                listener, scope, event.getType(), endpoint);
+            listener.endpointChanged(event, scope);
+        });
+    }
+
+    @Override
+    public String toString() {
+        return "Interest [scopes=" + scopes + ", listener=" + 
listener.getClass() + "]";
+    }
+}
diff --git 
a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/InterestManager.java
 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/InterestManager.java
new file mode 100644
index 00000000..4f9c2829
--- /dev/null
+++ 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/InterestManager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.tcp;
+
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.osgi.service.remoteserviceadmin.EndpointEvent.*;
+
+/**
+ * Manages bookkeeping of all known local {@link EndpointEventListener}s along 
with
+ * their interests (scopes), as well as all known remote endpoints, and 
notifies
+ * the former about the latter.
+ */
+public class InterestManager {
+    // listener to its interest
+    private final Map<ServiceReference<EndpointEventListener>, Interest> 
interests = new ConcurrentHashMap<>();
+    // peer framework UUID to endpointId to endpoint
+    private final Map<String , Map<String, EndpointDescription>> 
remoteEndpoints = new ConcurrentHashMap<>();
+
+    public void addListener(ServiceReference<EndpointEventListener> sref,
+            EndpointEventListener listener, boolean isNew) {
+        // a new listener must be notified immediately of all previously known 
remote endpoints.
+        // an existing listener is not notified, but we do need to update its 
scopes (interest)
+        Interest interest = new Interest(sref, listener);
+        interests.put(sref, interest); // if it already exists - replace it 
with the new interest (scopes)
+        if (isNew) {
+            // notify new listener of all known remote endpoints
+            remoteEndpoints.values().stream()
+                .flatMap(endpoints -> endpoints.values().stream())
+                .forEach(endpoint -> interest.notifyListener(new 
EndpointEvent(ADDED, endpoint)));
+        }
+    }
+
+    public void removeListener(ServiceReference<EndpointEventListener> sref) {
+        interests.remove(sref);
+    }
+
+    private void notifyAllListeners(int type, EndpointDescription endpoint) {
+        EndpointEvent event = new EndpointEvent(type, endpoint);
+        interests.values().forEach(interest -> interest.notifyListener(event));
+    }
+
+    public void addEndpoint(EndpointDescription endpoint) {
+      Map<String, EndpointDescription> endpoints = remoteEndpoints
+          .computeIfAbsent(endpoint.getFrameworkUUID(), s -> new 
ConcurrentHashMap<>());
+        boolean exists = endpoints.put(endpoint.getId(), endpoint) != null;
+        notifyAllListeners(exists ? MODIFIED : ADDED, endpoint);
+    }
+
+    public void removeEndpoint(String peerUuid, String endpointId) {
+        Map<String, EndpointDescription> endpoints = 
remoteEndpoints.get(peerUuid);
+        if (endpoints != null) {
+            EndpointDescription endpoint = endpoints.remove(endpointId);
+            if (endpoint != null) {
+                notifyAllListeners(REMOVED, endpoint);
+            }
+        }
+    }
+
+    public void removePeer(String peerUuid) {
+        Map<String, EndpointDescription> endpoints = 
remoteEndpoints.remove(peerUuid);
+        if (endpoints != null) {
+            endpoints.values().forEach(endpoint -> notifyAllListeners(REMOVED, 
endpoint));
+        }
+    }
+}
diff --git 
a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnection.java
 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnection.java
new file mode 100644
index 00000000..6a0e23e3
--- /dev/null
+++ 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnection.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.tcp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * A TCP connection between two TCP discovery peers.
+ * <p>
+ * This class handles the low-level network reading/writing/serialization,
+ * while leaving the higher-level logic (including received message handling
+ * and connection close handling) to the {@link TcpConnectionManager}.
+ */
+public class TcpConnection {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TcpConnection.class);
+
+    private final Socket socket;
+    private final ObjectInputStream in;
+    private final ObjectOutputStream out;
+    private final Thread readThread;
+    private final boolean isOutbound;
+
+    private final BiConsumer<TcpConnection, TcpMessage> onMessage;
+    private final Consumer<TcpConnection> onClose;
+
+    private volatile String peerAddress; // the peer host:port, for incoming 
connections it is set only after handshake
+    private volatile String peerUuid; // set after handshake
+
+    public TcpConnection(Socket socket, String peerAddress,
+             BiConsumer<TcpConnection, TcpMessage> onMessage, 
Consumer<TcpConnection> onClose) throws IOException {
+        this.socket = socket;
+        this.isOutbound = peerAddress != null; // inbound only gets 
peerAddress after handshake
+        this.peerAddress = peerAddress;
+        this.onMessage = onMessage;
+        this.onClose = onClose;
+        this.out = new ObjectOutputStream(socket.getOutputStream()); // output 
must be initialized before input
+        this.out.flush(); // so we don't deadlock on reading object stream 
header
+        this.in = new ObjectInputStream(socket.getInputStream());
+        readThread = new Thread(null, this::readLoop, 
getClass().getSimpleName() + "-Reader-" + this);
+        readThread.start();
+    }
+
+    public boolean isOutbound() {
+        return isOutbound;
+    }
+
+    public String getPeerAddress() {
+        return peerAddress;
+    }
+
+    public void setPeerAddress(String address) {
+        this.peerAddress = address;
+    }
+
+    public String getPeerUuid() {
+        return peerUuid;
+    }
+
+    public void setPeerUuid(String peerUuid) {
+        this.peerUuid = peerUuid;
+    }
+
+    public void send(TcpMessage message) {
+        try {
+            synchronized (out) {
+                out.writeObject(message);
+                out.flush();
+            }
+        } catch (IOException ioe) {
+            LOG.error("Error sending TCP message", ioe);
+            close();
+        }
+    }
+
+    public void close() {
+        try {
+            socket.close(); // read thread will get SocketException, fire 
onClose and die
+        } catch (IOException ioe) {
+            LOG.error("Error closing socket", ioe);
+        }
+    }
+
+    private void readLoop() {
+        while (true) {
+            try {
+                TcpMessage message = (TcpMessage)in.readObject();
+                onMessage.accept(this, message);
+            } catch (Throwable t) {
+                if (!(t instanceof IOException)) {
+                    LOG.error("Unexpected error in read loop", t);
+                }
+                onClose.accept(this); // invoked only here, when the socket 
dies for any reason
+                return; // thread's dead
+            }
+        }
+    }
+}
diff --git 
a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnectionManager.java
 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnectionManager.java
new file mode 100644
index 00000000..8b635327
--- /dev/null
+++ 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpConnectionManager.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.tcp;
+
+import org.apache.aries.rsa.discovery.tcp.TcpMessage.*;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.aries.rsa.discovery.tcp.TcpDiscovery.toURI;
+
+/**
+ * Manages all TCP connections for this provider, as well as
+ * the higher-level TCP discovery protocol logic.
+ * <p>
+ * This includes accepting incoming connections on a server socket,
+ * initiating outgoing connections to configured (or discovered) peers,
+ * connection retry logic after unexpected disconnection, handling
+ * incoming messages from connections and handling gossip-discovered peers.
+ * <p>
+ * In addition to the protocol-level functionality, it keeps track of
+ * all known local endpoints and notifies all remote peers about them.
+ */
+public class TcpConnectionManager implements EndpointEventListener {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TcpConnectionManager.class);
+
+    private final InterestManager interestManager;
+    private final String localAddress;
+    private final String localUuid;
+    private final long reconnectDelay;
+    private final boolean gossip;
+
+    private final ExecutorService executor = Executors.newCachedThreadPool(); 
// for connect/accept threads
+    private final Set<TcpConnection> connections = 
ConcurrentHashMap.newKeySet(); // all connections, including before handshake
+    private final Map<String, TcpConnection> connectionsByUuid = new 
ConcurrentHashMap<>(); // connections after handshake (known uuid)
+    private final Map<String, EndpointDescription> localEndpoints = new 
ConcurrentHashMap<>();
+    private final Set<String> peers = ConcurrentHashMap.newKeySet(); // all 
configured and discovered (gossip) peer addresses
+
+    private ServerSocket serverSocket;
+    private volatile boolean closing;
+
+    public TcpConnectionManager(InterestManager interestManager, String 
localAddress,
+            String localUuid, long reconnectDelay, boolean gossip) {
+        this.interestManager = interestManager;
+        this.localAddress = localAddress;
+        this.localUuid = localUuid;
+        this.reconnectDelay = reconnectDelay;
+        this.gossip = gossip;
+    }
+
+    public void open(String bindAddress, int port, Collection<String> peers) 
throws IOException {
+        serverSocket = new ServerSocket();
+        serverSocket.setReuseAddress(true);
+        serverSocket.bind(new InetSocketAddress(bindAddress, port));
+        executor.submit(this::acceptLoop);
+        addPeers(peers);
+    }
+
+    public void close() throws IOException {
+        closing = true;
+        serverSocket.close(); // acceptLoop will get SocketException
+        connections.forEach(TcpConnection::close);
+        executor.shutdownNow();
+    }
+
+    private void addPeers(Collection<String> peers) {
+        peers.stream()
+            .filter(peer -> !this.peers.contains(peer)) // only new ones
+            .filter(peer -> !localAddress.equals(peer)) // exclude ourself
+            .forEach(peer -> {
+                LOG.info("Adding peer {}", peer);
+                this.peers.add(peer);
+                executor.submit(() -> connectLoop(peer));
+            });
+    }
+
+    private void acceptLoop() {
+        while (true) {
+            try {
+                Socket socket = serverSocket.accept();
+                executor.submit(() -> {
+                    try {
+                        onConnected(socket, null);
+                    } catch (IOException ioe) {
+                        LOG.error("error initializing connection on accepted 
socket", ioe);
+                    }
+                });
+            } catch (IOException ioe) {
+                return; // socket closed
+            } catch (Throwable t) {
+                LOG.error("Unexpected error in accept loop - shutting down", 
t);
+                return;
+            }
+        }
+    }
+
+    private void connectLoop(String address) {
+        URI uri = toURI(address);
+        while (!closing) {
+            try {
+                Socket socket = new Socket(uri.getHost(), uri.getPort());
+                onConnected(socket, address);
+                return; // connection established; onConnectionClosed will 
restart this loop if needed
+            } catch (IOException ioe) {
+                LOG.debug("error connecting to {}, will retry soon", uri, ioe);
+                try {
+                    Thread.sleep(reconnectDelay);
+                } catch (InterruptedException ie) {
+                    // end loop (shutting down)
+                    return;
+                }
+            } catch (Throwable t) {
+                LOG.error("Unexpected error in connect loop - aborting 
connection to {}", uri, t);
+                return;
+            }
+        }
+    }
+
+    private void onConnected(Socket socket, String address) throws IOException 
{
+        try {
+            TcpConnection conn = new TcpConnection(socket, address, 
this::onMessage, this::onClosed);
+            connections.add(conn);
+            conn.send(new HandshakeMessage(localUuid, localAddress, new 
ArrayList<>(peers)));
+            // don't send known endpoints yet, only after receiving handshake
+        } catch (IOException ioe) {
+            try {
+                socket.close();
+            } catch (IOException ioe2) {
+                ioe.addSuppressed(ioe2);
+            }
+            throw ioe;
+        }
+    }
+
+    public void onClosed(TcpConnection conn) {
+        connections.remove(conn);
+        String peerUuid = conn.getPeerUuid();
+        if (peerUuid != null) { // passed the handshake
+            boolean removed = connectionsByUuid.remove(peerUuid, conn); // 
remove if this is the active connection
+            if (removed && !closing) {
+                interestManager.removePeer(peerUuid); // no active connections 
with peer
+            }
+        }
+        // re-start the connect retry thread if necessary:
+        // only if we're the outbound peer, and not in the process of shutting 
down,
+        // and there isn't already an active (reverse?) connection with the 
same peer,
+        // or we don't know who the peer is yet (before handshake)
+        if (conn.isOutbound() && !closing && (peerUuid == null || 
!connectionsByUuid.containsKey(peerUuid))) {
+            executor.submit(() -> connectLoop(conn.getPeerAddress()));
+        }
+    }
+
+    private void onMessage(TcpConnection conn, TcpMessage message) {
+        if (message instanceof HandshakeMessage) {
+            HandshakeMessage h = (HandshakeMessage) message;
+            // update the peer data
+            conn.setPeerAddress(h.getAddress());
+            conn.setPeerUuid(h.getUuid());
+            // if gossip is enabled, try adding all of this peer's peers,
+            // as well as the peer itself (in case it found us via its
+            // own gossip, and we haven't met before)
+            if (gossip) {
+                addPeers(h.getPeers());
+                addPeers(Collections.singleton(h.getAddress()));
+            }
+            // if we already have another connection with this peer (e.g. 
reverse direction)
+            // then we keep the old one (which is already in use) and close 
the new one
+            boolean existing = connectionsByUuid.putIfAbsent(h.getUuid(), 
conn) != null;
+            if (existing) {
+                // both sides can check if a connection between them already 
exists, but
+                // there is a possible race condition where one peer receives 
the handshake
+                // and closes the connection before it even had a chance to 
send its handshake -
+                // so the other peer will never know the connection is 
intentionally closed
+                // and not experiencing connection errors. If it is the 
outbound side, it will
+                // keep retrying to connect. to solve this, only the outbound 
peer is the one
+                // that closes the connection. The inbound side just doesn't 
use it until then.
+                if (conn.isOutbound()) {
+                    conn.close();
+                }
+                return;
+            }
+            // send all of our known local endpoints to the new peer
+            localEndpoints.values().forEach(endpoint -> conn.send(new 
UpdateMessage(endpoint.getProperties())));
+        } else if (message instanceof UpdateMessage) {
+            UpdateMessage u = (UpdateMessage) message;
+            EndpointDescription endpoint = new 
EndpointDescription(u.getProperties());
+            interestManager.addEndpoint(endpoint);
+        } else if (message instanceof RemoveMessage) {
+            RemoveMessage r = (RemoveMessage) message;
+            interestManager.removeEndpoint(conn.getPeerUuid(), 
r.getEndpointId());
+        } else {
+            throw new IllegalArgumentException("unsupported message type: " + 
message);
+        }
+    }
+
+    @Override
+    public void endpointChanged(EndpointEvent event, String filter) {
+        // notify all peers of the endpoint event
+        TcpMessage message;
+        EndpointDescription endpoint = event.getEndpoint();
+        String endpointId = endpoint.getId();
+        switch (event.getType()) {
+            case EndpointEvent.ADDED:
+            case EndpointEvent.MODIFIED:
+                localEndpoints.put(endpointId, endpoint);
+                message = new UpdateMessage(endpoint.getProperties());
+                break;
+            case EndpointEvent.MODIFIED_ENDMATCH:
+            case EndpointEvent.REMOVED:
+                localEndpoints.remove(endpointId);
+                message = new RemoveMessage(endpointId);
+                break;
+            default: throw new RuntimeException("Unknown event type: " + 
event.getType());
+        }
+        connectionsByUuid.values().forEach(c -> c.send(message));
+    }
+}
diff --git 
a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpDiscovery.java
 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpDiscovery.java
new file mode 100644
index 00000000..9efdd7f9
--- /dev/null
+++ 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpDiscovery.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.tcp;
+
+import org.apache.aries.rsa.annotations.RSADiscoveryProvider;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.osgi.framework.Constants.FRAMEWORK_UUID;
+import static 
org.osgi.service.component.annotations.ReferenceCardinality.MULTIPLE;
+import static org.osgi.service.component.annotations.ReferencePolicy.DYNAMIC;
+import static 
org.osgi.service.remoteserviceadmin.EndpointEventListener.ENDPOINT_LISTENER_SCOPE;
+import static 
org.osgi.service.remoteserviceadmin.RemoteConstants.ENDPOINT_FRAMEWORK_UUID;
+
+/**
+ * The main TCP Discovery provider component.
+ * <p>
+ * It initializes the provider using config admin configuration,
+ * initializes the {@link InterestManager} and {@link TcpConnectionManager},
+ * registers an {@link EndpointEventListener} to track locally exported
+ * endpoints, and listens for registrations of other
+ * EndpointEventListeners which are managed by the InterestManager.
+ * <p>
+ * Every configuration key can also be supplied as a framework or system
+ * property named after this class's package followed by the key name.
+ * Such a property is only consulted while the configured value equals
+ * the key's default.
+ */
+@RSADiscoveryProvider(protocols = "aries.tcp")
+@Component(immediate = true, configurationPid = TcpDiscovery.DISCOVERY_TCP_PID)
+public class TcpDiscovery {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(TcpDiscovery.class);
+    public static final String DISCOVERY_TCP_PID =
+        "org.apache.aries.rsa.discovery.tcp";
+    // service property set on our own listener registration so that the
+    // dynamic reference below (which filters on it) never binds it
+    private static final String OWN_LISTENER_PROP = "aries.discovery.tcp";
+    public static final int DEFAULT_PORT = 7667;
+    // accepted as bindAddress, but rejected as the host of address
+    private static final String WILDCARD_HOST = "0.0.0.0";
+
+    /** Component configuration; see {@link #mergeConfig} for overrides. */
+    @interface Config {
+        String address() default "localhost:" + DEFAULT_PORT;
+        String bindAddress() default "0.0.0.0";
+        String[] peers() default {};
+        long reconnectDelay() default 5000;
+        boolean gossip() default true;
+    }
+
+    private final InterestManager interestManager;
+    private TcpConnectionManager connectionManager;
+    private ServiceRegistration<?> listenerRegistration;
+
+    public TcpDiscovery() {
+        // initialize in constructor before we start getting reference
+        // bind events
+        interestManager = new InterestManager();
+    }
+
+    /**
+     * Parses a {@code host[:port]} address into a {@code tcp://} URI.
+     *
+     * @param address the address to parse
+     * @return the parsed URI, using {@link #DEFAULT_PORT} if the address
+     *         carries no port
+     * @throws RuntimeException wrapping the {@link URISyntaxException}
+     *         if the address cannot be parsed
+     */
+    public static URI toURI(String address) {
+        try {
+            URI uri = new URI("tcp://" + address);
+            if (uri.getPort() != -1) {
+                return uri;
+            }
+            return new URI(uri.getScheme(), uri.getUserInfo(),
+                uri.getHost(), DEFAULT_PORT, uri.getPath(),
+                uri.getQuery(), uri.getFragment());
+        } catch (URISyntaxException urise) {
+            LOG.error("failed to parse address {}", address, urise);
+            throw new RuntimeException(urise);
+        }
+    }
+
+    /**
+     * Merges config from ConfigAdmin, framework properties and system
+     * properties.
+     * <p>
+     * The returned proxy answers each config method with the ConfigAdmin
+     * value unless that value equals the method's default; in that case
+     * the property {@code prefix + methodName} is looked up in the
+     * framework and then the system properties, and the default is used
+     * only if neither is set.
+     *
+     * @param context used to read framework properties
+     * @param prefix prepended to the method name to form a property name
+     * @param config the configuration instance to wrap
+     * @return a proxy implementing the same annotation type
+     */
+    @SuppressWarnings("unchecked")
+    private <T extends Annotation> T mergeConfig(
+            BundleContext context, String prefix, T config) {
+        Class<T> cls = (Class<T>) config.annotationType();
+        return (T) Proxy.newProxyInstance(
+            cls.getClassLoader(), new Class<?>[] {cls},
+            (proxy, method, args) -> {
+                if (method.getDeclaringClass() == Object.class) {
+                    // equals/hashCode are delegated; toString shows the
+                    // merged names and values (including arrays)
+                    return method.getName().equals("toString")
+                        ? describe(cls, proxy)
+                        : method.invoke(config, args);
+                }
+                Object value = method.invoke(config, args);
+                Object defaultValue = method.getDefaultValue();
+                if (!Objects.deepEquals(value, defaultValue)) {
+                    return value; // differs from the default: keep it
+                }
+                String prop = prefix + method.getName();
+                String raw = context.getProperty(prop);
+                if (raw == null) {
+                    raw = System.getProperty(prop);
+                }
+                return raw == null
+                    ? defaultValue
+                    : convert(method.getReturnType(), raw);
+            });
+    }
+
+    /**
+     * Converts a property string to the return type of a config method.
+     * Surrounding whitespace is ignored for booleans, numbers and lists,
+     * and empty list entries are dropped, so that a blank or padded
+     * property does not yield unusable values.
+     */
+    private static Object convert(Class<?> type, String raw) {
+        String text = raw.trim();
+        if (type == Boolean.TYPE) {
+            return Boolean.valueOf(text);
+        }
+        if (type == Integer.TYPE) {
+            return Integer.valueOf(text);
+        }
+        if (type == Long.TYPE) {
+            return Long.valueOf(text);
+        }
+        if (type == String[].class) {
+            return Arrays.stream(text.split("\\s*,\\s*"))
+                .filter(entry -> !entry.isEmpty())
+                .toArray(String[]::new);
+        }
+        return raw; // any other type is passed through unchanged
+    }
+
+    /** Renders all merged config names and values as a map string. */
+    private static String describe(Class<?> cls, Object proxy) {
+        return Arrays.stream(cls.getMethods())
+            // skip what Annotation itself declares (not Object.class!)
+            .filter(m -> !m.getDeclaringClass().equals(Annotation.class))
+            .collect(Collectors.<Method, String, Object>toMap(
+                Method::getName, m -> {
+                    try {
+                        Object v = m.invoke(proxy);
+                        return (v == null || v instanceof Object[])
+                            ? Arrays.toString((Object[]) v)
+                            : v.toString();
+                    } catch (Exception ignore) {
+                        return "<ERROR>";
+                    }
+                }))
+            .toString();
+    }
+
+    /**
+     * Creates and opens the connection manager from the merged config.
+     *
+     * @throws IllegalArgumentException if the address is missing, has no
+     *         parsable host, or names the wildcard host (with or without
+     *         a port)
+     * @throws IOException if opening the connection manager fails
+     */
+    private void initConnectionManager(Config config, String uuid)
+            throws IOException {
+        String address = config.address();
+        String bindAddress = config.bindAddress();
+        String[] peers = config.peers();
+        if (address == null || address.trim().isEmpty()) {
+            throw new IllegalArgumentException(
+                "invalid address: " + address);
+        }
+        URI uri = toURI(address);
+        String host = uri.getHost();
+        // check the parsed host so that "0.0.0.0:7667" is caught as well
+        if (host == null || host.equals(WILDCARD_HOST)) {
+            throw new IllegalArgumentException(
+                "invalid address: " + address);
+        }
+        if (peers.length == 0) {
+            // no peers - may be legitimate in a server with incoming
+            // connections only, or for single-host dev/itest systems
+            // where the bundle should start successfully even if it's
+            // not doing anything too useful
+            LOG.info(
+                "no peers configured - will wait for incoming connections");
+        }
+        // just the host; without a bind address use the address host
+        String bindHost = bindAddress == null || bindAddress.trim().isEmpty()
+            ? host
+            : toURI(bindAddress).getHost();
+        connectionManager = new TcpConnectionManager(
+            interestManager, address, uuid, config.reconnectDelay(),
+            config.gossip());
+        connectionManager.open(
+            bindHost, uri.getPort(), Arrays.asList(peers));
+    }
+
+    /**
+     * Registers the connection manager as an EndpointEventListener whose
+     * scope only matches endpoints carrying the given framework uuid.
+     */
+    private void registerListener(BundleContext context, String uuid) {
+        Dictionary<String, Object> props = new Hashtable<>();
+        // mark our own listener for exclusion
+        props.put(OWN_LISTENER_PROP, Boolean.TRUE);
+        String scope = "(&(objectClass=*)("
+            + ENDPOINT_FRAMEWORK_UUID + "=" + uuid + "))";
+        props.put(ENDPOINT_LISTENER_SCOPE, scope);
+        listenerRegistration = context.registerService(
+            EndpointEventListener.class, connectionManager, props);
+    }
+
+    /**
+     * Activates the component. An IOException while opening the
+     * connection manager is logged rather than propagated.
+     */
+    @Activate
+    void start(BundleContext context, Config config) {
+        String uuid = context.getProperty(FRAMEWORK_UUID);
+        config = mergeConfig(
+            context, getClass().getPackageName() + ".", config);
+        LOG.info("Starting TCP discovery for framework {} with config {}",
+            uuid, config);
+        try {
+            initConnectionManager(config, uuid);
+            // register ourselves to capture local endpoint exports
+            registerListener(context, uuid);
+        } catch (IOException ioe) {
+            LOG.error("failed to start TCP connection manager", ioe);
+        }
+    }
+
+    /**
+     * Deactivates the component: unregisters our listener and closes the
+     * connection manager. The connection manager is closed even if the
+     * unregistration throws.
+     */
+    @Deactivate
+    void stop() throws IOException {
+        ServiceRegistration<?> registration = listenerRegistration;
+        TcpConnectionManager manager = connectionManager;
+        listenerRegistration = null;
+        connectionManager = null;
+        try {
+            if (registration != null) {
+                registration.unregister();
+            }
+        } finally {
+            if (manager != null) {
+                manager.close();
+            }
+        }
+    }
+
+    @Reference(cardinality = MULTIPLE, policy = DYNAMIC,
+        target = "(!(" + OWN_LISTENER_PROP + "=*))")
+    void bindEndpointEventListener(
+            ServiceReference<EndpointEventListener> sref,
+            EndpointEventListener listener) {
+        interestManager.addListener(sref, listener, true);
+    }
+
+    void updatedEndpointEventListener(
+            ServiceReference<EndpointEventListener> sref,
+            EndpointEventListener listener) {
+        interestManager.addListener(sref, listener, false);
+    }
+
+    void unbindEndpointEventListener(
+            ServiceReference<EndpointEventListener> sref) {
+        interestManager.removeListener(sref);
+    }
+}
diff --git 
a/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpMessage.java
 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpMessage.java
new file mode 100644
index 00000000..093a941b
--- /dev/null
+++ 
b/discovery/tcp/src/main/java/org/apache/aries/rsa/discovery/tcp/TcpMessage.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.tcp;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A message sent on the TCP connection.
+ * <p>
+ * Messages are {@link Serializable}. Each class declares an explicit
+ * {@code serialVersionUID} so that the stream identity does not depend
+ * on the value the compiler would otherwise compute.
+ */
+public class TcpMessage implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Carries the sender's uuid, address and list of peers. */
+    public static class HandshakeMessage extends TcpMessage {
+
+        private static final long serialVersionUID = 1L;
+
+        private final String uuid;
+        private final String address;
+        private final List<String> peers;
+
+        public HandshakeMessage(
+                String uuid, String address, List<String> peers) {
+            this.uuid = uuid;
+            this.address = address;
+            this.peers = peers;
+        }
+
+        public String getUuid() {
+            return uuid;
+        }
+
+        public String getAddress() {
+            return address;
+        }
+
+        public List<String> getPeers() {
+            return peers;
+        }
+    }
+
+    // there is no distinction between add and update, since it's
+    // not the sender's job to keep track of what each peer knows
+    /** Carries the properties of an added or updated endpoint. */
+    public static class UpdateMessage extends TcpMessage {
+
+        private static final long serialVersionUID = 1L;
+
+        private final Map<String, Object> properties;
+
+        public UpdateMessage(Map<String, Object> properties) {
+            this.properties = properties;
+        }
+
+        public Map<String, Object> getProperties() {
+            return properties;
+        }
+    }
+
+    /** Carries the id of an endpoint that was removed. */
+    public static class RemoveMessage extends TcpMessage {
+
+        private static final long serialVersionUID = 1L;
+
+        private final String endpointId;
+
+        public RemoveMessage(String endpointId) {
+            this.endpointId = endpointId;
+        }
+
+        public String getEndpointId() {
+            return endpointId;
+        }
+    }
+}
diff --git a/features/src/main/resources/features.xml 
b/features/src/main/resources/features.xml
index d73f643f..b2abff4d 100644
--- a/features/src/main/resources/features.xml
+++ b/features/src/main/resources/features.xml
@@ -51,6 +51,11 @@
         
<bundle>mvn:org.apache.aries.rsa.discovery/org.apache.aries.rsa.discovery.config/${project.version}</bundle>
     </feature>
 
+    <feature name="aries-rsa-discovery-tcp" version="${project.version}">
+        <feature>aries-rsa-core</feature>
+        
<bundle>mvn:org.apache.aries.rsa.discovery/org.apache.aries.rsa.discovery.tcp/${project.version}</bundle>
+    </feature>
+
     <feature name="aries-rsa-discovery-zookeeper" version="${project.version}">
         <feature>aries-rsa-core</feature>
         
<bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.zookeeper/${zookeeper.version}</bundle>
diff --git a/itests/felix/pom.xml b/itests/felix/pom.xml
index 0bc28dbb..df3db818 100644
--- a/itests/felix/pom.xml
+++ b/itests/felix/pom.xml
@@ -144,6 +144,11 @@
             <artifactId>org.apache.aries.rsa.discovery.zookeeper</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.aries.rsa.discovery</groupId>
+            <artifactId>org.apache.aries.rsa.discovery.tcp</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.dropwizard.metrics</groupId>
             <artifactId>metrics-core</artifactId>
diff --git 
a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java 
b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java
index e5003f78..360e4432 100644
--- 
a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java
+++ 
b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/RsaTestBase.java
@@ -179,6 +179,11 @@ public class RsaTestBase {
                          mvn("org.apache.aries.rsa.discovery", 
"org.apache.aries.rsa.discovery.zookeeper"));
     }
 
+    /** Provisions the TCP discovery bundle. */
+    protected static Option rsaDiscoveryTcp() {
+        Option bundle = mvn("org.apache.aries.rsa.discovery",
+            "org.apache.aries.rsa.discovery.tcp");
+        return composite(bundle);
+    }
+
     protected static Option rsaProviderTcp() {
         return mvn("org.apache.aries.rsa.provider", 
"org.apache.aries.rsa.provider.tcp");
     }
@@ -189,6 +194,13 @@ public class RsaTestBase {
                          mvn("org.apache.aries.rsa.provider", 
"org.apache.aries.rsa.provider.fastbin"));
     }
 
+    /**
+     * Builds the TCP discovery configuration for one test instance.
+     * Both addresses are on 127.0.0.1, at port 7667 plus the instance
+     * number.
+     *
+     * @param instance number of the instance being configured
+     * @param peerInstance number of the instance to list as its peer
+     */
+    protected static Option configTcpDiscovery(
+            int instance, int peerInstance) {
+        String ownAddress = "127.0.0.1:" + (7667 + instance);
+        String peerAddress = "127.0.0.1:" + (7667 + peerInstance);
+        return newConfiguration("org.apache.aries.rsa.discovery.tcp")
+            .put("address", ownAddress)
+            .put("peers", peerAddress)
+            .asOption();
+    }
+
     protected static Option configZKDiscovery() {
         return newConfiguration("org.apache.aries.rsa.discovery.zookeeper") //
             .put("zookeeper.host", "127.0.0.1") //


Reply via email to