Author: jfclere Date: Thu Apr 9 16:32:04 2009 New Revision: 763726 URL: http://svn.apache.org/viewvc?rev=763726&view=rev Log: Add the Tcp code... Still need the code in httpd-trunk to test it.
Added: tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java?rev=763726&r1=763725&r2=763726&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java (original) +++ tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java Thu Apr 9 16:32:04 2009 @@ -29,11 +29,6 @@ import org.apache.catalina.connector.Connector; -import java.net.MulticastSocket; -import java.net.InetAddress; -import java.net.DatagramPacket; -import java.io.UnsupportedEncodingException; - import org.apache.tomcat.util.modeler.Registry; /* @@ -66,6 +61,20 @@ public void setTtl(int ttl) { this.ttl = ttl; } public int getTtl() { return ttl; } + /** + * Proxy list, format "address:port,address:port". + */ + protected String proxyList = null; + public String getProxyList() { return proxyList; } + public void setProxyList(String proxyList) { this.proxyList = proxyList; } + + /** + * URL prefix. 
+ */ + protected String proxyURL = "/HeartbeatListener"; + public String getProxyURL() { return proxyURL; } + public void setProxyURL(String proxyURL) { this.proxyURL = proxyURL; } + private CollectedInfo coll = null; private Sender sender = null; @@ -77,8 +86,18 @@ Object source = event.getLifecycle(); if (Lifecycle.PERIODIC_EVENT.equals(event.getType())) { if (sender == null) { - sender = new MultiCastSender(); - sender.init(this); + if (proxyList == null) + sender = new MultiCastSender(); + else + sender = new TcpSender(); + + try { + sender.init(this); + } catch (Exception ex) { + log.error("Unable to initialize Sender: " + ex); + sender = null; + return; + } } /* Read busy and ready */ Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java?rev=763726&r1=763725&r2=763726&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java Thu Apr 9 16:32:04 2009 @@ -40,7 +40,7 @@ MulticastSocket s = null; InetAddress group = null; - public void init(HeartbeatListener config) { + public void init(HeartbeatListener config) throws Exception { this.config = config; } Added: tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java?rev=763726&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java (added) +++ tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java Thu Apr 9 16:32:04 2009 @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.catalina.ha.backend; + +import java.net.InetAddress; + +/* + * This class represents a front-end httpd server, identified by the address + * and port that TcpSender opens its socket to. It is a plain holder of public, + * mutable fields with no behaviour of its own. + * + */ +public class Proxy { + + /* Possible proxy states; OK is the initial value of the state field. NOTE(review): no code in this commit reads or changes the state - confirm intended use. */ protected enum State { OK, ERROR, DOWN }; + + /* Network address of the proxy; null until assigned (TcpSender.init resolves it from the proxy list). */ public InetAddress address = null; + /* TCP port of the proxy; defaults to 80 and is overwritten by TcpSender.init. */ public int port = 80; + /* Current state of the proxy; starts as OK. */ public State state = State.OK; +} Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java?rev=763726&r1=763725&r2=763726&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java (original) +++ tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java Thu Apr 9 16:32:04 2009 @@ -27,7 +27,7 @@ /** * Set the configuration parameters */ - public void init(HeartbeatListener config); + public void init(HeartbeatListener config) throws Exception; /** * Send the message to the proxies Added: tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java?rev=763726&view=auto ============================================================================== --- 
tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java (added) +++ tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java Thu Apr 9 16:32:04 2009 @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.catalina.ha.backend; + +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.InetAddress; +import java.net.Socket; +import java.net.InetAddress; +import java.io.UnsupportedEncodingException; +import java.util.StringTokenizer; + +/* + * Sender to proxies using multicast socket. + */ +public class TcpSender + implements Sender { + + private static Log log = LogFactory.getLog(HeartbeatListener.class); + + HeartbeatListener config = null; + + /** + * Proxies. + */ + protected Proxy[] proxies = null; + + + /** + * Active connections. 
+ */ + + protected Socket[] connections = null; + protected BufferedReader[] connectionReaders = null; + protected BufferedWriter[] connectionWriters = null; + + + public void init(HeartbeatListener config) throws Exception { + this.config = config; + StringTokenizer tok = new StringTokenizer(config.getProxyList(), ","); + proxies = new Proxy[tok.countTokens()]; + int i = 0; + while (tok.hasMoreTokens()) { + String token = tok.nextToken().trim(); + int pos = token.indexOf(':'); + if (pos <=0) + throw new Exception("bad ProxyList"); + proxies[i] = new Proxy(); + proxies[i].port = Integer.parseInt(token.substring(pos + 1)); + try { + proxies[i].address = InetAddress.getByName(token.substring(0, pos)); + } catch (Exception e) { + throw new Exception("bad ProxyList"); + } + i++; + } + connections = new Socket[proxies.length]; + connectionReaders = new BufferedReader[proxies.length]; + connectionWriters = new BufferedWriter[proxies.length]; + + } + + public int send(String mess) throws Exception { + if (connections == null) { + log.error("Not initialized"); + return -1; + } + String requestLine = "POST " + config.getProxyURL() + " HTTP/1.0"; + + for (int i = 0; i < connections.length; i++) { + if (connections[i] == null) { + try { + connections[i] = new Socket(proxies[i].address, proxies[i].port); + connectionReaders[i] = new BufferedReader(new InputStreamReader(connections[i].getInputStream())); + connectionWriters[i] = new BufferedWriter(new OutputStreamWriter(connections[i].getOutputStream())); + } catch (Exception ex) { + log.error("Unable to connect to proxy: " + ex); + close(i); + } + } + if (connections[i] == null) + continue; // try next proxy in the list + BufferedWriter writer = connectionWriters[i]; + try { + writer.write(requestLine); + writer.write("\r\n"); + writer.write("Content-Length: " + mess.length() + "\r\n"); + writer.write("User-Agent: HeartbeatListener/1.0\r\n"); + writer.write("Connection: Keep-Alive\r\n"); + writer.write("\r\n"); + 
writer.write(mess); + writer.write("\r\n"); + writer.flush(); + } catch (Exception ex) { + log.error("Unable to send collected load information to proxy: " + ex); + close(i); + } + if (connections[i] == null) + continue; // try next proxy in the list + + /* Read httpd answer */ + String responseStatus = connectionReaders[i].readLine(); + if (responseStatus == null) { + log.error("Unable to read response from proxy"); + close(i); + continue; + } else { + responseStatus = responseStatus.substring(responseStatus.indexOf(' ') + 1, responseStatus.indexOf(' ', responseStatus.indexOf(' ') + 1)); + int status = Integer.parseInt(responseStatus); + if (status != 200) { + log.error("Status is " + status); + close(i); + continue; + } + + // read all the headers. + String header = connectionReaders[i].readLine(); + int contentLength = 0; + while (!"".equals(header)) { + int colon = header.indexOf(':'); + String headerName = header.substring(0, colon).trim(); + String headerValue = header.substring(colon + 1).trim(); + if ("content-length".equalsIgnoreCase(headerName)) { + contentLength = Integer.parseInt(headerValue); + } + } + if (contentLength > 0) { + char[] buf = new char[512]; + while (contentLength > 0) { + int thisTime = (contentLength > buf.length) ? buf.length : contentLength; + int n = connectionReaders[i].read(buf, 0, thisTime); + if (n <= 0) { + log.error("Read content failed"); + close(i); + break; + } else { + contentLength -= n; + } + } + } + } + + } + + return 0; + } + + /** + * Close connection. 
+ */ + protected void close(int i) { + try { + if (connectionReaders[i] != null) { + connectionReaders[i].close(); + } + } catch (IOException e) { + } + connectionReaders[i] = null; + try { + if (connectionWriters[i] != null) { + connectionWriters[i].close(); + } + } catch (IOException e) { + } + connectionWriters[i] = null; + try { + if (connections[i] != null) { + connections[i].close(); + } + } catch (IOException e) { + } + connections[i] = null; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org