On 18.10.2011 19:24, Jeremy Sandell wrote:
On Mon, Oct 17, 2011 at 2:11 PM, Jacob Carlborg <[email protected]
<mailto:[email protected]>> wrote:
On 2011-10-17 16:01, Andrea Fontana wrote:
I handle requests on different threads. I do some pre-processing on
scgi data and I fill a struct:
request.get[]
request.post[]
request.cookie[]
request.headers[string]
then I call a virtual function (to override on subclasses) like:
do(request, output);
where the user fills the output struct like this:
output.data ~= "<html><body><h1>hello world</h1></body></html>";
output.status = 200
output.cookies = bla bla
and then, if the method is not "head", I send headers + data; otherwise just
the headers.
btw 99% of usage is get, post, head.
Yes, but if you want to write a web site that is RESTful you need
the other HTTP methods as well, at least PUT and DELETE.
BTW, what about creating something like Rack, but for D? Rack is a
low-level interface in front of the web server on top of which web
frameworks can be built.
http://rack.github.com/
--
/Jacob Carlborg
Yes, this is exactly why I was wondering whether FastCGI had been
implemented (though SCGI works for me as well) - so that I could write
something on top of it, in much the same way I would using (for example)
WSGI in Python.
I also agree with you re: supporting all of the HTTP methods. Just
because the most common ones are GET, POST, and HEAD doesn't mean we
should leave out the others; both PUT and DELETE are quite useful.
Best regards,
Jeremy Sandell
Adam D. Ruppe has a wrapper for libfcgi at github. I started
implementing fcgi, but it's basically just a very, very limited
ugly-hack prototype. Doubt you'll get much use of it, but I'll attach it
here anyway.
module fcgi;
import std.socket, std.stdio, std.socketstream;
import std.datetime;
import std.concurrency;
import core.thread;
import std.process;
import std.conv;
private import std.c.windows.windows, std.c.windows.winsock;
///////////////
// APPLICATION LAYER (the FastCGI protocol definitions are at the end of this module)
///////////////
/**
 * Reinterprets the first S.sizeof bytes of a byte buffer as a value of type S.
 *
 * The bytes are copied into a fresh S, so the result does not alias `buf`.
 * No byte-order conversion is performed; S is expected to describe the wire
 * layout directly (as the protocol structs in this module do with their
 * single-byte fields).
 *
 * Params:
 *   buf = buffer of byte-sized elements holding at least S.sizeof bytes
 *
 * Returns: an S whose in-memory representation equals buf[0 .. S.sizeof].
 */
S toStruct(S, T)(T[] buf) {
    // Element counts and byte counts must coincide for the length check below.
    static assert(T.sizeof == 1, "toStruct expects a buffer of byte-sized elements");
    assert(buf.length >= S.sizeof);
    S result = void;
    // Slicing `buf` keeps the bounds check even when asserts are compiled out.
    auto source = cast(const(ubyte)[])buf[0 .. S.sizeof];
    // Raw byte copy instead of a cast through a static scratch buffer.
    (cast(ubyte*)&result)[0 .. S.sizeof] = source[];
    return result;
}
/// Who is responsible for closing the socket?
/// ListenThread.readRequest derives the value from the keep-connection flag of
/// the begin-request record, and ListenThread.run consults it after the
/// request callback has returned.
enum SocketLifetimeOwner {
/// Chosen when the keep-connection flag is clear; run() then shuts down the
/// sending side of the client socket once the request has been handled.
application = 1,
/// Chosen when the keep-connection flag is set; run() leaves the socket open.
/// Presumably the web server closes it later - TODO confirm.
server = 2
}
/// Delegate that ListenThread.run invokes with each request returned by readRequest.
alias void delegate(Request) RequestCallback;
/// Delegate invoked by Request.end(); the arguments are the request id, the
/// application status and the protocol status, in that order.
alias void delegate(ushort, int, ProtocolStatus) EndRequestHandler;
/**
 * One FastCGI request as seen by the application.
 *
 * Instances are created by ListenThread.readRequest, which fills in role,
 * socket owner, params and input before handing the object to the request
 * callback. The callback produces output with write() and finishes the
 * request with end().
 */
class Request {
    RequestId _id;
    Params _params;
    Role _role;
    SocketLifetimeOwner _socketOwner;
    EndRequestHandler _endRequestHandler;
    ubyte[] _input;
    ListenThread _server;
    bool _ended;
    // Set once anything has been written to the stderr stream, so that end()
    // knows whether that stream needs a terminating record.
    private bool _stdErrWritten;

    /**
     * Params:
     *   id                = request id taken from the record header
     *   server            = listen thread whose active socket is used for output
     *   endRequestHandler = invoked by end() to emit the end-request record
     */
    this(RequestId id, ListenThread server, EndRequestHandler endRequestHandler) {
        _id = id;
        _endRequestHandler = endRequestHandler;
        _server = server;
    }

    /// Bytes accumulated from the request's stdin records.
    @property ubyte[] input() pure nothrow @safe {
        return _input;
    }

    /// Request id this object was created with (or last assigned).
    @property RequestId id() pure const nothrow @safe {
        return _id;
    }

    /// ditto
    @property void id(RequestId id) pure nothrow @safe {
        _id = id;
    }

    /// Parsed name/value pairs; null until a params record has been read.
    @property const(Params) params() pure const nothrow @safe {
        return _params;
    }

    private @property void params(Params params) pure nothrow @safe {
        _params = params;
    }

    /// Role taken from the begin-request record.
    @property Role role() pure const nothrow @safe {
        return _role;
    }

    private @property void role(Role role) pure nothrow @safe {
        _role = role;
    }

    /// Which side closes the connection after this request.
    @property SocketLifetimeOwner socketOwner() pure const nothrow @safe {
        return _socketOwner;
    }

    /// ditto
    @property void socketOwner(SocketLifetimeOwner owner) pure nothrow @safe {
        _socketOwner = owner;
    }

    /**
     * Sends `data` to the web server as one or more stream records.
     *
     * The payload is sent as raw bytes and split into records of at most
     * 65535 bytes, the largest length a record header can express. No
     * zero-length record is emitted here: an empty record marks the end of a
     * FastCGI stream, so it is sent once by end() instead of after every
     * write. Writing an empty array sends nothing.
     *
     * Params:
     *   data   = payload to send
     *   method = record type to use for the records (stdOut by default)
     *
     * NOTE(review): the return values of Socket.send are not checked, as in
     * the rest of this module.
     */
    void write(T)(const(T[]) data, RecordType method = RecordType.stdOut) {
        assert(!_ended);
        auto socket = _server._active;
        // Work on bytes so the header length is right for any element size.
        auto remaining = cast(const(ubyte)[])data;
        while(remaining.length) {
            auto chunkLen = remaining.length < ushort.max
                ? remaining.length : ushort.max;
            auto head = RecordHead(method, _id, cast(ushort)chunkLen);
            socket.send(cast(ubyte[RecordHead.sizeof])head);
            socket.send(remaining[0 .. chunkLen]);
            remaining = remaining[chunkLen .. $];
            if(method == RecordType.stdErr)
                _stdErrWritten = true;
        }
    }

    /**
     * Finishes the request: terminates the output streams with zero-length
     * records and then invokes the end-request handler with application
     * status 0 and ProtocolStatus.requestComplete.
     *
     * Calling end() again after the request has ended does nothing.
     */
    void end() {
        if(_ended)
            return;
        auto socket = _server._active;
        // stdout is always closed; stderr only if something was written to it.
        auto terminator = RecordHead(RecordType.stdOut, _id, 0);
        socket.send(cast(ubyte[RecordHead.sizeof])terminator);
        if(_stdErrWritten) {
            terminator = RecordHead(RecordType.stdErr, _id, 0);
            socket.send(cast(ubyte[RecordHead.sizeof])terminator);
        }
        _endRequestHandler(_id, 0, ProtocolStatus.requestComplete);
        _ended = true;
    }
}
/**
 * Name/value pairs of a request (the content of a FastCGI params record).
 *
 * Values are looked up with member syntax, e.g. `params.REQUEST_METHOD`;
 * unknown names yield an empty string.
 */
class Params {
    private string[string] _params;

    /// Wraps an already decoded table.
    private this(string[string] params) {
        _params = params;
    }

    /**
     * Decodes FastCGI name-value pairs.
     *
     * Each pair is encoded as name length, value length, name bytes, value
     * bytes. A length is a single byte when its high bit is clear, otherwise
     * four bytes, big-endian, with the high bit of the first byte masked off.
     *
     * Empty input yields an empty table. Decoding stops at the first pair
     * that is truncated; pairs decoded before that point are kept.
     *
     * Params:
     *   raw = encoded pairs. The stored strings are slices of this buffer, so
     *         the caller must not modify it afterwards.
     */
    private this(in ubyte[] raw) pure nothrow @trusted {
        size_t pos = 0;
        while(pos < raw.length) {
            size_t keyLen, valLen;
            if(!readLength(raw, pos, keyLen) || !readLength(raw, pos, valLen))
                break;
            // Both name and value must fit into what is left of the buffer.
            auto left = raw.length - pos;
            if(keyLen > left || valLen > left - keyLen)
                break;
            auto key = cast(string)raw[pos .. pos + keyLen];
            pos += keyLen;
            auto value = cast(string)raw[pos .. pos + valLen];
            pos += valLen;
            _params[key] = value;
        }
    }

    /**
     * Reads one length field at raw[pos] and advances pos past it.
     *
     * Returns: false, leaving pos untouched, when the buffer ends before the
     * field is complete.
     */
    private static bool readLength(in ubyte[] raw, ref size_t pos, out size_t len)
    pure nothrow @safe {
        if(pos >= raw.length)
            return false;
        if((raw[pos] & 0x80) == 0) {
            len = raw[pos];
            pos += 1;
            return true;
        }
        if(raw.length - pos < 4)
            return false;
        len = (cast(size_t)(raw[pos] & 0x7F) << 24)
            | (cast(size_t)raw[pos + 1] << 16)
            | (cast(size_t)raw[pos + 2] << 8)
            |  cast(size_t)raw[pos + 3];
        pos += 4;
        return true;
    }

    /// Returns the value stored under `key`, or "" when there is none.
    string opDispatch(string key)() const pure nothrow @safe {
        auto v = key in _params;
        return v ? *v : "";
    }

    /// Textual form of the whole table, as produced by std.conv.to.
    override string toString() {
        return to!string(_params);
    }
}
/**
 * Thread that accepts FastCGI connections on a TCP socket, reads complete
 * requests from them and passes each request to a callback.
 *
 * Either supply a RequestCallback to the constructor or subclass and
 * override processRequest.
 *
 * NOTE(review): error reporting in run() uses WSAGetLastError and numeric
 * Winsock error codes, so the class is Windows-specific as written.
 */
class ListenThread : Thread {

    /// Binds to host:port and dispatches requests to processRequest.
    this(string host, ushort port) {
        this(host, port, &processRequest);
    }

    /**
     * Binds a non-blocking listener to host:port. Listening and accepting
     * start only once the thread is started and run() executes.
     *
     * Params:
     *   host     = address to bind to
     *   port     = TCP port to bind to
     *   callback = invoked from run() for every request that was read
     */
    this(string host, ushort port, RequestCallback callback) {
        _callback = callback;
        _listener = new TcpSocket();
        _listener.blocking = false;
        _listener.bind(new InternetAddress(host, port));
        debug writeln("listening to ", host, ":", port, " blocking:",
                      _listener.blocking);
        super(&run);
    }

    /// Default request handler used by the two-argument constructor; must be
    /// overridden, the base implementation only asserts.
    protected void processRequest(Request request) {
        assert(false, " this should be overridden");
    }

private:
    Socket _listener;
    bool _quitting;
    // Client socket currently being served; Request.write and endRequest send on it.
    Socket _active;
    Socket[] clients;
    RequestCallback _callback;
    enum MAX_CONNECTIONS = ubyte.max;

    /**
     * Thread body: waits with select() on the listener and all client
     * sockets, serves every readable client, then accepts a pending
     * connection. Also prints simple throughput statistics.
     *
     * NOTE(review): once _quitting is set while clients are still connected,
     * the loop only re-issues the listener shutdown and never serves or
     * removes those clients, so it does not terminate. Nothing visible in
     * this class sets _quitting, so the path is left as it was.
     */
    void run() {
        ulong requests, reqPrSec, totMs, totHnReq, reqTmThisSec;
        ulong lastReq;
        auto sw = StopWatch(AutoStart.yes);
        clients.reserve(MAX_CONNECTIONS);
        auto ss = new SocketSet(MAX_CONNECTIONS+1);
        _listener.listen(MAX_CONNECTIONS);
        while(clients.length || !_quitting) {
            // Wait for incoming
            int select;
            do {
                ss.reset();
                ss.add(_listener);
                foreach(client; clients) {
                    ss.add(client);
                }
                // we'll time-out to check for quitting
                select = Socket.select(ss, null, null, 1000);
            } while(select <= 0 && !_quitting);
            if(_quitting) {
                _listener.shutdown(SocketShutdown.RECEIVE);
                continue;
            }
            // Process pending requests
            for(int i;i<clients.length;i++) {
                auto client = clients[i];
                if(!ss.isSet(client)) continue;
                debug writeln(client.handle(), " begin");
                _active = client;
                auto reqStart = sw.peek.hnsecs;
                auto request = readRequest(client);
                if(!request) {
                    auto lastErr = WSAGetLastError();
                    switch(lastErr) {
                        case 0:
                            break;
                        case 10053:
                            writeln(client.handle(), " ECONNABORT");
                            break;
                        case 10054:
                            writeln(client.handle(), " ECONNRESET");
                            break;
                        case 10035:
                            writeln(client.handle(), " EWOULDBLOCK");
                            // nothing to read yet: keep the client, try the next one
                            continue;
                        default:
                            writeln(client.handle(), " ERR: ", lastErr);
                            assert(false, to!string(lastErr));
                            break;
                    }
                    closeClient(client);
                    // replace current with last element, and rerun on this index
                    clients[i] = clients[$-1];
                    clients = clients[0..$-1];
                    clients.assumeSafeAppend();
                    --i;
                    assert(clients.capacity);
                    continue;
                }
                assert(request.id, " shouldn't handle management requests");
                debug writeln(_active.handle(), " handling request ",
                              request.id);
                // TODO: handle management requests
                _callback(request);
                auto shouldClose = request.socketOwner ==
                    SocketLifetimeOwner.application;
                // TODO: Make sure end is called
                if(shouldClose) {
                    client.shutdown(SocketShutdown.SEND);
                }
                auto reqEnd = sw.peek.hnsecs;
                requests++;
                reqPrSec++;
                auto used = cast(double)(reqEnd - reqStart);
                reqTmThisSec += used;
                totHnReq += cast(ulong)used;
                //writeln("REQ HANDLED IN ", used, " hnsec = ", used/1000, " msec = ",
                //        cast(double)totHnReq/cast(double)requests,
                //        " hnsec pr request, since last:", reqEnd-lastReq);
                lastReq = reqEnd;
                if(sw.peek.msecs >= 1_000) {
                    auto rps = cast(double)reqPrSec/(sw.peek.hnsecs/10000000.0);
                    totMs += sw.peek.hnsecs;
                    auto trps = cast(double)requests/(totMs/10000000.0);
                    sw.reset();
                    writeln("ReqPrSec: ", rps, " AvgPrCB: ",
                            (cast(double)reqTmThisSec/cast(double)reqPrSec)/1000.0);
                    writeln("TOT: ", requests, " HNS: ", 10000000.0/trps);
                    reqPrSec = 0;
                    reqTmThisSec = 0;
                }
            }
            _active = null;
            // Accept new connections
            if(ss.isSet(_listener)) {
                assert(clients.capacity);
                auto client = _listener.accept();
                debug writeln(client.handle(), " accepted");
                client.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER,
                                 std.socket.linger(1, 60));
                client.setOption(SocketOptionLevel.SOCKET,
                                 SocketOption.REUSEADDR, 1);
                clients ~= client;
            }
        }
        _listener.shutdown(SocketShutdown.RECEIVE);
        _listener.close();
    }

    /// Shuts down both directions of `socket` and closes it.
    void closeClient(Socket socket) {
        debug writeln(socket.handle(), " closing");
        socket.shutdown(SocketShutdown.BOTH);
        socket.close();
    }

    /**
     * Sends an end-request record (header plus body) for `requestId` on the
     * currently active client socket. Installed as the EndRequestHandler of
     * every Request created by readRequest.
     */
    void endRequest(
        RequestId requestId,
        int applicationStatus,
        ProtocolStatus protocolStatus)
    {
        auto socket = _active;
        assert(socket);
        debug writeln(socket.handle(), " ending request ", requestId, " app: ",
                      applicationStatus, " prot: ", protocolStatus);
        auto head = RecordHead(RecordType.endRequest, requestId,
                               EndRequestBody.sizeof);
        socket.send(cast(ubyte[RecordHead.sizeof])head);
        auto endRequest = EndRequestBody(applicationStatus, protocolStatus);
        socket.send(cast(ubyte[EndRequestBody.sizeof])endRequest);
    }

    /**
     * Reads records from `_socket` until the stdin stream of a request is
     * terminated by a zero-length stdin record, and returns that request.
     *
     * Returns: the completed request, or null when the first receive of a
     * record header reports an error or a closed connection.
     *
     * NOTE(review): each receive() is assumed to deliver the whole header,
     * content or padding in one call; a short read trips an assert.
     * NOTE(review): partialRequests is keyed by request id only and is a
     * static local, so it is shared by all sockets served by this function.
     * NOTE(review): head.type comes straight from the network and is used in
     * a `final switch`; values outside RecordType are not handled.
     */
    Request readRequest(Socket _socket) {
        static Request[RequestId] partialRequests;
        Request request;
        RecordHead head;
        static ubyte[RecordHead.sizeof] headBuf = void;
        static ubyte[ushort.max] content = void;
        do {
            // Receive header
            assert(_socket.isAlive);
            debug writeln(_socket.handle(), " receive record");
            auto len = _socket.receive(headBuf);
            if(len == Socket.ERROR || len == 0) {
                debug writeln(_socket.handle(),
                              " finished or error, numPartialRequests: ",
                              partialRequests.length);
                return null;
            }
            // Get (partial) request
            head = cast(RecordHead)headBuf;
            debug writeln(_socket.handle(), " ", head);
            auto existing = head.requestId in partialRequests;
            if(!existing || head.type == RecordType.beginRequest) {
                partialRequests[head.requestId] =
                    request = new Request(head.requestId, this, &endRequest);
            } else {
                request = *existing;
            }
            // Read request data
            if(head.contentLength) {
                len = _socket.receive(content[0..head.contentLength]);
                assert(len == head.contentLength);
            }
            // Handle Record
            final switch(head.type) {
                case RecordType.beginRequest:
                    debug writeln(_socket.handle(), " BEGINREQUEST");
                    auto beginRequest = toStruct!(BeginRequestBody)(content);
                    request.role = beginRequest.role;
                    request.socketOwner = beginRequest.keepConnection ?
                        SocketLifetimeOwner.server :
                        SocketLifetimeOwner.application;
                    debug writeln(_socket.handle(), " Request: ",
                                  request.id, " role: ", request.role,
                                  " owner: ", request.socketOwner);
                    break;
                case RecordType.abortRequest:
                    writeln("ABORTREQUEST!");
                    assert(false);
                    break;
                case RecordType.endRequest:
                    writeln("ENDREQUEST!");
                    assert(false);
                    break;
                case RecordType.params:
                    debug writeln(_socket.handle(), " PARAMS");
                    if(head.contentLength) {
                        assert(!request.params);
                        debug writeln(_socket.handle(), " Parsing params");
                        // dup: `content` is a reused static buffer and Params keeps slices of its input
                        request.params = new
                            Params(content[0..head.contentLength].dup);
                    } else {
                        // params finished
                        debug writeln(_socket.handle(), " -> DONE");
                    }
                    break;
                case RecordType.stdIn:
                    if(head.contentLength) {
                        // TODO: use some appender
                        request._input ~= content[0..head.contentLength];
                        debug writeln(_socket.handle(), " STDIN! LEN:",
                                      head.contentLength, " - ",
                                      cast(char[])request._input);
                    } else {
                        debug writeln(_socket.handle(), " STDIN DONE! - ",
                                      cast(char[])request._input);
                    }
                    break;
                case RecordType.stdOut:
                    writeln("STDOUT!");
                    assert(false);
                    break;
                case RecordType.stdErr:
                    writeln("STDERR!");
                    assert(false);
                    break;
                case RecordType.data:
                    writeln("DATA!");
                    assert(false);
                    break;
                case RecordType.getValues:
                    writeln("GETVALUES!");
                    assert(false);
                    break;
                case RecordType.getValuesResult:
                    writeln("GETVALUESRESULT!");
                    assert(false);
                    break;
                case RecordType.unknownType:
                    writeln("UNKNOWNTYPE!");
                    assert(false);
                    break;
            }
            // Padding bytes are read into the scratch buffer and discarded.
            if(head.paddingLength) {
                len = _socket.receive(content[0..head.paddingLength]);
                assert(len == head.paddingLength);
                assert(len <= ubyte.max);
            }
        } while(!(head.type == RecordType.stdIn && head.contentLength == 0));
        debug writeln(_socket.handle(), " finished reading to stream");
        assert(request);
        partialRequests.remove(request.id);
        return request;
    }
}
/**
 * Sample request handler: answers every request with a fixed HTML page.
 *
 * Writes the response headers, then the body, then ends the request.
 *
 * NOTE(review): the response starts with an HTTP status line and a
 * Connection header. A FastCGI responder normally emits CGI-style headers
 * (e.g. "Status: 200 OK") - confirm what the web server in use accepts.
 */
void onRequest(Request request) {
    debug writeln("request: ", request.id, " creating response");
    //auto content = "<html><body>request: "~to!string(cast(string[string])request.params._params)~"</body></html>";
    auto content = "<html><body>Hello World</body></html>";
    // One header per line, each terminated by CRLF; a blank line ends the headers.
    auto http = "HTTP/1.1 200 OK\r\n"
        ~ "Connection: Keep-Alive\r\n"
        ~ "Content-Type: text/html; charset=utf-8\r\n"
        ~ "Content-Length:" ~ to!string(content.length) ~ "\r\n\r\n";
    request.write(http);
    request.write(content);
    request.end();
}
/**
 * Program entry point: starts a ListenThread on localhost that answers
 * every request through onRequest.
 *
 * Params:
 *   args = command line; args[1] is the TCP port to listen on
 *
 * Prints a usage line and returns when the port argument is missing or is
 * not a valid 16-bit unsigned number.
 */
void main(string[] args) {
    auto programName = args.length ? args[0] : "fcgi";
    if(args.length < 2) {
        writeln("usage: ", programName, " <port>");
        return;
    }
    ushort port;
    try {
        port = to!ushort(args[1]);
    } catch(ConvException e) {
        writeln("invalid port '", args[1], "'");
        writeln("usage: ", programName, " <port>");
        return;
    }
    auto listener = new ListenThread("localhost", port,
                                     (Request r) { onRequest(r); });
    listener.start();
}
/********************************************************************************
* FastCGI Protocol
* http://www.fastcgi.com/drupal/node/6?q=node/22
********************************************************************************/
/// Protocol version; the default value of RecordHead.protocolVersion and the
/// value its invariant insists on.
enum ubyte FastCGIVersion = 1;
/// Request id that marks a record as a management record
/// (see RecordHead.isManagementRecord).
enum RequestId FastCGIManagementRequestId = 0;
/**
 * Combines equally sized integers into one integer, first argument in the
 * most significant position (big-endian order).
 *
 * The result is the integer type whose size is the combined size of the
 * arguments (1, 2, 4 or 8 bytes), signed if the first argument's type is
 * signed. E.g. two ubytes give a ushort, four ubytes a uint.
 *
 * Every argument contributes exactly its own bits: values are masked to
 * their width before being shifted, so a negative argument cannot spill
 * sign bits into the positions of the arguments before it, and the
 * arithmetic is done in 64 bits so shifts past bit 31 are valid.
 *
 * Params:
 *   arr = the integers to combine. All are expected to have the same size as
 *         the first one; other combinations are not supported.
 */
auto pack(A...)(in A arr) pure nothrow @safe
{
    enum elemSize = A[0].sizeof;
    enum requiredSize = A.length * elemSize;
    static assert(requiredSize <= long.sizeof);

    enum signed = is(A[0] == byte) || is(A[0] == short) ||
                  is(A[0] == int) || is(A[0] == long);

    static if(requiredSize == 8 && signed)
        alias long R;
    else static if(requiredSize == 8 && !signed)
        alias ulong R;
    else static if(requiredSize == 4 && signed)
        alias int R;
    else static if(requiredSize == 4 && !signed)
        alias uint R;
    else static if(requiredSize == 2 && signed)
        alias short R;
    else static if(requiredSize == 2 && !signed)
        alias ushort R;
    else static if(requiredSize == 1 && signed)
        alias byte R;
    else static if(requiredSize == 1 && !signed)
        alias ubyte R;
    else
        static assert(false);

    // Keeps only the low elemSize*8 bits; written as a right shift so the
    // shift count stays within 0..56 for every element size.
    enum ulong mask = ulong.max >> (64 - elemSize * 8);

    ulong bits = 0;
    // Unrolled at compile time: `arr` is a tuple.
    foreach(i, value; arr) {
        enum shift = (A.length - i - 1) * elemSize * 8;
        bits |= (cast(ulong)value & mask) << shift;
    }
    return cast(R)bits;
}
/**
 * Splits an integer into its bytes.
 *
 * Returns: a static array of T.sizeof bytes in which element 0 is the least
 * significant byte of `v` and the last element the most significant one.
 */
auto unpack(T)(const T v) pure nothrow @safe {
    enum byteCount = T.sizeof / ubyte.sizeof;
    ubyte[byteCount] bytes = void;
    // Every slot is assigned below, so the void initialisation is safe.
    foreach(index; 0 .. byteCount) {
        bytes[index] = cast(ubyte)((v >> (index * 8)) & 0xFF);
    }
    return bytes;
}
/// Record type stored in the second byte of every record header.
/// The values follow the numbering of the FastCGI specification
/// (FCGI_BEGIN_REQUEST = 1 ... FCGI_UNKNOWN_TYPE = 11).
enum RecordType : ubyte {
beginRequest = 1, // starts a request; its body is a BeginRequestBody
abortRequest = 2, // not handled by ListenThread.readRequest (asserts)
endRequest = 3, // sent by ListenThread.endRequest with an EndRequestBody
params = 4, // name/value pairs, parsed by Params
stdIn = 5, // request input stream; a zero-length record ends it
stdOut = 6, // default stream used by Request.write
stdErr = 7,
data = 8,
getValues = 9,
getValuesResult = 10,
unknownType = 11,
}
/// Request identifier, stored in record headers as two bytes (see RecordHead).
alias ushort RequestId;
/**
 * The fixed 8-byte header that precedes every record.
 *
 * The fields mirror the byte layout on the wire, which is why the two-byte
 * quantities are stored as separate high (B1) and low (B0) bytes and exposed
 * through the requestId and contentLength properties.
 */
struct RecordHead {
    ubyte protocolVersion = FastCGIVersion;
    RecordType type;
    ubyte requestIdB1;
    ubyte requestIdB0;
    ubyte contentLengthB1;
    ubyte contentLengthB0;
    ubyte paddingLength;
    ubyte reserved;

    /// Builds a header of the given type for `requestId`, announcing
    /// `contentLength` bytes of content. Padding and reserved stay zero.
    this(RecordType type, RequestId requestId, ushort contentLength) pure
    nothrow @safe {
        this.protocolVersion = FastCGIVersion;
        this.type = type;
        this.requestId = requestId;
        this.contentLength = contentLength;
    }

    // Only version-1 headers are considered valid.
    pure @safe const nothrow invariant() {
        assert(protocolVersion == FastCGIVersion);
    }

    /// Request id assembled from its high and low byte.
    @property RequestId requestId() const pure nothrow @safe {
        return pack(requestIdB1, requestIdB0);
    }

    /// Stores `id` as high and low byte.
    @property void requestId(RequestId id) pure nothrow @safe {
        const bytes = unpack(id);   // bytes[0] is the low byte
        requestIdB0 = bytes[0];
        requestIdB1 = bytes[1];
    }

    /// Content length assembled from its high and low byte.
    @property ushort contentLength() const pure nothrow @safe {
        return pack(contentLengthB1, contentLengthB0);
    }

    /// Stores `len` as high and low byte.
    @property void contentLength(ushort len) pure nothrow @safe {
        const bytes = unpack(len);  // bytes[0] is the low byte
        contentLengthB0 = bytes[0];
        contentLengthB1 = bytes[1];
    }

    /// True when the request id is the management request id.
    @property bool isManagementRecord() const pure nothrow @safe {
        return requestId == FastCGIManagementRequestId;
    }

    /// True for every record that is not a management record.
    @property bool isApplicationRecord() const pure nothrow @safe {
        return !isManagementRecord;
    }
}
/// Role requested in a begin-request record (see BeginRequestBody.role).
/// The values follow the FastCGI specification (FCGI_RESPONDER = 1,
/// FCGI_AUTHORIZER = 2, FCGI_FILTER = 3).
enum Role : short {
responder = 1,
authorizer = 2,
filter = 3
}
/// Bit masks for BeginRequestBody.flags.
enum Flags {
/// Tested by BeginRequestBody.keepConnection.
keepConnection = 1
}
/**
 * Body of a begin-request record: the requested role as two bytes (high
 * byte first), a flags byte and five reserved bytes.
 */
struct BeginRequestBody {
    ubyte roleB1;
    ubyte roleB0;
    ubyte flags;
    ubyte[5] reserved;

    /// Role assembled from its high and low byte.
    @property Role role() const pure nothrow @safe {
        auto raw = pack(roleB1, roleB0);
        return cast(Role)raw;
    }

    /// Stores `role` as high and low byte.
    @property void role(Role role) pure nothrow @safe {
        const bytes = unpack(role);   // bytes[0] is the low byte
        roleB0 = bytes[0];
        roleB1 = bytes[1];
    }

    /// True when the keep-connection bit is set in `flags`.
    @property bool keepConnection() const pure nothrow @safe {
        const masked = flags & Flags.keepConnection;
        return masked != 0;
    }
}
/// Protocol-level status carried in an end-request record
/// (see EndRequestBody.protocolStatus).
enum ProtocolStatus : ubyte {
requestComplete = 0, // the value Request.end() reports
cannotMultiplexConnection = 1,
overloaded = 2,
unknownRole = 3,
}
/**
 * Body of an end-request record: the application status as four bytes (most
 * significant first), the protocol status and three reserved bytes.
 */
struct EndRequestBody {
    ubyte appStatusB3;
    ubyte appStatusB2;
    ubyte appStatusB1;
    ubyte appStatusB0;
    ProtocolStatus protocolStatus;
    ubyte[3] reserved;

    /// Builds a body carrying the given application and protocol status.
    this(int appStatus, ProtocolStatus status) pure nothrow @safe {
        this.protocolStatus = status;
        this.appStatus = appStatus;
    }

    /// Application status assembled from its four bytes.
    @property int appStatus() const pure nothrow @safe {
        return pack(appStatusB3, appStatusB2, appStatusB1, appStatusB0);
    }

    /// Stores `status` as four bytes, most significant in appStatusB3.
    @property void appStatus(int status) pure nothrow @safe {
        const bytes = unpack(status);   // bytes[0] is the least significant byte
        appStatusB0 = bytes[0];
        appStatusB1 = bytes[1];
        appStatusB2 = bytes[2];
        appStatusB3 = bytes[3];
    }
}
/// Body layout for an unknown-type record: one type byte and seven reserved
/// bytes. Not referenced by the rest of the visible code.
struct UnknownTypeBody {
// presumably the record type that was not recognised - TODO confirm against the spec
ubyte type;
ubyte[7] reserved;
}