On 18.10.2011 19:24, Jeremy Sandell wrote:
On Mon, Oct 17, 2011 at 2:11 PM, Jacob Carlborg <d...@me.com
<mailto:d...@me.com>> wrote:

    On 2011-10-17 16:01, Andrea Fontana wrote:

        I handle request on different threads. I do some pre-processing on
        scgi data and I fill a struct:

        request.get[]
        request.post[]
        request.cookie[]
        request.headers[string]

        then I call a virtual function (to override on subclasses) like:

        do(request, output);

        where user fill output struct in a way like:

        output.data ~= "<html><body><h1>hello world</h1></body></html>";
        output.status = 200
        output.cookies = bla bla

        and then if is method != "head" i send headers + data, else just
        "headers".

        btw 99% of usage is get, post, head.


    Yes, but if you want to write a web site that is RESTful you need
    the other HTTP methods as well, at least PUT and DELETE.

    BTW, what about creating something like Rack but for D. Rack is a
    low level interface in front of the web server which web frameworks
    can be built on top.

    http://rack.github.com/

    --
    /Jacob Carlborg


Yes, this is exactly why I was wondering whether FastCGI had been
implemented (though SCGI works for me as well) - so that I could write
something on top of it, in much the same way I would using (for example)
WSGI in Python.

I also agree with you re: supporting all of the HTTP methods. Just
because the most common ones are GET, POST, and HEAD doesn't mean we
should leave out the others; both PUT and DELETE are quite useful.

Best regards,
Jeremy Sandell

Adam D. Ruppe has a wrapper for libfcgi at github. I started implementing fcgi, but it's basically just a very, very limited ugly-hack prototype. Doubt you'll get much use of it, but I'll attach it here anyway.
module fcgi;

import std.socket, std.stdio, std.socketstream;
import std.datetime;
import std.concurrency;
import core.thread;
import std.process;
import std.conv;
private import std.c.windows.windows, std.c.windows.winsock;




///////////////
// PROTOCOL END
///////////////

S toStruct(S, T)(T[] buf) {
    static ubyte[S.sizeof] buf2;
    assert(buf.length >= S.sizeof);
    buf2 = buf[0..S.sizeof];
    return cast(S)(buf2);
}

/// Who is responsible for closing the socket?
enum SocketLifetimeOwner {
    application = 1,
    server      = 2
}

alias void delegate(Request) RequestCallback;

alias void delegate(ushort, int, ProtocolStatus) EndRequestHandler;

class Request {
    RequestId   _id;
    Params      _params;
    Role        _role;
    SocketLifetimeOwner _socketOwner;
    EndRequestHandler _endRequestHandler;
    ubyte[]     _input;
    ListenThread _server;
    bool _ended;

    this(RequestId id, ListenThread server, EndRequestHandler 
endRequestHandler) {
        _id                 = id;
        _endRequestHandler  = endRequestHandler;
        _server             = server;
    }

    @property ubyte[] input() pure nothrow @safe {
        return _input;
    }

    @property RequestId id() pure const nothrow @safe {
        return _id;
    }

    @property void id(RequestId id) pure nothrow @safe {
        _id = id;
    }

    @property const(Params) params() pure const nothrow @safe {
        return _params;
    }

    private @property void params(Params params) pure nothrow @safe {
        _params = params;
    }

    @property Role role() pure const nothrow @safe {
        return _role;
    }

    private @property void role(Role role) pure nothrow @safe {
        _role = role;
    }

    @property SocketLifetimeOwner socketOwner() pure const nothrow @safe {
        return _socketOwner;
    }

    @property void socketOwner(SocketLifetimeOwner owner) pure nothrow @safe {
        _socketOwner = owner;
    }

    void write(T)(const(T[]) data, RecordType method = RecordType.stdOut) {
        assert(!_ended);
        // Break into chunks of 65535
        auto socket = _server._active;

        auto head = RecordHead(method, _id,
                cast(ushort)(data.length));
        socket.send(cast(ubyte[RecordHead.sizeof])head);
        socket.send(cast(ubyte[])data);

        head.contentLength = 0;
        socket.send(cast(ubyte[RecordHead.sizeof])head);
    }

    void end() {
        _endRequestHandler(_id, 0, ProtocolStatus.requestComplete);
        _ended = true;
    }
}

class Params {
    private string[string] _params;

    private this(string[string] params) {
        _params = params;
    }

    private this(in ubyte[] raw) pure nothrow @trusted {
        int i;
        do {
            auto keyLen = cast(ubyte) raw[i++];
            auto valLen = cast(ubyte) raw[i++];
            auto key    = cast(string)raw[i .. i+keyLen];
            i += keyLen;
            auto value  = cast(string)raw[i .. i+valLen];
            i += valLen;
            _params[key] = value;
        } while(i < raw.length);
    }

    string opDispatch(string key)() const pure nothrow @safe {
        auto v = key in _params;
        return v ? *v : "";
    }

    override string toString() {
        return to!string(_params);
    }
}

class ListenThread : Thread {
    this(string host, ushort port) {
        this(host, port, &processRequest);
    }

    this(string host, ushort port, RequestCallback callback) {
        _callback = callback;
        _listener = new TcpSocket();
        _listener.blocking = false;
        _listener.bind(new InternetAddress(host, port));
        debug writeln("listening to ", host, ":", port, " blocking:", 
_listener.blocking);
        super(&run);
    }

    protected void processRequest(Request request) {
        assert(false, " this should be overridden");
    }

private:
    Socket _listener;
    bool _quitting;
    Socket _active;
    Socket[] clients;
    RequestCallback _callback;
    enum MAX_CONNECTIONS = ubyte.max;

    void run() {
        ulong requests, reqPrSec, totMs, totHnReq, reqTmThisSec;
        ulong lastReq;
        auto sw = StopWatch(AutoStart.yes);
        clients.reserve(MAX_CONNECTIONS);
        auto ss = new SocketSet(MAX_CONNECTIONS+1);
        _listener.listen(MAX_CONNECTIONS);
        while(clients.length || !_quitting) {
            // Wait for incoming
            int select;
            do {
                ss.reset();
                ss.add(_listener);
                foreach(client; clients) {
                    ss.add(client);
                }
                // we'll time-out to check for quitting
                select = Socket.select(ss, null, null, 1000);
            } while(select <= 0 && !_quitting)
            if(_quitting) {
                _listener.shutdown(SocketShutdown.RECEIVE);
                continue;
            }

            // Process pending requests
            for(int i;i<clients.length;i++) {
                auto client = clients[i];
                if(!ss.isSet(client)) continue;
                debug writeln(client.handle(), " begin");
                _active = client;

                auto reqStart = sw.peek.hnsecs;
                auto request = readRequest(client);
                if(!request) {
                    auto lastErr = WSAGetLastError();
                    switch(lastErr) {
                        case 0:
                            break;
                        case 10053:
                            writeln(client.handle(), " ECONNABORT");
                            break;
                        case 10054:
                            writeln(client.handle(), " ECONNRESET");
                            break;
                        case 10035:
                            writeln(client.handle(), " EWOULDBLOCK");
                            continue;
                        default:
                            writeln(client.handle(), " ERR: ", lastErr);
                            assert(false, to!string(lastErr));
                            break;
                    }
                    closeClient(client);
                    // replace current with last element, and rerun on this 
index
                    clients[i] = clients[$-1];
                    clients = clients[0..$-1];
                    clients.assumeSafeAppend();
                    --i;
                    assert(clients.capacity);
                    continue;
                }
                assert(request.id, " shouldn't handle management requests");
                debug writeln(_active.handle(), " handling request ", 
request.id);
                // TODO: handle management requests
                _callback(request);
                auto shouldClose = request.socketOwner == 
SocketLifetimeOwner.application;
                // TODO: Make sure end is called
                if(shouldClose) {
                    client.shutdown(SocketShutdown.SEND);
                }

                auto reqEnd = sw.peek.hnsecs;
                requests++;
                reqPrSec++;
                auto used = cast(double)(reqEnd - reqStart);
                reqTmThisSec += used;
                totHnReq += cast(ulong)used;
                //writeln("REQ HANDLED IN ", used, " hnsec = ", used/1000, " 
msec = ", cast(double)totHnReq/cast(double)requests, " hnsec pr request, since 
last:", reqEnd-lastReq);
                lastReq = reqEnd;


                if(sw.peek.msecs >= 1_000) {
                    auto rps = cast(double)reqPrSec/(sw.peek.hnsecs/10000000.0);
                    totMs += sw.peek.hnsecs;
                    auto trps = cast(double)requests/(totMs/10000000.0);
                    sw.reset();
                    writeln("ReqPrSec: ", rps, " AvgPrCB: ", 
(cast(double)reqTmThisSec/cast(double)reqPrSec)/1000.0);
                    writeln("TOT: ", requests, " HNS: ", 10000000.0/trps);
                    reqPrSec = 0;
                    reqTmThisSec = 0;
                }
            }
            _active = null;

            // Accept new connections
            if(ss.isSet(_listener)) {
                assert(clients.capacity);
                auto client = _listener.accept();
                debug writeln(client.handle(), " accepted");
                client.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER,
                        std.socket.linger(1, 60));
                client.setOption(SocketOptionLevel.SOCKET, 
SocketOption.REUSEADDR, 1);
                clients ~= client;
            }
        }
        _listener.shutdown(SocketShutdown.RECEIVE);
        _listener.close();
    }

    void closeClient(Socket socket) {
        debug writeln(socket.handle(), " closing");
        socket.shutdown(SocketShutdown.BOTH);
        socket.close();
    }

    void endRequest(
            RequestId requestId,
            int applicationStatus,
            ProtocolStatus protocolStatus)
    {
        auto socket = _active;
        assert(socket);
        debug writeln(socket.handle(), " ending request ", requestId, " app: ", 
applicationStatus, " prot: ", protocolStatus);

        auto head = RecordHead(RecordType.endRequest, requestId, 
EndRequestBody.sizeof);
        socket.send(cast(ubyte[RecordHead.sizeof])head);
        auto endRequest = EndRequestBody(applicationStatus, protocolStatus);
        socket.send(cast(ubyte[EndRequestBody.sizeof])endRequest);
    }

    Request readRequest(Socket _socket) {
        static Request[RequestId] partialRequests;
        Request request;
        RecordHead head;
        static ubyte[RecordHead.sizeof] headBuf = void;
        static ubyte[ushort.max] content = void;
        bool readingStream;
        do {
            // Receive header
            assert(_socket.isAlive);
            debug writeln(_socket.handle(), " receive record");
            auto len = _socket.receive(headBuf);
            if(len == Socket.ERROR || len == 0) {
                debug writeln(_socket.handle(), " finished or error, 
numPartialRequests: ", partialRequests.length);

                return null;
            }
            // Get (partial) request
            head = cast(RecordHead)headBuf;
            debug writeln(_socket.handle(), " ", head);
            auto existing = head.requestId in partialRequests;
            if(!existing || head.type == RecordType.beginRequest) {
                partialRequests[head.requestId] =
                    request = new Request(head.requestId, this, &endRequest);
            } else {
                request = *existing;
            }

            // Read request data
            if(head.contentLength) {
                len = _socket.receive(content[0..head.contentLength]);
                assert(len == head.contentLength);
            }

            // Handle Record
            final switch(head.type) {
                case RecordType.beginRequest:
                    debug writeln(_socket.handle(), " BEGINREQUEST");
                    auto beginRequest = toStruct!(BeginRequestBody)(content);
                    request.role = beginRequest.role;
                    request.socketOwner = beginRequest.keepConnection ?
                        SocketLifetimeOwner.server :
                        SocketLifetimeOwner.application;
                    debug writeln(_socket.handle(), " Request: ", 
request.requestId, " role: ", request.role, " owner: ", request.socketOwner);
                    break;
                case RecordType.abortRequest:
                    writeln("ABORTREQUEST!");
                    assert(false);
                    break;
                case RecordType.endRequest:
                    writeln("ENDREQUEST!");
                    assert(false);
                    break;
                case RecordType.params:
                    debug writeln(_socket.handle(), " PARAMS");
                    if(head.contentLength) {
                        assert(!request.params);
                        debug writeln(_socket.handle(), "  Parsing params");
                        request.params = new 
Params(content[0..head.contentLength].dup);
                    } else {
                        // params finished
                        debug writeln(_socket.handle(), "  -> DONE");
                    }
                    break;
                case RecordType.stdIn:
                    if(head.contentLength) {
                        readingStream = true;
                        // TODO: use some appender
                        request._input ~= content[0..head.contentLength];
                        debug writeln(_socket.handle(), " STDIN! LEN:", 
head.contentLength, " - ", cast(char[])request._input);
                    } else {
                        readingStream = false;
                        debug writeln(_socket.handle(), " STDIN DONE! - ", 
cast(char[])request._input);
                    }
                    break;
                case RecordType.stdOut:
                    writeln("STDOUT!");
                    assert(false);
                    break;
                case RecordType.stdErr:
                    writeln("STDERR!");
                    assert(false);
                    break;
                case RecordType.data:
                    writeln("DATA!");
                    assert(false);
                    break;
                case RecordType.getValues:
                    writeln("GETVALUES!");
                    assert(false);
                    break;
                case RecordType.getValuesResult:
                    writeln("GETVALUESRESULT!");
                    assert(false);
                    break;
                case RecordType.unknownType:
                    writeln("UNKNOWNTYPE!");
                    assert(false);
                    break;
            }

            if(head.paddingLength) {
                len = _socket.receive(content[0..head.paddingLength]);
                assert(len == head.paddingLength);
                assert(len <= ubyte.max);
            }
        } while(!(head.type == RecordType.stdIn && head.contentLength == 0))
        debug writeln(_socket.handle(), " finished reading to stream");
        assert(request);
        partialRequests.remove(request.id);
        return request;
    }
}

void onRequest(Request request) {
    debug writeln("request: ", request.requestId, " creating response");
    //auto content = "<html><body>request: 
"~to!string(cast(string[string])request.params._params)~"</body></html>";
    auto content = "<html><body>Hello World</body></html>";
    auto http = "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\nContent-Type: 
text/html; 
charset=utf-8\r\nContent-Length:"~to!string(content.length)~"\r\n\r\n";
    request.write(http);
    request.write(content);
    request.end();
}

void main(string[] args) {
    auto listener = new ListenThread("localhost", to!ushort(args[1]), (Request 
r){ onRequest(r);});
    listener.start();
}


/********************************************************************************
 * FastCGI Protocol
 * http://www.fastcgi.com/drupal/node/6?q=node/22
 
********************************************************************************/

enum ubyte FastCGIVersion = 1;
enum RequestId FastCGIManagementRequestId = 0;

auto pack(A...)(in A arr) pure nothrow @safe
{
    enum requiredSize = A.length * A[0].sizeof;
    static assert(requiredSize <= long.sizeof);
    enum signed = is(A[0] == byte) || is(A[0] == short) ||
        is(A[0] == int) || is(A[0] == long);

    static if(requiredSize == 8 && signed)
        alias long R;
    else static if(requiredSize == 8 && !signed)
        alias ulong R;
    else static if(requiredSize == 4 && signed)
        alias int R;
    else static if(requiredSize == 4 && !signed)
        alias uint R;
    else static if(requiredSize == 2 && signed)
        alias short R;
    else static if(requiredSize == 2 && !signed)
        alias ushort R;
    else static if(requiredSize == 1 && signed)
        alias byte R;
    else static if(requiredSize == 1 && !signed)
        alias ubyte R;
    else
        static assert(false);

    R result;
    void add(int N)() {
        result |= arr[N] << ((A.length-N-1)*8);
        static if(N < A.length-1)
            add!(N+1)();
    }
    add!(0)();
    return result;
}

auto unpack(T)(const T v) pure nothrow @safe {
    enum N = T.sizeof/ubyte.sizeof;
    ubyte[N] result = void;
    void add(int I)() {
        result[I] = (v >> (I*8)) & 0xFF;
        static if(I < N-1)
            add!(I+1)();
    }
    add!(0)();
    return result;
}

enum RecordType : ubyte {
    beginRequest    = 1,
    abortRequest    = 2,
    endRequest      = 3,
    params          = 4,
    stdIn           = 5,
    stdOut          = 6,
    stdErr          = 7,
    data            = 8,
    getValues       = 9,
    getValuesResult = 10,
    unknownType     = 11,
}

alias ushort RequestId;

struct RecordHead {
    ubyte       protocolVersion = FastCGIVersion;
    RecordType  type;
    ubyte       requestIdB1;
    ubyte       requestIdB0;
    ubyte       contentLengthB1;
    ubyte       contentLengthB0;
    ubyte       paddingLength;
    ubyte       reserved;

    this(RecordType type, RequestId requestId, ushort contentLength) pure 
nothrow @safe {
        this.type               = type;
        this.requestId          = requestId;
        this.contentLength      = contentLength;
        this.protocolVersion    = FastCGIVersion;
    }

    pure @safe const nothrow invariant() {
        assert(protocolVersion == FastCGIVersion);
    }

    @property RequestId requestId() const pure nothrow @safe {
        return pack(requestIdB1, requestIdB0);
    }

    @property void requestId(RequestId id) pure nothrow @safe {
        auto unpacked   = unpack(id);
        requestIdB1     = unpacked[1];
        requestIdB0     = unpacked[0];
    }

    @property ushort contentLength() const pure nothrow @safe {
        return pack(contentLengthB1, contentLengthB0);
    }

    @property void contentLength(ushort len) pure nothrow @safe {
        auto unpacked   = unpack(len);
        contentLengthB1 = unpacked[1];
        contentLengthB0 = unpacked[0];
    }

    @property bool isManagementRecord() const pure nothrow @safe {
        return requestId == FastCGIManagementRequestId;
    }

    @property bool isApplicationRecord() const pure nothrow @safe {
        return !isManagementRecord;
    }
}

enum Role : short {
    responder   = 1,
    authorizer  = 2,
    filter      = 3
}

enum Flags {
    keepConnection = 1
}

struct BeginRequestBody {
    ubyte       roleB1;
    ubyte       roleB0;
    ubyte       flags;
    ubyte[5]    reserved;

    @property Role role() const pure nothrow @safe {
        return cast(Role)pack(roleB1, roleB0);
    }

    @property void role(Role role) pure nothrow @safe {
        auto unpacked   = unpack(role);
        roleB1          = unpacked[1];
        roleB0          = unpacked[0];
    }

    @property bool keepConnection() const pure nothrow @safe {
        return (flags & Flags.keepConnection) != 0;
    }
}

enum ProtocolStatus : ubyte {
    requestComplete             = 0,
    cannotMultiplexConnection   = 1,
    overloaded                  = 2,
    unknownRole                 = 3,
}

struct EndRequestBody {
    ubyte           appStatusB3;
    ubyte           appStatusB2;
    ubyte           appStatusB1;
    ubyte           appStatusB0;
    ProtocolStatus  protocolStatus;
    ubyte[3]        reserved;

    this(int appStatus, ProtocolStatus status) pure nothrow @safe {
        this.appStatus      = appStatus;
        this.protocolStatus = status;
    }

    @property int appStatus() const pure nothrow @safe {
        return pack(appStatusB3, appStatusB2, appStatusB1, appStatusB0);
    }

    @property void appStatus(int status) pure nothrow @safe {
        auto unpacked   = unpack(status);
        appStatusB3     = unpacked[3];
        appStatusB2     = unpacked[2];
        appStatusB1     = unpacked[1];
        appStatusB0     = unpacked[0];
    }
}

struct UnknownTypeBody {
    ubyte       type;
    ubyte[7]    reserved;
}

Reply via email to