Author: dblevins
Date: Sun Nov 11 13:13:41 2012
New Revision: 1407972
URL: http://svn.apache.org/viewvc?rev=1407972&view=rev
Log:
OPENEJB-1794: Multipoint Automatic Reconnect
Modified:
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
Modified: openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1407972&r1=1407971&r2=1407972&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Sun Nov 11 13:13:41 2012
@@ -27,10 +27,13 @@ import org.apache.openejb.util.Logger;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketAddress;
import java.net.URI;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
@@ -51,7 +54,13 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -86,7 +95,7 @@ public class MultipointServer {
private final Tracker tracker;
- private final LinkedList<URI> connect = new LinkedList<URI>();
+ private final LinkedList<Host> connect = new LinkedList<Host>();
private final Map<URI, Session> connections = new HashMap<URI, Session>();
private long joined = 0;
@@ -197,7 +206,14 @@ public class MultipointServer {
}
public List<URI> getConnectionsQueued() {
- return new ArrayList<URI>(connect);
+ synchronized (connect) {
+ final ArrayList<URI> uris = new ArrayList<URI>(connect.size());
+ for (Host host : connect) {
+ uris.add(host.getUri());
+ }
+ return uris;
+ }
+
}
public long getReconnectDelay() {
@@ -219,10 +235,12 @@ public class MultipointServer {
if (System.nanoTime() - joined <= reconnectDelay) return;
for (URI uri : roots) {
+ final Host host = new Host(uri);
synchronized (connect) {
- if (!connections.containsKey(uri) && !connect.contains(uri)) {
+ if (!connections.containsKey(uri) && !connect.contains(host)) {
log.info("Reconnect{uri=" + uri + "}");
- connect.addLast(uri);
+ connect.addLast(host);
+ host.resolveDns();
this.joined = System.nanoTime();
}
}
@@ -570,6 +588,7 @@ public class MultipointServer {
// This keeps the heartbeat and rejoin regular
selectorTimeout = adjustedSelectorTimeout(start);
}
+ log.info("MultipointServer has terminated.");
}
private long adjustedSelectorTimeout(long start) {
@@ -581,25 +600,46 @@ public class MultipointServer {
}
private void initiateConnections() {
+
synchronized (connect) {
+ final LinkedList<Host> unresolved = new LinkedList<Host>();
+
while (connect.size() > 0) {
- final URI uri = connect.removeFirst();
+ final Host host = connect.removeFirst();
- if (connections.containsKey(uri)) continue;
+ log.debug("Initiate(uri=" + host.getUri() + ")");
- final int port = uri.getPort();
- final String host = uri.getHost();
+ if (connections.containsKey(host.getUri())) continue;
+
+ if (!host.isDone()) {
+ unresolved.add(host);
+ log.debug("Unresolved(uri=" + host.getUri() + ")");
+ continue;
+ }
+
+ final InetSocketAddress address;
+ try {
+ address = host.getSocketAddress();
+ } catch (ExecutionException e) {
+ final Throwable t = (e.getCause() != null) ? e.getCause() : e;
+ final String message = String.format("Failed Connect{uri=%s} %s{message=\"%s\"}", host.getUri(), t.getClass().getSimpleName(), t.getMessage());
+ log.warning(message);
+ continue;
+ } catch (TimeoutException e) {
+ unresolved.add(host);
+ log.debug("Unresolved(uri=" + host.getUri() + ")");
+ continue;
+ }
try {
+ final URI uri = host.getUri();
println("open " + uri);
// Create a non-blocking NIO channel
final SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
- final InetSocketAddress address = new InetSocketAddress(host, port);
-
socketChannel.connect(address);
final Session session = new Session(socketChannel, address, uri);
@@ -613,6 +653,8 @@ public class MultipointServer {
throw new ServerRuntimeException(e);
}
}
+
+ connect.addAll(unresolved);
}
}
@@ -857,7 +899,9 @@ public class MultipointServer {
private ArrayList<URI> connections() {
synchronized (connect) {
final ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
- list.addAll(connect);
+ for (Host host : connect) {
+ list.add(host.getUri());
+ }
return list;
}
}
@@ -907,10 +951,13 @@ public class MultipointServer {
uri = normalize(uri);
if (me.equals(uri)) return;
+ final Host host = new Host(uri);
+
synchronized (connect) {
- if (!connections.containsKey(uri) && !connect.contains(uri)) {
- log.debug("Queuing{uri=" + uri + "}");
- connect.addLast(uri);
+ if (!connections.containsKey(uri) && !connect.contains(host)) {
+ log.info("Queuing{uri=" + uri + "}");
+ connect.addLast(host);
+ host.resolveDns();
}
}
}
@@ -1138,4 +1185,58 @@ public class MultipointServer {
return s;
}
+
+ private final Executor dnsResolutionQueue = Executors.newFixedThreadPool(2);
+
+ private class Host {
+ private final URI uri;
+ private final FutureTask<InetAddress> address;
+
+ private Host(URI uri) {
+ this.uri = uri;
+ this.address = new FutureTask<InetAddress>(new Callable<InetAddress>(){
+ public InetAddress call() throws Exception {
+ return InetAddress.getByName(Host.this.uri.getHost());
+ }
+ });
+ }
+
+ public void resolveDns() {
+ dnsResolutionQueue.execute(address);
+ }
+
+ public boolean isDone() {
+ return address.isDone();
+ }
+
+ public InetSocketAddress getSocketAddress() throws ExecutionException, TimeoutException {
+ try {
+ final InetAddress inetAddress = address.get(0, TimeUnit.NANOSECONDS);
+ return new InetSocketAddress(inetAddress, uri.getPort());
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new TimeoutException();
+ }
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Host host = (Host) o;
+
+ return uri.equals(host.uri);
+ }
+
+ @Override
+ public int hashCode() {
+ return uri.hashCode();
+ }
+ }
}