Author: jfclere
Date: Thu Apr  9 16:32:04 2009
New Revision: 763726

URL: http://svn.apache.org/viewvc?rev=763726&view=rev
Log:
Add the Tcp code... Still need the code in httpd-trunk to test it.

Added:
    tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java
    tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java
Modified:
    tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java
    tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java
    tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java

Modified: 
tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java?rev=763726&r1=763725&r2=763726&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java Thu 
Apr  9 16:32:04 2009
@@ -29,11 +29,6 @@
 
 import org.apache.catalina.connector.Connector;
 
-import java.net.MulticastSocket;
-import java.net.InetAddress;
-import java.net.DatagramPacket;
-import java.io.UnsupportedEncodingException;
-
 import org.apache.tomcat.util.modeler.Registry;
 
 /*
@@ -66,6 +61,20 @@
     public void setTtl(int ttl) { this.ttl = ttl; }
     public int getTtl() { return ttl; }
 
+    /**
+     * Proxy list, format "address:port,address:port".
+     */
+    protected String proxyList = null;
+    public String getProxyList() { return proxyList; }
+    public void setProxyList(String proxyList) { this.proxyList = proxyList; }
+
+    /**
+     * URL prefix.
+     */
+    protected String proxyURL = "/HeartbeatListener";
+    public String getProxyURL() { return proxyURL; }
+    public void setProxyURL(String proxyURL) { this.proxyURL = proxyURL; }
+
     private CollectedInfo coll = null;
 
     private Sender sender = null;
@@ -77,8 +86,18 @@
         Object source = event.getLifecycle();
         if (Lifecycle.PERIODIC_EVENT.equals(event.getType())) {
             if (sender == null) {
-                sender = new MultiCastSender();
-                sender.init(this);
+                if (proxyList == null)
+                    sender = new MultiCastSender();
+                else
+                    sender = new TcpSender();
+
+                try {
+                    sender.init(this);
+                } catch (Exception ex) {
+                    log.error("Unable to initialize Sender: " + ex);
+                    sender = null;
+                    return;
+                }
             }
 
             /* Read busy and ready */

Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java?rev=763726&r1=763725&r2=763726&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java Thu 
Apr  9 16:32:04 2009
@@ -40,7 +40,7 @@
     MulticastSocket s = null;
     InetAddress group = null;
 
-    public void init(HeartbeatListener config) {
+    public void init(HeartbeatListener config) throws Exception {
         this.config = config;
     }
 

Added: tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java?rev=763726&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java (added)
+++ tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java Thu Apr  9 
16:32:04 2009
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+import java.net.InetAddress;
+
+/*
+ * This class represents a front-end httpd server.
+ *
+ */
+public class Proxy {
+
+  protected enum State { OK, ERROR, DOWN };
+
+  public InetAddress address = null;
+  public int port = 80;
+  public State state = State.OK;
+}

Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java?rev=763726&r1=763725&r2=763726&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java Thu Apr  9 
16:32:04 2009
@@ -27,7 +27,7 @@
   /**
    * Set the configuration parameters
    */
-  public void init(HeartbeatListener config);
+  public void init(HeartbeatListener config) throws Exception;
 
   /**
    * Send the message to the proxies

Added: tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java?rev=763726&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java (added)
+++ tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java Thu Apr  9 
16:32:04 2009
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.InetAddress;
+import java.io.UnsupportedEncodingException;
+import java.util.StringTokenizer;
+
+/*
+ * Sender to proxies using multicast socket.
+ */
+public class TcpSender
+    implements Sender {
+
+    private static Log log = LogFactory.getLog(HeartbeatListener.class);
+
+    HeartbeatListener config = null;
+
+    /**
+     * Proxies.
+     */
+    protected Proxy[] proxies = null;
+
+
+    /**
+     * Active connections.
+     */
+
+    protected Socket[] connections = null;
+    protected BufferedReader[] connectionReaders = null;
+    protected BufferedWriter[] connectionWriters = null;
+
+
+    public void init(HeartbeatListener config) throws Exception {
+        this.config = config;
+        StringTokenizer tok = new StringTokenizer(config.getProxyList(), ",");
+        proxies = new Proxy[tok.countTokens()];
+        int i = 0;
+        while (tok.hasMoreTokens()) {
+            String token = tok.nextToken().trim();
+            int pos = token.indexOf(':');
+            if (pos <=0)
+                throw new Exception("bad ProxyList");
+            proxies[i] = new Proxy();
+            proxies[i].port = Integer.parseInt(token.substring(pos + 1));
+            try {
+                 proxies[i].address = InetAddress.getByName(token.substring(0, 
pos));
+            } catch (Exception e) {
+                throw new Exception("bad ProxyList");
+            }
+            i++;
+        }
+        connections = new Socket[proxies.length];
+        connectionReaders = new BufferedReader[proxies.length];
+        connectionWriters = new BufferedWriter[proxies.length];
+
+    }
+
+    public int send(String mess) throws Exception {
+        if (connections == null) {
+            log.error("Not initialized");
+            return -1;
+        }
+        String requestLine = "POST " + config.getProxyURL() + " HTTP/1.0";
+
+        for (int i = 0; i < connections.length; i++) {
+            if (connections[i] == null) {
+                try {
+                    connections[i] = new Socket(proxies[i].address, 
proxies[i].port);
+                    connectionReaders[i] = new BufferedReader(new 
InputStreamReader(connections[i].getInputStream()));
+                    connectionWriters[i] = new BufferedWriter(new 
OutputStreamWriter(connections[i].getOutputStream()));
+                } catch (Exception ex) {
+                    log.error("Unable to connect to proxy: " + ex);
+                    close(i);
+                } 
+            }
+            if (connections[i] == null)
+                continue; // try next proxy in the list
+            BufferedWriter writer = connectionWriters[i];
+            try {
+                writer.write(requestLine); 
+                writer.write("\r\n");
+                writer.write("Content-Length: " + mess.length() + "\r\n");
+                writer.write("User-Agent: HeartbeatListener/1.0\r\n");
+                writer.write("Connection: Keep-Alive\r\n");
+                writer.write("\r\n");
+                writer.write(mess);
+                writer.write("\r\n");
+                writer.flush();
+            } catch (Exception ex) {
+                log.error("Unable to send collected load information to proxy: 
" + ex);
+                close(i);
+            } 
+            if (connections[i] == null)
+                continue; // try next proxy in the list
+            
+            /* Read httpd answer */
+            String responseStatus = connectionReaders[i].readLine();
+            if (responseStatus == null) {
+                log.error("Unable to read response from proxy");
+                close(i);
+                continue;
+            } else {
+                responseStatus = 
responseStatus.substring(responseStatus.indexOf(' ') + 1, 
responseStatus.indexOf(' ', responseStatus.indexOf(' ') + 1));
+                int status = Integer.parseInt(responseStatus);
+                if (status != 200) {
+                    log.error("Status is " + status);
+                    close(i);
+                    continue;
+                }
+
+                // read all the headers.
+                String header = connectionReaders[i].readLine();
+                int contentLength = 0;
+                while (!"".equals(header)) {
+                    int colon = header.indexOf(':');
+                    String headerName = header.substring(0, colon).trim();
+                    String headerValue = header.substring(colon + 1).trim();
+                    if ("content-length".equalsIgnoreCase(headerName)) {
+                        contentLength = Integer.parseInt(headerValue);
+                    }
+                }
+                if (contentLength > 0) {
+                    char[] buf = new char[512];
+                    while (contentLength > 0) {
+                        int thisTime = (contentLength > buf.length) ? 
buf.length : contentLength;
+                        int n = connectionReaders[i].read(buf, 0, thisTime);
+                        if (n <= 0) {
+                            log.error("Read content failed");
+                            close(i);
+                            break;
+                        } else {
+                            contentLength -= n;
+                        }
+                   }
+                }
+            }
+               
+        }
+
+        return 0;
+    }
+
+    /**
+     * Close connection.
+     */
+    protected void close(int i) {
+        try {
+            if (connectionReaders[i] != null) {
+                connectionReaders[i].close();
+            }
+        } catch (IOException e) {
+        }
+        connectionReaders[i] = null;
+        try {
+            if (connectionWriters[i] != null) {
+                connectionWriters[i].close();
+            }
+        } catch (IOException e) {
+        }
+        connectionWriters[i] = null;
+        try {
+            if (connections[i] != null) {
+                connections[i].close();
+            }
+        } catch (IOException e) {
+        }
+        connections[i] = null;
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to