Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java?view=auto&rev=462889 ============================================================================== --- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java (added) +++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java Wed Oct 11 11:02:08 2006 @@ -0,0 +1,524 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.axis2.transport.niohttp.impl; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +/** + * A generic parser for HTTP incoming request/response digesting + * <p/> + * RFC 2616 + * Request = Request-Line ; Section 5.1 + * *(( general-header ; Section 4.5 + * | request-header ; Section 5.3 + * | entity-header ) CRLF) ; Section 7.1 + * CRLF + * [ message-body ] ; Section 4.3 + * Request-Line = Method SP Request-URI SP HTTP-Version CRLF + * <p/> + * Response = Status-Line ; Section 6.1 + * *(( general-header ; Section 4.5 + * | response-header ; Section 6.2 + * | entity-header ) CRLF) ; Section 7.1 + * CRLF + * [ message-body ] ; Section 7.2 + * Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF + */ +public class ReadHandler { + + private static final Log log = LogFactory.getLog(ReadHandler.class); + + CharsetDecoder asciiDecoder = Charset.forName("us-ascii").newDecoder(); + + ByteBuffer buffer = ByteBuffer.allocate(4096); + ByteBuffer chunkedBuffer = ByteBuffer.allocate(4096); + HttpMessage httpMessage; + + // where should new bytes read from the incoming channel be stored + int readPos; + int processPos; /* position within the main 'buffer' */ + + // holders for parsed data + String curHeaderName = null; + StringBuffer curHeaderValue = new StringBuffer(); + int bodyStart; + int contentLength; + int currentChunkRemainder; + boolean lastChunkReceived = false; + + // holders of internal state of the parser + private boolean requestMode = true; + private boolean parsingMessageLine = true; + private boolean parsingHeader = true; + private boolean parsingChunks = false; + private boolean messageComplete = false; + + public void reset() 
{ + buffer.clear(); + chunkedBuffer.clear(); + if (requestMode) { + httpMessage = new HttpRequest(); + } else { + httpMessage = new HttpResponse(); + } + readPos = 0; + processPos = 0; + curHeaderName = null; + curHeaderValue = new StringBuffer(); + bodyStart = 0; + contentLength = 0; + currentChunkRemainder = 0; + lastChunkReceived = false; + parsingMessageLine = true; + parsingHeader = true; + parsingChunks = false; + messageComplete = false; + } + + public ReadHandler(boolean requestMode) { + this.requestMode = requestMode; + if (requestMode) { + httpMessage = new HttpRequest(); + } else { + httpMessage = new HttpResponse(); + } + } + + public boolean handle(SocketChannel socket, SelectionKey sk) { + try { + // set position within buffer to read from channel + buffer.position(readPos); + + // perform read from channel to this location + int bytesRead = socket.read(buffer); + if (bytesRead == -1) { + // end of stream reached + socket.close(); + sk.cancel(); + debug("end-of-stream detected and socket closed"); + return false; + } + + //if (log.isDebugEnabled()) { + debug("Read from socket to buffer position: " + readPos + " to: " + buffer.position()); + debug(Util.dumpAsHex(buffer.array(), readPos)); + //} + + // save position for next read + readPos = buffer.position(); + return processIncomingMessage(); + + } catch (IOException e) { + log.warn("Unexpected error reading from socket. 
Closing connection : " + e.getMessage()); + try { + socket.close(); + } catch (IOException e1) { + e1.printStackTrace(); + } + sk.cancel(); + } + return false; + } + + private boolean isMessageComplete() { + return messageComplete; + } + + public boolean isConnectionClose() { + return httpMessage.isConnectionClose(); + } + + private boolean processIncomingMessage() { + + debug("\tprocessing httpMessage"); + if (parsingMessageLine) { + debug("\t\tparsing httpMessage line"); + parseMessageLine(); + } + if (!parsingMessageLine && parsingHeader) { + debug("\t\tparsing headers"); + parseHeaders(); + } + if (!parsingHeader && !messageComplete) { + debug("\t\tparsing body"); + parseBody(); + } + + return messageComplete; + } + + private void skipBlankLines(ByteBuffer buf) { + + buffer.position(processPos); + int pos = processPos; + int start = pos; + + while (pos + 1 < readPos && + buf.get(pos) == Constants.CR && + buf.get(pos + 1) == Constants.LF) { + pos += 2; + } + // did we really skip any? + if (pos > start) { + processPos = pos; // advanced processed position + } + } + + private String readToSpace(ByteBuffer buf) { + return readToDelimeter(buf, Constants.SP); + } + + private String readToColon(ByteBuffer buf) { + return readToDelimeter(buf, Constants.COLON); + } + + /** + * read to the position of the byte given by 'delim' from the + * processPos position of the buffer. + * <p/> + * updates processPos if 'delim' was found, to the found position + * + * @param buf + * @param delim + * @return the bytes from the start position to the delimiter (excluding) as a string + */ + private String readToDelimeter(ByteBuffer buf, final byte delim) { + + buffer.position(processPos); + int pos = processPos; + int start = pos; + + // processPos (what we have not digested yet) + // | delim readPos (where we will read to next) + // | | | + // x x x x D x x x . . . . 
+ // ^ ^ + // what we should get as a result of this operation + + while (pos < readPos && buf.get(pos) != delim) { + pos++; + } + + if (pos < readPos) { + debug("\t\t\t$$readToDelimeter(" + delim + ") FOUND"); + processPos = pos + 1; // advance over processed bytes and delim + return extractAsString(start, pos - 1); + } else { + debug("\t\t\t$$readToDelimeter(" + delim + ") NOT FOUND"); + return null; + } + } + + private String extractAsString(int start, int end) { + + if (end < start) return ""; + + byte[] temp = new byte[end - start + 1]; + buffer.position(start); + buffer.get(temp, 0, end - start + 1); + + try { + debug("\t\t@@ extractAsString(" + start + ", " + end + ") from Buffer " + + buffer + " as : " + asciiDecoder.decode(ByteBuffer.wrap(temp)).toString()); + return asciiDecoder.decode(ByteBuffer.wrap(temp)).toString(); + } catch (CharacterCodingException e) { + e.printStackTrace(); + return null; // TODO + } + } + + private String readToCRLF(ByteBuffer buf) { + + buffer.position(processPos); + int pos = processPos; + int start = pos; + + while (pos + 1 < readPos && + buf.get(pos) != Constants.CR && buf.get(pos + 1) != Constants.LF) { + pos++; + } + + if (pos < readPos) { + debug("\t\t\t$$readToCRLF() FOUND"); + processPos = pos + 2; // advance over processed bytes and CR LF + return extractAsString(start, pos - 1); + } + return null; + } + + private void parseMessageLine() { + + // skip any blank lines + skipBlankLines(buffer); + + if (requestMode) { + parseRequestLine(); + } else { + parseStatusLine(); + } + } + + private void parseRequestLine() { + HttpRequest request = (HttpRequest) httpMessage; + + // read method + if (request.getMethod() == null) { + String method = readToSpace(buffer); + if (method != null) { + request.setMethod(method); + } else { + return; + } + } + + // read URI + if (request.getPath() == null) { + String uri = readToSpace(buffer); + if (uri != null) { + request.setPath(uri); + } else { + return; + } + } + + // read version 
string + if (request.getProtocol() == null) { + String proto = readToCRLF(buffer); + if (proto != null) { + request.setProtocol(proto); + parsingMessageLine = false; + } else { + return; + } + } + } + + private void parseStatusLine() { + HttpResponse response = (HttpResponse) httpMessage; + + // read version + if (response.getVersion() == null) { + String version = readToSpace(buffer); + if (version != null) { + response.setVersion(version); + } else { + return; + } + } + + // read code + if (response.getStatus().getCode() == 0) { + String code = readToSpace(buffer); + if (code != null) { + response.getStatus().setCode(Integer.parseInt(code)); + } else { + return; + } + } + + // read message + if (response.getStatus().getMessage() == null) { + String msg = readToCRLF(buffer); + if (msg != null) { + response.getStatus().setMessage(msg); + parsingMessageLine = false; + } else { + return; + } + } + } + + private void parseHeaders() { + while ((processPos < readPos) && parseHeader()) { + debug("\t\t\t... parsing headers..."); + } + + debug("\t\t\t...exit parsingheaders loop : processPos: " + + processPos + " readPos : " + readPos); + } + + // return false after reading a blank line + private boolean parseHeader() { + + debug("\t\tParse Header processPos: " + processPos + " readPos: " + readPos); + + String line = readToCRLF(buffer); + if (line == null) { + return false; + } else { + // handle multi line headers later todo + int colon = line.indexOf(":"); + if (colon != -1) { + httpMessage.addHeader( + line.substring(0, colon), + line.substring(colon + 2/* include skip space too*/)); + } else if (line.length() == 0) { + + debug("\t\t\theaders parsed"); + parsingHeader = false; + + // prepare to parse body + bodyStart = processPos; + debug("\t\t\tparsed headers. 
begin parsing body to buffer position:" + bodyStart); + + if (httpMessage.isChunked()) { + parsingChunks = true; + } else { + contentLength = httpMessage.getContentLength(); + } + + return false; + } + } + return true; + } + + private String parseHeaderName(ByteBuffer buf) { + return readToColon(buf); + } + + private String parseHeaderValue(ByteBuffer buf) { + int firstChar; + do { + String value = readToCRLF(buf); + if (value != null) { + curHeaderValue.append(value); + } + firstChar = buf.get(buf.position()); + } while (firstChar == Constants.SP || firstChar == Constants.HT); + return curHeaderValue.toString(); + } + + private boolean parseNextChunk() { + debug("\t\t\tparseNextChunk(currentChunkRemainder: " + currentChunkRemainder + + " processPos: " + processPos + " readPos: " + readPos); + if (currentChunkRemainder > 0) { + // now start processing from where we left off until we reach the end + buffer.position(processPos); + + byte b; + while (currentChunkRemainder > 0 && buffer.position() < readPos) { + b = buffer.get(); + chunkedBuffer.put(b); + processPos++; + currentChunkRemainder--; + } + + if (currentChunkRemainder == 0) { + // read to end of data CRLF and discard trailing CRLF + debug("\t\t\tcurrentChunkRemainder is 0 .. reading to CRLF.."); + readToCRLF(buffer); + } + } + if (currentChunkRemainder == 0) { + // is there another chunk? + String chunkHead = readToCRLF(buffer); + debug("\t\t\treading chunkHead : " + chunkHead); + if (chunkHead != null && chunkHead.length() > 0) { + int lenEnd = chunkHead.indexOf(';'); + if (lenEnd != -1) { + currentChunkRemainder = Integer.parseInt( + chunkHead.substring(0, lenEnd).trim(), 16 /* radix */); + } else { + currentChunkRemainder = Integer.parseInt( + chunkHead.trim(), 16 /* radix */); + } + } else { + return true; + } + + // did we encounter the "0" chunk? 
+ if (currentChunkRemainder == 0) { + + debug("\t\t\tall chunks received"); + chunkedBuffer.flip(); + + // read upto end of next CRLF + String footer; + do { + debug("\t\t\t...parsing chunk footers..."); + footer = readToCRLF(buffer); + // TODO process footers if we need + } while (footer != null && !"".equals(footer)); + + /*try { + CharBuffer cb = asciiDecoder.decode(chunkedBuffer); + debug("Chunked Buffer : \n" + cb.toString()); + } catch (CharacterCodingException e) { + e.printStackTrace(); + }*/ + + if ("".equals(footer)) { + lastChunkReceived = true; + messageComplete = true; + return false; + } + } + } + + return true; // continue to recursively call this same method + } + + private void parseBody() { + debug("\t\t\tparseBody(processPos: " + processPos + ", readPos: " + readPos + ")"); + if (parsingChunks && !lastChunkReceived) { + while (processPos < readPos && parseNextChunk()) { + debug("\t\t\t...parsing body chunk...."); + } + + if (lastChunkReceived && messageComplete) { + // copy chunked body to main buffer, to start at the bodyStart position + buffer.position(bodyStart); + chunkedBuffer.position(0); + buffer.put(chunkedBuffer); + buffer.flip(); + httpMessage.setBuffer(buffer, bodyStart); + } + + } else { + + if (readPos >= bodyStart + contentLength) { + // do we have the whole body in our buffer? + processPos = readPos; + buffer.position(processPos); + buffer.flip(); + + debug("\t\t\tfinish reading. body starts from: " + + bodyStart + " and ends: " + processPos + " in buffer : " + buffer); + httpMessage.setBuffer(buffer, bodyStart); + messageComplete = true; + } + } + } + + public HttpMessage getHttpMessage() { + return httpMessage; + } + + // TODO remove this method... this is too much debugging!! + private static void debug(String msg) { + //System.out.println(msg); + } +}
Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ResponseStatus.java URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ResponseStatus.java?view=auto&rev=462889 ============================================================================== --- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ResponseStatus.java (added) +++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ResponseStatus.java Wed Oct 11 11:02:08 2006 @@ -0,0 +1,58 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.axis2.transport.niohttp.impl; + +public class ResponseStatus { + + public static final ResponseStatus OK = + new ResponseStatus(200, Constants.OK); + public static final ResponseStatus MOVED_PERMANENTLY = + new ResponseStatus(301, "Moved Permanently"); + public static final ResponseStatus NOT_FOUND = + new ResponseStatus(404, "Not Found"); + public static final ResponseStatus INTERNAL_SERVER_ERROR = + new ResponseStatus(500, "Internal Server Error"); + + private int code; + private String message; + + public ResponseStatus() {} + + public ResponseStatus(int code, String message) { + this.code = code; + this.message = message; + } + + public String toString() { + return code + Constants.STRING_SP + message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } +} Added: incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Util.java URL: http://svn.apache.org/viewvc/incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Util.java?view=auto&rev=462889 ============================================================================== --- incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Util.java (added) +++ incubator/synapse/trunk/java/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Util.java Wed Oct 11 11:02:08 2006 @@ -0,0 +1,151 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
/**
 * Static helpers used by the NIO HTTP transport.
 */
public class Util {

    /** Number of bytes shown on one row of a hex dump. */
    private static final int BYTES_PER_ROW = 16;

    /**
     * Copied from transport.http of Axis2
     * <p/>
     * Returns the ip address to be used for the replyto epr
     * CAUTION:
     * This will go through all the available network interfaces and will try to return an ip address.
     * First this will try to get the first IP which is not the loopback address (127.0.0.1). If none
     * is found then this will return 127.0.0.1.
     * This will <b>not</b> consider IPv6 addresses.
     * <p/>
     * TODO:
     * - Improve this logic to generalize it a bit more
     * - Obtain the ip to be used here from the Call API
     *
     * @return the first non-loopback dotted-quad address found, or "127.0.0.1"
     * @throws java.net.SocketException if the network interfaces cannot be queried
     */
    public static String getIpAddress() throws SocketException {
        Enumeration interfaces = NetworkInterface.getNetworkInterfaces();

        // guard against a null enumeration (a host without any interface)
        while (interfaces != null && interfaces.hasMoreElements()) {
            NetworkInterface netface = (NetworkInterface) interfaces.nextElement();
            Enumeration addresses = netface.getInetAddresses();

            while (addresses.hasMoreElements()) {
                InetAddress ip = (InetAddress) addresses.nextElement();
                if (!ip.isLoopbackAddress() && isIP(ip.getHostAddress())) {
                    return ip.getHostAddress();
                }
            }
        }
        return "127.0.0.1";
    }

    /** True if the text consists of four dot separated parts, i.e. looks like IPv4. */
    private static boolean isIP(String hostAddress) {
        return hostAddress.split("[.]").length == 4;
    }

    /**
     * Formats the given bytes as a hex dump (up to length bytes) and returns it.
     * <p/>
     * Each row shows up to 16 bytes as two-digit hex values, then the same bytes as
     * characters, with '.' standing in for anything outside the range 33..126. The
     * last row holds the bytes left over after the full rows; it is padded so that
     * its character column lines up with the rows above, and it is emitted even when
     * no bytes are left over.
     *
     * @param byteBuffer the data to format as hex; null is treated as empty
     * @param length     the number of bytes to format; values outside
     *                   0..byteBuffer.length are clamped to that range
     * @return the formatted dump, every row terminated by a newline
     */
    public static final String dumpAsHex(byte[] byteBuffer, int length) {
        int available = (byteBuffer == null) ? 0 : byteBuffer.length;
        int limit = Math.max(0, Math.min(length, available));

        StringBuffer outputBuf = new StringBuffer(limit * 4 + 4 * BYTES_PER_ROW);

        int p = 0;
        int fullRows = limit / BYTES_PER_ROW;
        for (int row = 0; row < fullRows; row++) {
            appendRow(outputBuf, byteBuffer, p, BYTES_PER_ROW);
            p += BYTES_PER_ROW;
        }

        // whatever is left (possibly nothing) forms the final, padded row
        appendRow(outputBuf, byteBuffer, p, limit - p);

        return outputBuf.toString();
    }

    /** Appends one dump row showing count bytes of data starting at offset. */
    private static void appendRow(StringBuffer out, byte[] data, int offset, int count) {
        for (int i = 0; i < count; i++) {
            String hexVal = Integer.toHexString(data[offset + i] & 0xff);
            if (hexVal.length() == 1) {
                out.append('0');
            }
            out.append(hexVal).append(' ');
        }

        // every hex cell is three characters wide, so pad missing cells with three
        for (int i = count; i < BYTES_PER_ROW; i++) {
            out.append("   ");
        }

        out.append(' ');

        for (int i = 0; i < count; i++) {
            byte b = data[offset + i];
            out.append((b > 32 && b < 127) ? (char) b : '.');
        }

        out.append('\n');
    }

    /**
     * Copies everything from in to out and closes both streams, also when the copy
     * fails part way through.
     *
     * @param in  the stream to read from, until end-of-stream
     * @param out the stream to write to
     * @throws IOException if reading, writing or closing fails
     */
    public static void copyStreams(InputStream in, OutputStream out) throws IOException {
        try {
            byte[] buf = new byte[4096];
            int len;
            // only -1 signals end-of-stream; a read of 0 bytes does not
            while ((len = in.read(buf)) != -1) {
                out.write(buf, 0, len);
            }
            out.flush();
        } finally {
            try {
                in.close();
            } finally {
                out.close();
            }
        }
    }
}
+*/ +package org.apache.axis2.transport.niohttp.impl; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +public class WriteHandler { + + private static final Log log = LogFactory.getLog(WriteHandler.class); + + private ByteBuffer buffer; + private boolean connectionClose; + + public void setMessage(ByteBuffer buffer, boolean connectionClose) { + this.buffer = buffer; + this.connectionClose = connectionClose; + } + + public boolean isConnectionClose() { + return connectionClose; + } + + /** + * @param socket + * @return true if response has been completely written + */ + public boolean handle(SocketChannel socket) { + try { + log.debug("Writing to wire : \n" + Util.dumpAsHex(buffer.array(), buffer.limit())); + if (buffer.remaining() > 0) { + log.debug("Writing response.."); + socket.write(buffer); + if (buffer.remaining() == 0) { + log.debug("Completely wrote response"); + return true; + } + } + } catch (IOException e) { + e.printStackTrace(); + } + return false; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
