http://git-wip-us.apache.org/repos/asf/trafficserver/blob/abe55a68/plugins/experimental/spdy/spdy.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/spdy/spdy.cc 
b/plugins/experimental/spdy/spdy.cc
new file mode 100644
index 0000000..ce91ee8
--- /dev/null
+++ b/plugins/experimental/spdy/spdy.cc
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ts/ts.h>
+#include <spdy/spdy.h>
+#include <base/logging.h>
+
+#include "io.h"
+#include "http.h"
+#include "protocol.h"
+
+#include <getopt.h>
+#include <limits>
+
+static bool use_system_resolver = false;
+
+static int spdy_vconn_io(TSCont, TSEvent, void *);
+
+static void
+recv_rst_stream(
+        const spdy::message_header& header,
+        spdy_io_control *           io,
+        const uint8_t __restrict *  ptr)
+{
+    spdy::rst_stream_message rst;
+
+    rst = spdy::rst_stream_message::parse(ptr, header.datalen);
+
+    debug_protocol("[%p/%u] received %s frame stream=%u status_code=%s (%u)",
+            io, rst.stream_id,
+            cstringof(header.control.type), rst.stream_id,
+            cstringof((spdy::error)rst.status_code), rst.status_code);
+
+    io->destroy_stream(rst.stream_id);
+}
+
+static void
+recv_syn_stream(
+        const spdy::message_header& header,
+        spdy_io_control *           io,
+        const uint8_t __restrict *  ptr)
+{
+    spdy::syn_stream_message    syn;
+    spdy_io_stream *            stream;
+
+    syn = spdy::syn_stream_message::parse(ptr, header.datalen);
+
+    debug_protocol(
+            "[%p/%u] received %s frame stream=%u associated=%u priority=%u",
+            io, syn.stream_id,
+            cstringof(header.control.type), syn.stream_id,
+            syn.associated_id, syn.priority);
+
+    if (!io->valid_client_stream_id(syn.stream_id)) {
+        debug_protocol("[%p/%u] invalid stream-id %u",
+                io, syn.stream_id, syn.stream_id);
+        spdy_send_reset_stream(io, syn.stream_id, spdy::PROTOCOL_ERROR);
+        return;
+    }
+
+    switch (header.control.version) {
+    case spdy::PROTOCOL_VERSION_2: // fallthru
+    case spdy::PROTOCOL_VERSION_3: break;
+    default:
+        debug_protocol("[%p/%u] bad protocol version %d",
+                io, syn.stream_id, header.control.version);
+        spdy_send_reset_stream(io, syn.stream_id, spdy::PROTOCOL_ERROR);
+        return;
+    }
+
+    spdy::key_value_block kvblock(
+            spdy::key_value_block::parse(
+                    (spdy::protocol_version)header.control.version,
+                    io->decompressor,
+                    ptr + spdy::syn_stream_message::size,
+                    header.datalen - spdy::syn_stream_message::size)
+    );
+
+    if ((stream = io->create_stream(syn.stream_id)) == 0) {
+        debug_protocol("[%p/%u] failed to create stream %u",
+                io, syn.stream_id, syn.stream_id);
+        spdy_send_reset_stream(io, syn.stream_id, spdy::INVALID_STREAM);
+        return;
+    }
+
+    stream->io = io;
+    stream->version = (spdy::protocol_version)header.control.version;
+
+    if (!kvblock.url().is_complete()) {
+        debug_protocol("[%p/%u] incomplete URL", io, stream->stream_id);
+        // 3.2.1; missing URL, protocol error; 400 Bad Request
+        http_send_error(stream, TS_HTTP_STATUS_BAD_REQUEST);
+        spdy_send_reset_stream(io, stream->stream_id, spdy::CANCEL);
+        io->destroy_stream(stream->stream_id);
+        return;
+    }
+
+    spdy_io_stream::open_options options = spdy_io_stream::open_none;
+    if (use_system_resolver) {
+        options = spdy_io_stream::open_with_system_resolver;
+    }
+
+    std::lock_guard<spdy_io_stream::lock_type> lk(stream->lock);
+    if (!stream->open(kvblock, options)) {
+        io->destroy_stream(stream->stream_id);
+    }
+}
+
+static void
+recv_ping(
+        const spdy::message_header& header,
+        spdy_io_control *           io,
+        const uint8_t __restrict *  ptr)
+{
+    spdy::ping_message ping;
+
+    ping = spdy::ping_message::parse(ptr, header.datalen);
+
+    debug_protocol("[%p] received PING id=%u", io, ping.ping_id);
+
+    // Client must send even ping-ids. Ignore the odd ones since
+    // we never send them.
+    if ((ping.ping_id % 2) == 0) {
+        return;
+    }
+
+    spdy_send_ping(io, (spdy::protocol_version)header.control.version, 
ping.ping_id);
+}
+
+static void
+dispatch_spdy_control_frame(
+        const spdy::message_header& header,
+        spdy_io_control *           io,
+        const uint8_t __restrict *  ptr)
+{
+    switch (header.control.type) {
+    case spdy::CONTROL_SYN_STREAM:
+        recv_syn_stream(header, io, ptr);
+        break;
+    case spdy::CONTROL_SYN_REPLY:
+    case spdy::CONTROL_RST_STREAM:
+        recv_rst_stream(header, io, ptr);
+        break;
+    case spdy::CONTROL_PING:
+        recv_ping(header, io, ptr);
+        break;
+    case spdy::CONTROL_SETTINGS:
+    case spdy::CONTROL_GOAWAY:
+    case spdy::CONTROL_HEADERS:
+    case spdy::CONTROL_WINDOW_UPDATE:
+        debug_protocol(
+            "[%p] SPDY control frame, version=%u type=%s flags=0x%x, %u bytes",
+            io, header.control.version, cstringof(header.control.type),
+            header.flags, header.datalen);
+        break;
+    default:
+        // SPDY 2.2.1 - MUST ignore unrecognized control frames
+        TSError("[spdy] ignoring invalid control frame type %u", 
header.control.type);
+    }
+
+    io->reenable();
+}
+
+static size_t
+count_bytes_available(
+        TSIOBuffer          buffer,
+        TSIOBufferReader    reader)
+{
+    TSIOBufferBlock block;
+    size_t count = 0;
+
+    block = TSIOBufferStart(buffer);
+    while (block) {
+        const char * ptr;
+        int64_t nbytes;
+
+        ptr = TSIOBufferBlockReadStart(block, reader, &nbytes);
+        if (ptr && nbytes) {
+            count += nbytes;
+        }
+
+        block = TSIOBufferBlockNext(block);
+    }
+
+    return count;
+}
+
+static void
+consume_spdy_frame(spdy_io_control * io)
+{
+    spdy::message_header    header;
+    TSIOBufferBlock         blk;
+    const uint8_t *         ptr;
+    int64_t                 nbytes;
+
+next_frame:
+
+    blk = TSIOBufferReaderStart(io->input.reader);
+    ptr = (const uint8_t *)TSIOBufferBlockReadStart(blk, io->input.reader, 
&nbytes);
+    if (!ptr) {
+        // This should not fail because we only try to consume the header when
+        // there are enough bytes to read the header. Experimentally, however,
+        // it does fail. I wonder why.
+        TSError("TSIOBufferBlockReadStart failed unexpectedly");
+        return;
+    }
+
+    if (nbytes < spdy::message_header::size) {
+        // We should never get here, because we check for space before
+        // entering. Unfortunately this does happen :(
+        debug_plugin("short read %" PRId64 " bytes, expected at least %u, real 
count %zu",
+                nbytes, spdy::message_header::size,
+                count_bytes_available(io->input.buffer, io->input.reader));
+        return;
+    }
+
+    header = spdy::message_header::parse(ptr, (size_t)nbytes);
+    TSAssert(header.datalen > 0); // XXX
+
+    if (header.is_control) {
+        if (header.control.version != spdy::PROTOCOL_VERSION) {
+            TSError("[spdy] client is version %u, but we implement version %u",
+                header.control.version, spdy::PROTOCOL_VERSION);
+        }
+    } else {
+        debug_protocol("[%p] SPDY data frame, stream=%u flags=0x%x, %u bytes",
+            io, header.data.stream_id, header.flags, header.datalen);
+    }
+
+    if (header.datalen >= spdy::MAX_FRAME_LENGTH) {
+        // XXX puke
+    }
+
+    if (header.datalen <= (nbytes - spdy::message_header::size)) {
+        // We have all the data in-hand ... parse it.
+        io->input.consume(spdy::message_header::size);
+        io->input.consume(header.datalen);
+
+        ptr += spdy::message_header::size;
+
+        if (header.is_control) {
+            dispatch_spdy_control_frame(header, io, ptr);
+        } else {
+            TSError("[spdy] no data frame support yet");
+        }
+
+        if (TSIOBufferReaderAvail(io->input.reader) >= 
spdy::message_header::size) {
+            goto next_frame;
+        }
+    }
+
+    // Push the high water mark to the end of the frame so that we don't get
+    // called back until we have the whole thing.
+    io->input.watermark(spdy::message_header::size + header.datalen);
+}
+
+static int
+spdy_vconn_io(TSCont contp, TSEvent ev, void * edata)
+{
+    TSVIO               vio = (TSVIO)edata;
+    int                 nbytes;
+    spdy_io_control *   io;
+
+    (void)vio;
+
+    // Experimentally, we recieve the read or write TSVIO pointer as the
+    // callback data.
+    //debug_plugin("received IO event %s, VIO=%p", cstringof(ev), vio);
+
+    switch (ev) {
+    case TS_EVENT_VCONN_READ_READY:
+    case TS_EVENT_VCONN_READ_COMPLETE:
+        io = spdy_io_control::get(contp);
+        nbytes = TSIOBufferReaderAvail(io->input.reader);
+        debug_plugin("received %d bytes", nbytes);
+        if ((unsigned)nbytes >= spdy::message_header::size) {
+            consume_spdy_frame(io);
+        }
+
+        // XXX frame parsing can throw. If it does, best to catch it, log it
+        // and drop the connection.
+        break;
+    case TS_EVENT_VCONN_WRITE_READY:
+    case TS_EVENT_VCONN_WRITE_COMPLETE:
+        // No need to handle write events. We have already pushed all the data
+        // we have into the write buffer.
+        break;
+    case TS_EVENT_VCONN_EOS: // fallthru
+    default:
+        if (ev != TS_EVENT_VCONN_EOS) {
+            debug_plugin("unexpected accept event %s", cstringof(ev));
+        }
+        io = spdy_io_control::get(contp);
+        TSVConnClose(io->vconn);
+        release(io);
+    }
+
+    return TS_EVENT_NONE;
+}
+
+static int
+spdy_accept_io(TSCont contp, TSEvent ev, void * edata)
+{
+    TSVConn             vconn = (TSVConn)edata;;
+    spdy_io_control *   io = nullptr;
+
+    TSVIO read_vio, write_vio;
+
+    switch (ev) {
+    case TS_EVENT_NET_ACCEPT:
+        io = retain(new spdy_io_control(vconn));
+        io->input.watermark(spdy::message_header::size);
+        io->output.watermark(spdy::message_header::size);
+        // XXX is contp leaked here?
+        contp = TSContCreate(spdy_vconn_io, TSMutexCreate());
+        TSContDataSet(contp, io);
+        read_vio = TSVConnRead(vconn, contp, io->input.buffer, 
std::numeric_limits<int64_t>::max());
+        write_vio = TSVConnWrite(vconn, contp, io->output.reader, 
std::numeric_limits<int64_t>::max());
+        debug_protocol("accepted new SPDY session %p", io);
+        break;
+    default:
+        debug_plugin("unexpected accept event %s", cstringof(ev));
+    }
+
+    return TS_EVENT_NONE;
+}
+
+extern "C" void
+TSPluginInit(int argc, const char * argv[])
+{
+    static const struct option longopts[] = {
+        { "system-resolver", no_argument, NULL, 's' },
+        { NULL, 0, NULL, 0 }
+    };
+
+    TSPluginRegistrationInfo info;
+
+    info.plugin_name = (char *)"spdy";
+    info.vendor_name = (char *)"James Peach";
+    info.support_email = (char *)"[email protected]";
+
+    if (TSPluginRegister(TS_SDK_VERSION_3_0, &info) != TS_SUCCESS) {
+        TSError("[spdy] Plugin registration failed");
+    }
+
+    debug_plugin("initializing");
+
+    for (;;) {
+        switch (getopt_long(argc, (char * const *)argv, "s", longopts, NULL)) {
+        case 's':
+            use_system_resolver = true;
+            break;
+        case -1:
+            goto init;
+        default:
+            TSError("[spdy] usage: spdy.so [--system-resolver]");
+        }
+    }
+
+init:
+    TSReleaseAssert(
+        TSNetAcceptNamedProtocol(TSContCreate(spdy_accept_io, TSMutexCreate()),
+        TS_NPN_PROTOCOL_SPDY_2) == TS_SUCCESS);
+
+    debug_plugin("registered named protocol endpoint for %s",
+            TS_NPN_PROTOCOL_SPDY_2);
+}
+
+/* vim: set sw=4 tw=79 ts=4 et ai : */

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/abe55a68/plugins/experimental/spdy/stream.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/spdy/stream.cc 
b/plugins/experimental/spdy/stream.cc
new file mode 100644
index 0000000..b7eb4a4
--- /dev/null
+++ b/plugins/experimental/spdy/stream.cc
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ts/ts.h>
+#include <spdy/spdy.h>
+#include <base/logging.h>
+#include <base/inet.h>
+#include "io.h"
+#include "protocol.h"
+#include "http.h"
+
+#include <netdb.h>
+#include <limits>
+
+// NOTE: Reference counting SPDY streams.
+//
+// First, the primary protocol handler owns a reference count on each
+// SPDY stream. However, this reference count can be released at almost
+// any time when a RST request is received. So when we submit a request
+// that for any asynchronous processing, we must hold an additional
+// reference count in order to keep the stream alive until the request
+// completes.
+//
+// Second, each stream keeps a reference to the SPDY IO control block
+// which has an independent lifetime. This means that when we keep the
+// stream alive (by taking a reference count), we also need to take a
+// reference on the control block.
+
+static int spdy_stream_io(TSCont, TSEvent, void *);
+
+static bool
+IN(const spdy_io_stream * s, spdy_io_stream::http_state_type h)
+{
+    return s->http_state & h;
+}
+
+static void
+ENTER(spdy_io_stream * s, spdy_io_stream::http_state_type h)
+{
+    s->http_state |= h;
+}
+
+static void
+LEAVE(spdy_io_stream * s, spdy_io_stream::http_state_type h)
+{
+    s->http_state &= ~h;
+}
+
+static bool
+initiate_client_request(
+        spdy_io_stream *        stream,
+        const struct sockaddr * addr,
+        TSCont                  contp)
+{
+    TSReleaseAssert(stream->vconn == nullptr);
+
+    stream->vconn = TSHttpConnect(addr);
+    if (stream->vconn) {
+        TSVConnRead(stream->vconn, contp, stream->input.buffer, 
std::numeric_limits<int64_t>::max());
+        TSVConnWrite(stream->vconn, contp, stream->output.reader, 
std::numeric_limits<int64_t>::max());
+    }
+
+    return true;
+}
+
+static bool
+write_http_request(spdy_io_stream * stream)
+{
+    spdy_io_buffer      iobuf;
+    scoped_mbuffer      buffer;
+    scoped_http_header  header(buffer.get(), stream->kvblock);
+    int64_t             nwritten = 0;
+
+    if (!header) {
+        return false;
+    }
+
+    debug_http_header(stream, buffer.get(), header);
+
+    // XXX Surely there's a better way to send the HTTP headers than forcing
+    // ATS to reparse what we already have in pre-parsed form?
+    TSHttpHdrPrint(buffer.get(), header, iobuf.buffer);
+
+    TSIOBufferBlock blk = TSIOBufferReaderStart(iobuf.reader);
+    while (blk) {
+        const char *    ptr;
+        int64_t         nbytes;
+
+        ptr = TSIOBufferBlockReadStart(blk, iobuf.reader, &nbytes);
+        if (ptr == nullptr || nbytes == 0) {
+            goto next;
+        }
+
+        nwritten += TSIOBufferWrite(stream->output.buffer, ptr, nbytes);
+
+next:
+        blk = TSIOBufferBlockNext(blk);
+    }
+
+    // XXX is this needed?
+    TSIOBufferProduce(stream->output.buffer, nwritten);
+    return true;
+}
+
+static bool
+read_http_headers(spdy_io_stream * stream)
+{
+    if (TSIsDebugTagSet("spdy.http")) {
+        debug_http("[%p/%u] received %" PRId64 " header bytes",
+                stream, stream->stream_id,
+                TSIOBufferReaderAvail(stream->input.reader));
+    }
+
+    if (stream->hparser.parse(stream->input.reader) < 0) {
+        // XXX handle header parsing error
+        return false;
+    }
+
+    return true;
+}
+
+static int
+spdy_stream_io(TSCont contp, TSEvent ev, void * edata)
+{
+    union {
+        TSHostLookupResult dns;
+        TSVIO vio;
+    } context;
+
+    spdy_io_stream * stream = spdy_io_stream::get(contp);
+
+    debug_http("[%p/%u] received %s event",
+            stream, stream->stream_id, cstringof(ev));
+
+    if (!stream->is_open()) {
+        debug_protocol("[%p/%u] received %s on closed stream",
+                stream->io, stream->stream_id, cstringof(ev));
+        release(stream->io);
+        release(stream);
+        return TS_EVENT_NONE;
+    }
+
+    std::lock_guard<spdy_io_stream::lock_type> lk(stream->lock);
+
+    switch (ev) {
+    case TS_EVENT_HOST_LOOKUP:
+        context.dns = (TSHostLookupResult)edata;
+        stream->action = nullptr;
+
+        if (context.dns) {
+            inet_address addr(TSHostLookupResultAddrGet(context.dns));
+            debug_http("[%p/%u] resolved %s => %s",
+                    stream->io, stream->stream_id,
+                    stream->kvblock.url().hostport.c_str(), cstringof(addr));
+            addr.port() = htons(80); // XXX should be parsed from hostport
+            if (initiate_client_request(stream, addr.saddr(), contp)) {
+                ENTER(stream, spdy_io_stream::http_send_headers);
+                retain(stream);
+                retain(stream->io);
+            }
+
+        } else {
+            // Experimentally, if the DNS lookup fails, web proxies return 502
+            // Bad Gateway.
+            http_send_error(stream, TS_HTTP_STATUS_BAD_GATEWAY);
+        }
+
+        release(stream->io);
+        release(stream);
+        return TS_EVENT_NONE;
+
+    case TS_EVENT_VCONN_WRITE_READY:
+        context.vio = (TSVIO)edata;
+
+        if (IN(stream, spdy_io_stream::http_send_headers)) {
+            // The output VIO is ready. Write the HTTP request to the origin
+            // server and kick the VIO to send it.
+            if (write_http_request(stream)) {
+                TSVIOReenable(context.vio);
+                LEAVE(stream, spdy_io_stream::http_send_headers);
+                ENTER(stream, spdy_io_stream::http_receive_headers);
+            }
+        }
+
+        return TS_EVENT_NONE;
+
+    case TS_EVENT_VCONN_WRITE_COMPLETE:
+        debug_http("ignoring %s event", cstringof(ev));
+        return TS_EVENT_NONE;
+
+    case TS_EVENT_VCONN_READ_READY:
+    case TS_EVENT_VCONN_READ_COMPLETE:
+    case TS_EVENT_VCONN_EOS:
+        context.vio = (TSVIO)edata;
+
+        if (IN(stream, spdy_io_stream::http_receive_headers)) {
+            if (read_http_headers(stream)) {
+                LEAVE(stream, spdy_io_stream::http_receive_headers);
+                ENTER(stream, spdy_io_stream::http_send_headers);
+                ENTER(stream, spdy_io_stream::http_receive_content);
+            }
+        }
+
+        // Parsing the headers might have completed and had more data left
+        // over. If there's any data still buffered we can push it out now.
+        if (IN(stream, spdy_io_stream::http_send_headers)) {
+            http_send_response(stream, stream->hparser.mbuffer.get(),
+                        stream->hparser.header.get());
+            LEAVE(stream, spdy_io_stream::http_send_headers);
+        }
+
+        if (IN(stream, spdy_io_stream::http_receive_content)) {
+            http_send_content(stream, stream->input.reader);
+        }
+
+        if (ev == TS_EVENT_VCONN_EOS || ev == TS_EVENT_VCONN_READ_COMPLETE) {
+            stream->http_state = spdy_io_stream::http_closed;
+            spdy_send_data_frame(stream, spdy::FLAG_FIN, nullptr, 0);
+            TSVConnClose(stream->vconn);
+        }
+
+        // Kick the IO control block write VIO to make it send the
+        // SPDY frames we spooled.
+        stream->io->reenable();
+
+        if (IN(stream, spdy_io_stream::http_closed)) {
+            stream->close();
+        }
+
+        return TS_EVENT_NONE;
+
+    default:
+        debug_plugin("unexpected stream event %s", cstringof(ev));
+    }
+
+    return TS_EVENT_NONE;
+}
+
+static bool
+block_and_resolve_host(
+        spdy_io_stream * stream,
+        const std::string& hostport)
+{
+    int error;
+    struct addrinfo * res0 = NULL;
+
+    // XXX split the host and port and stash the port in the resulting sockaddr
+    error = getaddrinfo(hostport.c_str(), "80", NULL, &res0);
+    if (error != 0) {
+        debug_http("failed to resolve hostname '%s', %s",
+                hostport.c_str(), gai_strerror(error));
+        http_send_error(stream, TS_HTTP_STATUS_BAD_GATEWAY);
+        return false;
+    }
+
+    inet_address addr(res0->ai_addr);
+
+    freeaddrinfo(res0);
+
+    debug_http("[%p/%u] resolved %s => %s",
+            stream, stream->stream_id,
+            hostport.c_str(), cstringof(addr));
+    addr.port() = htons(80); // XXX should be parsed from hostport
+
+    if (initiate_client_request(stream, addr.saddr(), stream->continuation)) {
+        ENTER(stream, spdy_io_stream::http_send_headers);
+        return true;
+    }
+
+    return false;
+}
+
+static bool
+initiate_host_resolution(
+        spdy_io_stream * stream,
+        const std::string& hostport)
+{
+    // XXX split the host and port and stash the port in the resulting sockaddr
+    stream->action = TSHostLookup(stream->continuation, hostport.c_str(), 
hostport.size());
+    if (TSActionDone(stream->action)) {
+        stream->action = NULL;
+    }
+
+    debug_http("resolving hostname '%s'", hostport.c_str());
+    return true;
+}
+
+spdy_io_stream::spdy_io_stream(unsigned s)
+    : stream_id(s), http_state(0), action(nullptr), vconn(nullptr),
+    continuation(nullptr), kvblock(), io(nullptr),
+    input(), output(), hparser()
+{
+    this->continuation = TSContCreate(spdy_stream_io, TSMutexCreate());
+    TSContDataSet(this->continuation, this);
+}
+
+spdy_io_stream::~spdy_io_stream()
+{
+    TSReleaseAssert(this->action == nullptr);
+    TSReleaseAssert(this->vconn == nullptr);
+
+    if (this->continuation) {
+        TSContDestroy(this->continuation);
+    }
+}
+
+void
+spdy_io_stream::close()
+{
+    if (this->action) {
+        TSActionCancel(this->action);
+        this->action = nullptr;
+    }
+
+    if (this->vconn) {
+        TSVConnClose(this->vconn);
+        this->action = nullptr;
+    }
+
+    this->http_state = http_closed;
+}
+
+bool
+spdy_io_stream::open(
+        spdy::key_value_block& kv,
+        open_options options)
+{
+    TSReleaseAssert(this->io != nullptr);
+
+    if (this->is_closed()) {
+        this->kvblock = kv;
+
+        retain(this);
+        retain(this->io);
+
+        ENTER(this, spdy_io_stream::http_resolve_host);
+        bool success = (options & open_with_system_resolver)
+            ? block_and_resolve_host(this, kvblock.url().hostport)
+            : initiate_host_resolution(this, kvblock.url().hostport);
+
+        if (!success) {
+            release(this);
+            release(this->io);
+        }
+
+        // On the success path, the resulting continuation callback will
+        // release the refcount we are holding.
+
+        return success;
+    }
+
+    return false;
+}
+
+/* vim: set sw=4 ts=4 tw=79 et : */

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/abe55a68/plugins/experimental/spdy/strings.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/spdy/strings.cc 
b/plugins/experimental/spdy/strings.cc
new file mode 100644
index 0000000..f45005e
--- /dev/null
+++ b/plugins/experimental/spdy/strings.cc
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ts/ts.h>
+#include <spdy/spdy.h>
+#include <base/logging.h>
+
+template<> std::string
+stringof<TSEvent>(const TSEvent& ev)
+{
+    static const detail::named_value<unsigned> event_names[] =
+    {
+        { "TS_EVENT_NONE", 0 },
+        { "TS_EVENT_IMMEDIATE", 1 },
+        { "TS_EVENT_TIMEOUT", 2 },
+        { "TS_EVENT_ERROR", 3 },
+        { "TS_EVENT_CONTINUE", 4 },
+        { "TS_EVENT_VCONN_READ_READY", 100 },
+        { "TS_EVENT_VCONN_WRITE_READY", 101 },
+        { "TS_EVENT_VCONN_READ_COMPLETE", 102 },
+        { "TS_EVENT_VCONN_WRITE_COMPLETE", 103 },
+        { "TS_EVENT_VCONN_EOS", 104 },
+        { "TS_EVENT_VCONN_INACTIVITY_TIMEOUT", 105 },
+        { "TS_EVENT_NET_CONNECT", 200 },
+        { "TS_EVENT_NET_CONNECT_FAILED", 201 },
+        { "TS_EVENT_NET_ACCEPT", 202 },
+        { "TS_EVENT_NET_ACCEPT_FAILED", 204 },
+        { "TS_EVENT_INTERNAL_206", 206 },
+        { "TS_EVENT_INTERNAL_207", 207 },
+        { "TS_EVENT_INTERNAL_208", 208 },
+        { "TS_EVENT_INTERNAL_209", 209 },
+        { "TS_EVENT_INTERNAL_210", 210 },
+        { "TS_EVENT_INTERNAL_211", 211 },
+        { "TS_EVENT_INTERNAL_212", 212 },
+        { "TS_EVENT_HOST_LOOKUP", 500 },
+        { "TS_EVENT_CACHE_OPEN_READ", 1102 },
+        { "TS_EVENT_CACHE_OPEN_READ_FAILED", 1103 },
+        { "TS_EVENT_CACHE_OPEN_WRITE", 1108 },
+        { "TS_EVENT_CACHE_OPEN_WRITE_FAILED", 1109 },
+        { "TS_EVENT_CACHE_REMOVE", 1112 },
+        { "TS_EVENT_CACHE_REMOVE_FAILED", 1113 },
+        { "TS_EVENT_CACHE_SCAN", 1120 },
+        { "TS_EVENT_CACHE_SCAN_FAILED", 1121 },
+        { "TS_EVENT_CACHE_SCAN_OBJECT", 1122 },
+        { "TS_EVENT_CACHE_SCAN_OPERATION_BLOCKED", 1123 },
+        { "TS_EVENT_CACHE_SCAN_OPERATION_FAILED", 1124 },
+        { "TS_EVENT_CACHE_SCAN_DONE", 1125 },
+        { "TS_EVENT_CACHE_LOOKUP", 1126 },
+        { "TS_EVENT_CACHE_READ", 1127 },
+        { "TS_EVENT_CACHE_DELETE", 1128 },
+        { "TS_EVENT_CACHE_WRITE", 1129 },
+        { "TS_EVENT_CACHE_WRITE_HEADER", 1130 },
+        { "TS_EVENT_CACHE_CLOSE", 1131 },
+        { "TS_EVENT_CACHE_LOOKUP_READY", 1132 },
+        { "TS_EVENT_CACHE_LOOKUP_COMPLETE", 1133 },
+        { "TS_EVENT_CACHE_READ_READY", 1134 },
+        { "TS_EVENT_CACHE_READ_COMPLETE", 1135 },
+        { "TS_EVENT_INTERNAL_1200", 1200 },
+        { "TS_AIO_EVENT_DONE", 3900 },
+        { "TS_EVENT_HTTP_CONTINUE", 60000 },
+        { "TS_EVENT_HTTP_ERROR", 60001 },
+        { "TS_EVENT_HTTP_READ_REQUEST_HDR", 60002 },
+        { "TS_EVENT_HTTP_OS_DNS", 60003 },
+        { "TS_EVENT_HTTP_SEND_REQUEST_HDR", 60004 },
+        { "TS_EVENT_HTTP_READ_CACHE_HDR", 60005 },
+        { "TS_EVENT_HTTP_READ_RESPONSE_HDR", 60006 },
+        { "TS_EVENT_HTTP_SEND_RESPONSE_HDR", 60007 },
+        { "TS_EVENT_HTTP_REQUEST_TRANSFORM", 60008 },
+        { "TS_EVENT_HTTP_RESPONSE_TRANSFORM", 60009 },
+        { "TS_EVENT_HTTP_SELECT_ALT", 60010 },
+        { "TS_EVENT_HTTP_TXN_START", 60011 },
+        { "TS_EVENT_HTTP_TXN_CLOSE", 60012 },
+        { "TS_EVENT_HTTP_SSN_START", 60013 },
+        { "TS_EVENT_HTTP_SSN_CLOSE", 60014 },
+        { "TS_EVENT_HTTP_CACHE_LOOKUP_COMPLETE", 60015 },
+        { "TS_EVENT_HTTP_PRE_REMAP", 60016 },
+        { "TS_EVENT_HTTP_POST_REMAP", 60017 },
+        { "TS_EVENT_MGMT_UPDATE", 60100 },
+        { "TS_EVENT_INTERNAL_60200", 60200 },
+        { "TS_EVENT_INTERNAL_60201", 60201 },
+        { "TS_EVENT_INTERNAL_60202", 60202 },
+    };
+
+    return detail::match(event_names, (unsigned)ev);
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/abe55a68/plugins/experimental/spdy/tests/stubs.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/spdy/tests/stubs.cc 
b/plugins/experimental/spdy/tests/stubs.cc
new file mode 100644
index 0000000..1bb8c69
--- /dev/null
+++ b/plugins/experimental/spdy/tests/stubs.cc
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdarg.h>
+#include <stdio.h>
+
+extern "C" int
+TSIsDebugTagSet(const char *)
+{
+    return 1;
+}
+
+extern "C" void
+TSDebug(const char * tag, const char * fmt, ...)
+{
+    if (TSIsDebugTagSet(tag)) {
+        va_list args;
+        va_start(args, fmt);
+        vfprintf(stderr, fmt, args);
+        va_end(args);
+    }
+}
+
+/* vim: set sw=4 ts=4 tw=79 et : */

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/abe55a68/plugins/experimental/spdy/tests/zstream_test.cc
----------------------------------------------------------------------
diff --git a/plugins/experimental/spdy/tests/zstream_test.cc 
b/plugins/experimental/spdy/tests/zstream_test.cc
new file mode 100644
index 0000000..1039a9c
--- /dev/null
+++ b/plugins/experimental/spdy/tests/zstream_test.cc
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <spdy/zstream.h>
+#include <spdy/spdy.h>
+#include <assert.h>
+#include <string.h>
+#include <vector>
+#include <array>
+#include <random>
+#include <algorithm>
+
+#define CHUNKSIZE 128
+
+// Test initial state invariants.
+void initstate()
+{
+    spdy::zstream<spdy::compress> zin;
+    spdy::zstream<spdy::decompress> zout;
+
+//    assert(zin.drained());
+//    assert(zout.drained());
+}
+
+// Test basic compress/decompress cycle.
+void roundtrip()
+{
+    ssize_t ret;
+    char text[CHUNKSIZE];
+    char inbuf[CHUNKSIZE];
+    char outbuf[CHUNKSIZE];
+
+    spdy::zstream<spdy::compress> zin;
+    spdy::zstream<spdy::decompress> zout;
+
+    memset(text, 0xaaaaaaaa, sizeof(text));
+    memset(inbuf, 0xaaaaaaaa, sizeof(inbuf));
+
+    memset(outbuf, 0, sizeof(outbuf));
+    zin.input(inbuf, sizeof(inbuf));
+    ret = zin.consume(outbuf, sizeof(outbuf));
+    assert(ret > 0); // no error
+
+    memset(inbuf, 0, sizeof(inbuf));
+    zout.input(outbuf, ret);
+    ret = zout.consume(inbuf, sizeof(inbuf));
+    assert(ret > 0); // no error
+
+    assert(memcmp(text, inbuf, sizeof(inbuf)) == 0);
+}
+
+void shortbuf()
+{
+    ssize_t ret;
+    char outbuf[8];
+    std::minstd_rand0 rand0;
+    spdy::zstream<spdy::compress> zin;
+    std::array<std::minstd_rand0::result_type, CHUNKSIZE> text;
+
+    // random fill so it doesn't compress well
+    std::for_each(text.begin(), text.end(),
+           [&rand0](decltype(text)::value_type& v) { v = rand0(); } );
+
+    zin.input(text.data(), text.size() * sizeof(decltype(text)::value_type));
+    do {
+        ret = zin.consume(outbuf, sizeof(outbuf));
+    } while (ret != 0);
+}
+
+void compress_kvblock()
+{
+    spdy::key_value_block           kvblock;
+    std::vector<uint8_t>            hdrs;
+    std::vector<uint8_t>            check;
+    spdy::zstream<spdy::compress>   compress;
+    spdy::zstream<spdy::decompress> expand;
+    ssize_t nbytes, ret;
+
+    kvblock["key1"] = "value1";
+    kvblock["key2"] = "value2";
+    kvblock["key3"] = "value3";
+    kvblock["key4"] = "value4";
+
+    hdrs.resize(kvblock.nbytes(spdy::PROTOCOL_VERSION_2));
+    nbytes = spdy::key_value_block::marshall(spdy::PROTOCOL_VERSION_2,
+            compress, kvblock, &hdrs[0], hdrs.capacity());
+    hdrs.resize(nbytes);
+
+    nbytes = 0;
+    check.resize(kvblock.nbytes(spdy::PROTOCOL_VERSION_2));
+    expand.input(&hdrs[0], hdrs.size());
+    do {
+        ret = expand.consume(&check[nbytes], check.size() - nbytes);
+        nbytes += ret;
+    } while (ret > 0);
+
+    assert(ret == 0);
+}
+
+void spdy_decompress()
+{
+    const uint8_t pkt[] =
+    {
+        /* SYN_REPLY header
+        0x80, 0x03, 0x00, 0x02, 0x00, 0x00, 0x00, 0xd8,
+        0x00, 0x00, 0x00, 0x01,
+        */
+
+                    0x78, 0x9c, 0x34, 0xcf, 0x41, 0x6b,
+        0xc2, 0x40, 0x10, 0x05, 0xe0, 0x01, 0xd3, 0xe2,
+        0xa1, 0x56, 0xe8, 0xa9, 0x17, 0x61, 0x7f, 0x40,
+        0x37, 0xee, 0x64, 0x89, 0x36, 0x11, 0x0f, 0xc1,
+        0x56, 0x2f, 0xea, 0xa1, 0x49, 0xed, 0x79, 0x93,
+        0x8c, 0x89, 0xa0, 0x1b, 0x49, 0x46, 0x69, 0xfe,
+        0x7d, 0xa5, 0xea, 0xe9, 0xc1, 0xe3, 0xe3, 0xc1,
+        0x83, 0x2e, 0xf4, 0xa2, 0x2c, 0xa3, 0x23, 0xcb,
+        0x2f, 0x63, 0x0b, 0x6a, 0xe0, 0x21, 0x6d, 0xf9,
+        0x12, 0x9d, 0xa8, 0x20, 0xe8, 0x20, 0x6a, 0x78,
+        0x9e, 0x55, 0x96, 0xc9, 0xb2, 0x5c, 0x92, 0x2d,
+        0xb8, 0x04, 0xc7, 0x0b, 0x46, 0x23, 0x78, 0xba,
+        0xb7, 0x49, 0x7b, 0x24, 0x78, 0x65, 0xfa, 0xe5,
+        0x61, 0xc9, 0x87, 0xfd, 0x44, 0x64, 0xa5, 0xa9,
+        0x1b, 0xe2, 0xe9, 0x77, 0x32, 0x97, 0xef, 0xe0,
+        0x7c, 0x18, 0x26, 0x18, 0xfc, 0x50, 0xfe, 0x26,
+        0xb4, 0x12, 0xeb, 0xea, 0x2c, 0x3c, 0x85, 0x28,
+        0x94, 0x1f, 0xaa, 0x20, 0xf4, 0xb5, 0x58, 0xac,
+        0x12, 0xe8, 0x2d, 0x4d, 0xc3, 0x72, 0x55, 0xe5,
+        0xbb, 0xed, 0x8e, 0xf2, 0x9b, 0x56, 0x81, 0x98,
+        0x53, 0x7a, 0xd5, 0x38, 0x0e, 0x51, 0x87, 0xe8,
+        0xff, 0xeb, 0xc7, 0x98, 0xea, 0x33, 0xd5, 0xf0,
+        0x12, 0x25, 0xf1, 0x50, 0xbb, 0xe8, 0xa2, 0x3c,
+        0xd9, 0x86, 0x4d, 0xba, 0x27, 0x70, 0x36, 0xa6,
+        0x6e, 0xa1, 0x7f, 0xbb, 0xf4, 0x69, 0xb3, 0xcb,
+        0xa6, 0x2d, 0xfe, 0x00, 0x00, 0x00, 0xff, 0xff
+    };
+
+
+    char outbuf[16384];
+    ssize_t ret;
+    spdy::zstream<spdy::decompress> zout;
+
+    zout.input(pkt, sizeof(pkt));
+    do {
+        ret = zout.consume(outbuf, sizeof(outbuf));
+    } while (ret > 0);
+
+    assert(ret == 0);
+}
+
+void spdy_headers()
+{
+
+    const uint8_t pkt[] =
+    {
+        /* SYN_STREAM header
+        0x80, 0x02, 0x00, 0x01, 0x01, 0x00, 0x00, 0xde,
+        0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
+        0x80, 0x00,
+        */
+                    0x38, 0xea, 0xdf, 0xa2, 0x51, 0xb2,
+        0x62, 0xe0, 0x60, 0xe0, 0x47, 0xcb, 0x5a, 0x0c,
+        0x82, 0x20, 0x8d, 0x3a, 0x50, 0x9d, 0x3a, 0xc5,
+        0x29, 0xc9, 0x19, 0x0c, 0x7c, 0xa8, 0xc1, 0xcf,
+        0xc0, 0x68, 0xc0, 0xc0, 0x02, 0xca, 0x5c, 0x0c,
+        0x5c, 0x25, 0x19, 0x89, 0x85, 0x45, 0x15, 0x05,
+        0x45, 0x29, 0xf9, 0x0c, 0x6c, 0xb9, 0xc0, 0x0c,
+        0x9d, 0x9f, 0xc2, 0xc0, 0xe2, 0xe1, 0xea, 0xe8,
+        0xc2, 0xc0, 0x56, 0x0c, 0x4c, 0x04, 0xb9, 0xa9,
+        0x40, 0x75, 0x25, 0x25, 0x05, 0x0c, 0xcc, 0x20,
+        0xcb, 0x18, 0xf5, 0x19, 0xb8, 0x10, 0x39, 0x84,
+        0xa1, 0xd4, 0x37, 0xbf, 0x2a, 0x33, 0x27, 0x27,
+        0x51, 0xdf, 0x54, 0xcf, 0x40, 0x41, 0xc3, 0x37,
+        0x31, 0x39, 0x33, 0xaf, 0x24, 0xbf, 0x38, 0xc3,
+        0x5a, 0xc1, 0x13, 0x68, 0x57, 0x8e, 0x02, 0x50,
+        0x40, 0xc1, 0x3f, 0x58, 0x21, 0x42, 0xc1, 0xd0,
+        0x20, 0xde, 0x3c, 0xde, 0x48, 0x53, 0xc1, 0x11,
+        0x18, 0x1c, 0xa9, 0xe1, 0xa9, 0x49, 0xde, 0x99,
+        0x25, 0xfa, 0xa6, 0xc6, 0xa6, 0x7a, 0x46, 0x0a,
+        0x00, 0x69, 0x78, 0x7b, 0x84, 0xf8, 0xfa, 0xe8,
+        0x28, 0xe4, 0x64, 0x66, 0xa7, 0x2a, 0xb8, 0xa7,
+        0x26, 0x67, 0xe7, 0x6b, 0x2a, 0x38, 0x67, 0x00,
+        0x33, 0x7e, 0xaa, 0xbe, 0x21, 0xd0, 0x50, 0x3d,
+        0x0b, 0x73, 0x13, 0x3d, 0x43, 0x03, 0x33, 0x85,
+        0xe0, 0xc4, 0xb4, 0xc4, 0xa2, 0x4c, 0x88, 0x26,
+        0x06, 0x76, 0xa8, 0xf7, 0x19, 0x38, 0x60, 0xa1,
+        0x02, 0x00, 0x00, 0x00, 0xff, 0xff
+    };
+
+    char outbuf[16384];
+    ssize_t ret;
+    spdy::zstream<spdy::decompress> zout;
+
+    zout.input(pkt, sizeof(pkt));
+    do {
+        ret = zout.consume(outbuf, sizeof(outbuf));
+    } while (ret > 0);
+
+    assert(ret == 0);
+}
+
+int main(void)
+{
+    initstate();
+    roundtrip();
+    shortbuf();
+    compress_kvblock();
+    spdy_headers();
+    spdy_decompress();
+    return 0;
+}
+
+/* vim: set sw=4 ts=4 tw=79 et : */

Reply via email to