Author: dblevins
Date: Sun Nov 11 13:13:41 2012
New Revision: 1407972

URL: http://svn.apache.org/viewvc?rev=1407972&view=rev
Log:
OPENEJB-1794: Multipoint Automatic Reconnect

Modified:
    openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java

Modified: openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1407972&r1=1407971&r2=1407972&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Sun Nov 11 13:13:41 2012
@@ -27,10 +27,13 @@ import org.apache.openejb.util.Logger;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
@@ -51,7 +54,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -86,7 +95,7 @@ public class MultipointServer {
 
     private final Tracker tracker;
 
-    private final LinkedList<URI> connect = new LinkedList<URI>();
+    private final LinkedList<Host> connect = new LinkedList<Host>();
     private final Map<URI, Session> connections = new HashMap<URI, Session>();
 
     private long joined = 0;
@@ -197,7 +206,14 @@ public class MultipointServer {
     }
 
     public List<URI> getConnectionsQueued() {
-        return new ArrayList<URI>(connect);
+        synchronized (connect) {
+            final ArrayList<URI> uris = new ArrayList<URI>(connect.size());
+            for (Host host : connect) {
+                uris.add(host.getUri());
+            }
+            return uris;
+        }
+
     }
 
     public long getReconnectDelay() {
@@ -219,10 +235,12 @@ public class MultipointServer {
         if (System.nanoTime() - joined <= reconnectDelay) return;
 
         for (URI uri : roots) {
+            final Host host = new Host(uri);
             synchronized (connect) {
-                if (!connections.containsKey(uri) && !connect.contains(uri)) {
+                if (!connections.containsKey(uri) && !connect.contains(host)) {
                     log.info("Reconnect{uri=" + uri + "}");
-                    connect.addLast(uri);
+                    connect.addLast(host);
+                    host.resolveDns();
                     this.joined = System.nanoTime();
                 }
             }
@@ -570,6 +588,7 @@ public class MultipointServer {
             // This keeps the heartbeat and rejoin regular
             selectorTimeout = adjustedSelectorTimeout(start);
         }
+        log.info("MultipointServer has terminated.");
     }
 
     private long adjustedSelectorTimeout(long start) {
@@ -581,25 +600,46 @@ public class MultipointServer {
     }
 
     private void initiateConnections() {
+
         synchronized (connect) {
+            final LinkedList<Host> unresolved = new LinkedList<Host>();
+
             while (connect.size() > 0) {
 
-                final URI uri = connect.removeFirst();
+                final Host host = connect.removeFirst();
 
-                if (connections.containsKey(uri)) continue;
+                log.debug("Initiate(uri=" + host.getUri() + ")");
 
-                final int port = uri.getPort();
-                final String host = uri.getHost();
+                if (connections.containsKey(host.getUri())) continue;
+
+                if (!host.isDone()) {
+                    unresolved.add(host);
+                    log.debug("Unresolved(uri=" + host.getUri() + ")");
+                    continue;
+                }
+
+                final InetSocketAddress address;
+                try {
+                    address = host.getSocketAddress();
+                } catch (ExecutionException e) {
+                    final Throwable t = (e.getCause() != null) ? e.getCause() 
: e;
+                    final String message = String.format("Failed 
Connect{uri=%s} %s{message=\"%s\"}", host.getUri(), 
t.getClass().getSimpleName(), t.getMessage());
+                    log.warning(message);
+                    continue;
+                } catch (TimeoutException e) {
+                    unresolved.add(host);
+                    log.debug("Unresolved(uri=" + host.getUri() + ")");
+                    continue;
+                }
 
                 try {
+                    final URI uri = host.getUri();
                     println("open " + uri);
 
                     // Create a non-blocking NIO channel
                     final SocketChannel socketChannel = SocketChannel.open();
                     socketChannel.configureBlocking(false);
 
-                    final InetSocketAddress address = new 
InetSocketAddress(host, port);
-
                     socketChannel.connect(address);
 
                     final Session session = new Session(socketChannel, 
address, uri);
@@ -613,6 +653,8 @@ public class MultipointServer {
                     throw new ServerRuntimeException(e);
                 }
             }
+
+            connect.addAll(unresolved);
         }
     }
 
@@ -857,7 +899,9 @@ public class MultipointServer {
     private ArrayList<URI> connections() {
         synchronized (connect) {
             final ArrayList<URI> list = new 
ArrayList<URI>(connections.keySet());
-            list.addAll(connect);
+            for (Host host : connect) {
+                list.add(host.getUri());
+            }
             return list;
         }
     }
@@ -907,10 +951,13 @@ public class MultipointServer {
         uri = normalize(uri);
         if (me.equals(uri)) return;
 
+        final Host host = new Host(uri);
+
         synchronized (connect) {
-            if (!connections.containsKey(uri) && !connect.contains(uri)) {
-                log.debug("Queuing{uri=" + uri + "}");
-                connect.addLast(uri);
+            if (!connections.containsKey(uri) && !connect.contains(host)) {
+                log.info("Queuing{uri=" + uri + "}");
+                connect.addLast(host);
+                host.resolveDns();
             }
         }
     }
@@ -1138,4 +1185,58 @@ public class MultipointServer {
 
         return s;
     }
+
+    // Fixed pool of two threads; Host.resolveDns() hands each host's lookup task to it.
+    // NOTE(review): no shutdown of this pool is visible in this diff, and the field is
+    // typed as plain Executor -- confirm its threads cannot outlive the server.
+    private final Executor dnsResolutionQueue = 
Executors.newFixedThreadPool(2);
+
+    /**
+     * Pairs a peer URI with an asynchronous lookup of that URI's host name.
+     *
+     * The lookup is wrapped in a FutureTask that is created in the constructor
+     * but only runs once resolveDns() passes it to dnsResolutionQueue. The
+     * result is read back through isDone() and getSocketAddress().
+     * Two Host instances are equal when their URIs are equal.
+     */
+    private class Host {
+        private final URI uri;
+        private final FutureTask<InetAddress> address;
+
+        /**
+         * Stores the URI and prepares (but does not start) the task that
+         * resolves the URI's host via InetAddress.getByName.
+         */
+        private Host(URI uri) {
+            this.uri = uri;
+            this.address = new FutureTask<InetAddress>(new 
Callable<InetAddress>(){
+                public InetAddress call() throws Exception {
+                    return InetAddress.getByName(Host.this.uri.getHost());
+                }
+            });
+        }
+
+        /** Submits the lookup task to dnsResolutionQueue for execution. */
+        public void resolveDns() {
+            dnsResolutionQueue.execute(address);
+        }
+
+        /** Reports whether the lookup task has completed (see FutureTask.isDone). */
+        public boolean isDone() {
+            return address.isDone();
+        }
+
+        /**
+         * Combines the looked-up address with the URI's port.
+         *
+         * The task result is requested with a zero-length timeout, so this
+         * method does not wait for a lookup that is still in progress.
+         *
+         * @return socket address built from the resolved InetAddress and uri.getPort()
+         * @throws ExecutionException if the lookup task itself threw
+         * @throws TimeoutException if the result is not available yet, or if
+         *         the calling thread was interrupted while asking for it
+         */
+        public InetSocketAddress getSocketAddress() throws ExecutionException, 
TimeoutException {
+            try {
+                final InetAddress inetAddress = address.get(0, 
TimeUnit.NANOSECONDS);
+                return new InetSocketAddress(inetAddress, uri.getPort());
+            } catch (InterruptedException e) {
+                // Interruption is reported to the caller as "not resolved yet".
+                // NOTE(review): Thread.interrupted() clears the interrupt flag rather
+                // than restoring it -- confirm that dropping the interrupt is intended.
+                Thread.interrupted();
+                throw new TimeoutException();
+            }
+        }
+
+        /** Returns the URI this host was created for. */
+        public URI getUri() {
+            return uri;
+        }
+
+
+        /** Equality is based solely on the URI; the lookup task is ignored. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            Host host = (Host) o;
+
+            return uri.equals(host.uri);
+        }
+
+        /** Hash code of the URI, consistent with equals. */
+        @Override
+        public int hashCode() {
+            return uri.hashCode();
+        }
+    }
 }


Reply via email to