Author: rhuijben
Date: Wed Nov 11 16:46:44 2015
New Revision: 1713889

URL: http://svn.apache.org/viewvc?rev=1713889&view=rev
Log:
Add a few states to the deflate bucket state machine to allow using it
for compression too.

Zlib already abstracts everything to a stream and we already had all the
necessary plumbing in place.

I think the original idea was to expose this via the same function as
deflating, but as we call this a deflate bucket while we only supported
inflating, it is kind of hard to invent good names for the 'format' argument
to support compression.

* buckets/deflate_buckets.c
  (deflate_state_t): Add some initial states and a finish state. Document
    memLevel usage.
  (serf_bucket_deflate_create): Tweak memlevel.
  (serf_bucket_deflate_compress_create): New function.
  (serf_deflate_refill): Tell ZLib that we have the final part. Only push back
    when necessary. Produce verify header when done.
  (serf_deflate_wait_for_data): Implement compress states.

* serf_bucket_types.h
  (serf_bucket_deflate_compress_create): New function.

* test/test_buckets.c
  (test_linebuf_fetch_crlf): Remove unused var.
  (test_deflate_compress_buckets): Add initial test.

Modified:
    serf/trunk/buckets/deflate_buckets.c
    serf/trunk/serf_bucket_types.h
    serf/trunk/test/test_buckets.c

Modified: serf/trunk/buckets/deflate_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/buckets/deflate_buckets.c?rev=1713889&r1=1713888&r2=1713889&view=diff
==============================================================================
--- serf/trunk/buckets/deflate_buckets.c (original)
+++ serf/trunk/buckets/deflate_buckets.c Wed Nov 11 16:46:44 2015
@@ -48,7 +48,7 @@ typedef struct deflate_context_t {
 
     int format;                 /* Are we 'deflate' or 'gzip'? */
 
-    enum {
+    enum deflate_state_t {
         STATE_READING_HEADER,   /* reading the gzip header */
         STATE_HEADER,           /* read the gzip header */
         STATE_INIT,             /* init'ing zlib functions */
@@ -57,6 +57,11 @@ typedef struct deflate_context_t {
         STATE_VERIFY,           /* verifying the final gzip CRC */
         STATE_FINISH,           /* clean up after reading body */
         STATE_DONE,             /* body is done; we'll return EOF here */
+
+        /* When handling things the other way around */
+        STATE_WRITING_HEADER,   /* produces a gzip header */
+        STATE_COMPRESS_INIT,    /* initializes zlib for compression */
+        STATE_COMPRESS_FINISH,  /* clean up after producing body */
     } state;
 
     z_stream zstream;
@@ -64,7 +69,8 @@ typedef struct deflate_context_t {
     unsigned char buffer[DEFLATE_BUFFER_SIZE];
     unsigned long crc;
     int windowSize;
-    int memLevel;
+    int memLevel;              /* -1 when decompressing.
+                                  Otherwise the memlevel to use*/
     int bufferSize;
 
     /* How much of the chunk, or the terminator, do we have left to read? */
@@ -121,7 +127,48 @@ serf_bucket_t *serf_bucket_deflate_creat
     ctx->stream_left = ctx->stream_size = DEFLATE_MAGIC_SIZE;
 
     ctx->windowSize = DEFLATE_WINDOW_SIZE;
-    ctx->memLevel = DEFLATE_MEMLEVEL;
+    ctx->memLevel = -1;
+    ctx->bufferSize = DEFLATE_BUFFER_SIZE;
+
+    return serf_bucket_create(&serf_bucket_type_deflate, allocator, ctx);
+}
+
+serf_bucket_t *serf_bucket_deflate_compress_create(
+    serf_bucket_t *stream,
+    int memlevel,
+    int format,
+    serf_bucket_alloc_t *allocator)
+{
+    deflate_context_t *ctx;
+
+    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+    ctx->stream = stream;
+    ctx->stream_status = APR_SUCCESS;
+    ctx->inflate_stream = serf_bucket_aggregate_create(allocator);
+    ctx->format = format;
+    ctx->crc = 0;
+    ctx->config = NULL;
+    /* zstream must be NULL'd out. */
+    memset(&ctx->zstream, 0, sizeof(ctx->zstream));
+
+    switch (ctx->format) {
+        case SERF_DEFLATE_GZIP:
+            ctx->state = STATE_WRITING_HEADER;
+            break;
+        case SERF_DEFLATE_DEFLATE:
+            /* deflate doesn't have a header. */
+            ctx->state = STATE_COMPRESS_INIT;
+            break;
+      default:
+            /* Not reachable */
+            return NULL;
+    }
+
+    /* Initial size of gzip header. */
+    ctx->stream_left = ctx->stream_size = DEFLATE_MAGIC_SIZE;
+
+    ctx->windowSize = DEFLATE_WINDOW_SIZE;
+    ctx->memLevel = (memlevel > 0) ? memlevel : DEFLATE_MEMLEVEL;
     ctx->bufferSize = DEFLATE_BUFFER_SIZE;
 
     return serf_bucket_create(&serf_bucket_type_deflate, allocator, ctx);
@@ -151,6 +198,7 @@ static apr_status_t serf_deflate_refill(
     deflate_context_t *ctx = bucket->data;
     apr_status_t status;
     int zRC;
+    int flush_v = Z_NO_FLUSH;
 
     /* We have nothing buffered. Fetch more. */
 
@@ -181,6 +229,9 @@ static apr_status_t serf_deflate_refill(
             return status;
         }
 
+        if (APR_STATUS_IS_EOF(ctx->stream_status))
+            flush_v = Z_FINISH;
+
         /* Make valgrind happy and explictly initialize next_in to specific
           * value for empty buffer. */
         if (private_len) {
@@ -194,7 +245,10 @@ static apr_status_t serf_deflate_refill(
 
     while (1) {
 
-        zRC = inflate(&ctx->zstream, Z_NO_FLUSH);
+        if (ctx->memLevel < 0)
+            zRC = inflate(&ctx->zstream, flush_v);
+        else
+            zRC = deflate(&ctx->zstream, flush_v);
 
         /* We're full or zlib requires more space. Either case, clear
             out our buffer, reset, and return. */
@@ -234,30 +288,60 @@ static apr_status_t serf_deflate_refill(
 
             ctx->zstream.avail_out = ctx->bufferSize;
 
-            /* Push back the remaining data to be read. */
-            tmp = serf_bucket_aggregate_create(bucket->allocator);
-            serf_bucket_set_config(tmp, ctx->config);
-            serf_bucket_aggregate_prepend(tmp, ctx->stream);
-            ctx->stream = tmp;
-
-            /* We now need to take the remaining avail_in and
-              * throw it in ctx->stream so our next read picks it up.
-              */
-            tmp = SERF_BUCKET_SIMPLE_STRING_LEN(
+            if (ctx->zstream.avail_in) {
+                /* Push back the remaining data to be read. */
+                tmp = serf_bucket_aggregate_create(bucket->allocator);
+                serf_bucket_set_config(tmp, ctx->config);
+                serf_bucket_aggregate_prepend(tmp, ctx->stream);
+                ctx->stream = tmp;
+
+                /* We now need to take the remaining avail_in and
+                 * throw it in ctx->stream so our next read picks it up.
+                 */
+
+                tmp = SERF_BUCKET_SIMPLE_STRING_LEN(
                                 (const char*)ctx->zstream.next_in,
                                               ctx->zstream.avail_in,
                                               bucket->allocator);
-            serf_bucket_aggregate_prepend(ctx->stream, tmp);
+                serf_bucket_aggregate_prepend(ctx->stream, tmp);
+            }
 
             switch (ctx->format) {
             case SERF_DEFLATE_GZIP:
-                ctx->stream_left = ctx->stream_size =
-                    DEFLATE_VERIFY_SIZE;
-                ctx->state++;
+                if (ctx->memLevel >= 0) {
+                     char *verify_header = serf_bucket_mem_alloc(
+                                                bucket->allocator,
+                                                DEFLATE_VERIFY_SIZE);
+
+                     verify_header[0] = (ctx->crc >> 24) & 0xFF;
+                     verify_header[1] = (ctx->crc >> 16) & 0xFF;
+                     verify_header[2] = (ctx->crc >>  8) & 0xFF;
+                     verify_header[3] =  ctx->crc        & 0xFF;
+
+                     verify_header[4] = (ctx->zstream.total_out >> 24) & 0xFF;
+                     verify_header[5] = (ctx->zstream.total_out >> 16) & 0xFF;
+                     verify_header[6] = (ctx->zstream.total_out >>  8) & 0xFF;
+                     verify_header[7] =  ctx->zstream.total_out        & 0xFF;
+
+                     serf_bucket_aggregate_append(
+                              ctx->inflate_stream,
+                              serf_bucket_simple_own_create(verify_header,
+                                                            DEFLATE_VERIFY_SIZE,
+                                                            bucket->allocator));
+                     ctx->state = STATE_COMPRESS_FINISH;
+                }
+                else {
+                    ctx->stream_left = ctx->stream_size =
+                                              DEFLATE_VERIFY_SIZE;
+                    ctx->state++;
+                }
                 break;
             case SERF_DEFLATE_DEFLATE:
                 /* Deflate does not have a verify footer. */
-                ctx->state = STATE_FINISH;
+                if (ctx->memLevel >= 0)
+                    ctx->state = STATE_COMPRESS_FINISH;
+                else
+                    ctx->state = STATE_FINISH;
                 break;
             default:
                 /* Not reachable */
@@ -390,6 +474,42 @@ static apr_status_t serf_deflate_wait_fo
         case STATE_DONE:
             /* We're done inflating.  Use our finished buffer. */
             return ctx->inflate_stream ? APR_SUCCESS : APR_EOF;
+
+
+        case STATE_WRITING_HEADER:
+            {
+              char *header = serf_bucket_mem_calloc(bucket->allocator,
+                                                    DEFLATE_MAGIC_SIZE);
+              memcpy(header, deflate_magic, sizeof(deflate_magic));
+              header[2] = Z_DEFLATED;
+              /* No mtime. DOS/Default OS */
+
+              serf_bucket_aggregate_append(
+                      ctx->inflate_stream,
+                      serf_bucket_simple_own_create(header, DEFLATE_MAGIC_SIZE,
+                                                    bucket->allocator));
+              ctx->state++;
+              break;
+            }
+        case STATE_COMPRESS_INIT:
+            zRC = deflateInit2(&ctx->zstream, Z_DEFAULT_STRATEGY, Z_DEFLATED,
+                               ctx->windowSize, ctx->memLevel, Z_DEFAULT_STRATEGY);
+            if (zRC != Z_OK) {
+                serf__log(LOGLVL_ERROR, LOGCOMP_COMPR, __FILE__, ctx->config,
+                          "deflateInit2 error %d - %s\n",
+                          zRC, ctx->zstream.msg);
+                return SERF_ERROR_DECOMPRESSION_FAILED;
+            }
+            ctx->zstream.next_out = ctx->buffer;
+            ctx->zstream.avail_out = ctx->bufferSize;
+            ctx->state = STATE_INFLATE;
+            break;
+        case STATE_COMPRESS_FINISH:
+            deflateEnd(&ctx->zstream);
+            serf_bucket_aggregate_prepend(ctx->stream, ctx->inflate_stream);
+            ctx->inflate_stream = NULL;
+            ctx->state++;
+            break;
         default:
             /* Not reachable */
             return APR_EGENERAL;

Modified: serf/trunk/serf_bucket_types.h
URL: http://svn.apache.org/viewvc/serf/trunk/serf_bucket_types.h?rev=1713889&r1=1713888&r2=1713889&view=diff
==============================================================================
--- serf/trunk/serf_bucket_types.h (original)
+++ serf/trunk/serf_bucket_types.h Wed Nov 11 16:46:44 2015
@@ -481,6 +481,12 @@ serf_bucket_t *serf_bucket_deflate_creat
     serf_bucket_alloc_t *allocator,
     int format);
 
+serf_bucket_t *serf_bucket_deflate_compress_create(
+    serf_bucket_t *stream,
+    int memlevel,
+    int format,
+    serf_bucket_alloc_t *allocator);
+
 
 /* ==================================================================== */
 

Modified: serf/trunk/test/test_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/test/test_buckets.c?rev=1713889&r1=1713888&r2=1713889&view=diff
==============================================================================
--- serf/trunk/test/test_buckets.c (original)
+++ serf/trunk/test/test_buckets.c Wed Nov 11 16:46:44 2015
@@ -2004,7 +2004,6 @@ static void test_linebuf_fetch_crlf(CuTe
     serf_bucket_t *bkt;
     serf_linebuf_t linebuf;
     serf_bucket_type_t *unfriendly;
-    apr_status_t status;
     
     serf_bucket_alloc_t *alloc = test__create_bucket_allocator(tc, tb->pool);
 
@@ -2264,6 +2263,26 @@ static void test_limit_buckets(CuTest *t
   }
 }
 
+static void test_deflate_compress_buckets(CuTest *tc)
+{
+    test_baton_t *tb = tc->testBaton;
+    serf_bucket_alloc_t *alloc = tb->bkt_alloc;
+    serf_bucket_t *bkt;
+    int i;
+    const char *body = "12345678901234567890" CRLF
+                       "12345678901234567890" CRLF
+                       "12345678901234567890" CRLF;
+
+    for (i = SERF_DEFLATE_GZIP; i <= SERF_DEFLATE_DEFLATE; i++) {
+        bkt = SERF_BUCKET_SIMPLE_STRING(body, alloc);
+        bkt = serf_bucket_deflate_compress_create(bkt, 0, i, alloc);
+        bkt = serf_bucket_deflate_create(bkt, alloc, i);
+
+        read_and_check_bucket(tc, bkt, body);
+        serf_bucket_destroy(bkt);
+    }
+}
+
 /* Basic test for unframe buckets. */
 static void test_http2_unframe_buckets(CuTest *tc)
 {
@@ -2771,6 +2790,7 @@ CuSuite *test_buckets(void)
     SUITE_ADD_TEST(suite, test_deflate_buckets);
     SUITE_ADD_TEST(suite, test_prefix_buckets);
     SUITE_ADD_TEST(suite, test_limit_buckets);
+    SUITE_ADD_TEST(suite, test_deflate_compress_buckets);
     SUITE_ADD_TEST(suite, test_http2_unframe_buckets);
     SUITE_ADD_TEST(suite, test_http2_unpad_buckets);
     SUITE_ADD_TEST(suite, test_hpack_huffman_decode);


Reply via email to