Author: rhuijben
Date: Wed Nov 11 12:01:19 2015
New Revision: 1713817

URL: http://svn.apache.org/viewvc?rev=1713817&view=rev
Log:
Make the dechunk bucket properly implement readline and peek by
delegating to the underlying stream.

* buckets/dechunk_buckets.c
  (wait_for_chunk): New function, extracted from...
  (serf_dechunk_read): ... here, which is now a caller.
  (serf_dechunk_readline2): Implement via wait_for_chunk and by
    delegating to the underlying stream.
  (serf_dechunk_readline): Implement via serf_dechunk_readline2.
  (serf_dechunk_peek): Implement via wait_for_chunk and by
    delegating to the underlying stream.

* serf.h
  (serf_bucket_readline2): Fix obtaining v2 (fallback) pointer.

* test/test_buckets.c
  (test_dechunk_buckets): Extend test.

Modified:
    serf/trunk/buckets/dechunk_buckets.c
    serf/trunk/serf.h
    serf/trunk/test/test_buckets.c

Modified: serf/trunk/buckets/dechunk_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/buckets/dechunk_buckets.c?rev=1713817&r1=1713816&r2=1713817&view=diff
==============================================================================
--- serf/trunk/buckets/dechunk_buckets.c (original)
+++ serf/trunk/buckets/dechunk_buckets.c Wed Nov 11 12:01:19 2015
@@ -65,9 +65,7 @@ static void serf_dechunk_destroy_and_dat
     serf_default_destroy_and_data(bucket);
 }
 
-static apr_status_t serf_dechunk_read(serf_bucket_t *bucket,
-                                      apr_size_t requested,
-                                      const char **data, apr_size_t *len)
+static apr_status_t wait_for_chunk(serf_bucket_t *bucket)
 {
     dechunk_context_t *ctx = bucket->data;
     apr_status_t status;
@@ -112,51 +110,27 @@ static apr_status_t serf_dechunk_read(se
             }
             /* assert: status != 0 */
 
-            /* Note that we didn't actually read anything, so our callers
-             * don't get confused.
-             */
-            *len = 0;
-
             return status;
 
         case STATE_CHUNK:
-
-            if (requested > ctx->body_left) {
-                requested = ctx->body_left;
-            }
-
-            /* Delegate to the stream bucket to do the read. */
-            status = serf_bucket_read(ctx->stream, requested, data, len);
-            if (SERF_BUCKET_READ_ERROR(status))
-                return status;
-
-            /* Some data was read, so decrement the amount left and see
-             * if we're done reading this chunk.
-             */
-            ctx->body_left -= *len;
-            if (!ctx->body_left) {
-                ctx->state = STATE_TERM;
-                ctx->body_left = 2;     /* CRLF */
-            }
-
-            /* We need more data but there is no more available. */
-            if (ctx->body_left && APR_STATUS_IS_EOF(status)) {
-                return SERF_ERROR_TRUNCATED_HTTP_RESPONSE;
-            }
-
-            /* Return the data we just read. */
-            return status;
+            return APR_SUCCESS;
 
         case STATE_TERM:
+          {
             /* Delegate to the stream bucket to do the read. */
-            status = serf_bucket_read(ctx->stream, ctx->body_left, data, len);
+            const char *data;
+            apr_size_t len;
+
+            status = serf_bucket_read(ctx->stream,
+                                      (apr_size_t)ctx->body_left /* 2 or 1 */,
+                                      &data, &len);
             if (SERF_BUCKET_READ_ERROR(status))
                 return status;
 
             /* Some data was read, so decrement the amount left and see
              * if we're done reading the chunk terminator.
              */
-            ctx->body_left -= *len;
+            ctx->body_left -= len;
 
             /* We need more data but there is no more available. */
             if (ctx->body_left && APR_STATUS_IS_EOF(status))
@@ -167,16 +141,13 @@ static apr_status_t serf_dechunk_read(se
             }
 
             /* Don't return the CR of CRLF to the caller! */
-            *len = 0;
-
             if (status)
                 return status;
 
             break;
-
+          }
         case STATE_DONE:
             /* Just keep returning EOF */
-            *len = 0;
             return APR_EOF;
 
         default:
@@ -187,6 +158,125 @@ static apr_status_t serf_dechunk_read(se
     /* NOTREACHED */
 }
 
+static apr_status_t serf_dechunk_read(serf_bucket_t *bucket,
+                                      apr_size_t requested,
+                                      const char **data, apr_size_t *len)
+{
+    dechunk_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = wait_for_chunk(bucket);
+    if (status || ctx->state != STATE_CHUNK) {
+        *len = 0;
+        return status;
+    }
+
+    /* Don't overshoot */
+    if (requested > ctx->body_left) {
+        requested = (apr_size_t)ctx->body_left;
+    }
+
+    /* ### If requested is now ctx->body_left we might want to try
+           reading 2 extra bytes in an attempt to skip STATE_TERM
+           directly */
+
+    /* Delegate to the stream bucket to do the read. */
+    status = serf_bucket_read(ctx->stream, requested, data, len);
+    if (SERF_BUCKET_READ_ERROR(status))
+        return status;
+
+    /* Some data was read, so decrement the amount left and see
+     * if we're done reading this chunk. */
+    ctx->body_left -= *len;
+    if (!ctx->body_left) {
+        ctx->state = STATE_TERM;
+        ctx->body_left = 2;     /* CRLF */
+    }
+
+    /* We need more data but there is no more available. */
+    if (ctx->body_left && APR_STATUS_IS_EOF(status)) {
+        return SERF_ERROR_TRUNCATED_HTTP_RESPONSE;
+    }
+
+    /* Return the data we just read. */
+    return status;
+}
+
+static apr_status_t serf_dechunk_readline2(serf_bucket_t *bucket,
+                                           int accepted,
+                                           apr_size_t requested,
+                                           int *found,
+                                           const char **data,
+                                           apr_size_t *len)
+{
+    dechunk_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = wait_for_chunk(bucket);
+    if (status || ctx->state != STATE_CHUNK) {
+        *len = 0;
+        return status;
+    }
+
+    /* Don't overshoot */
+    if (requested > ctx->body_left) {
+        requested = (apr_size_t)ctx->body_left;
+    }
+
+    /* Delegate to the stream bucket to do the read. */
+    status = serf_bucket_readline2(ctx->stream, accepted, requested,
+                                   found, data, len);
+    if (SERF_BUCKET_READ_ERROR(status))
+        return status;
+
+    /* Some data was read, so decrement the amount left and see
+     * if we're done reading this chunk. */
+    ctx->body_left -= *len;
+    if (!ctx->body_left) {
+        ctx->state = STATE_TERM;
+        ctx->body_left = 2;     /* CRLF */
+    }
+
+    /* We need more data but there is no more available. */
+    if (ctx->body_left && APR_STATUS_IS_EOF(status)) {
+        return SERF_ERROR_TRUNCATED_HTTP_RESPONSE;
+    }
+
+    /* Return the data we just read. */
+    return status;
+}
+
+static apr_status_t serf_dechunk_readline(serf_bucket_t *bucket,
+                                          int accepted,
+                                          int *found,
+                                          const char **data,
+                                          apr_size_t *len)
+{
+    return serf_dechunk_readline2(bucket, accepted, SERF_READ_ALL_AVAIL,
+                                  found, data, len);
+}
+
+static apr_status_t serf_dechunk_peek(serf_bucket_t *bucket,
+                                      const char **data,
+                                      apr_size_t *len)
+{
+    dechunk_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = wait_for_chunk(bucket);
+    if (status) {
+        *len = 0;
+        return SERF_BUCKET_READ_ERROR(status) ? status : APR_SUCCESS;
+    }
+
+    status = serf_bucket_peek(ctx->stream, data, len);
+    if (!SERF_BUCKET_READ_ERROR(status) && *len > ctx->body_left)
+    {
+        *len = (apr_size_t)ctx->body_left;
+    }
+    return status;
+}
+
 static apr_status_t serf_dechunk_set_config(serf_bucket_t *bucket,
                                             serf_config_t *config)
 {
@@ -200,14 +290,14 @@ static apr_status_t serf_dechunk_set_con
 const serf_bucket_type_t serf_bucket_type_dechunk = {
     "DECHUNK",
     serf_dechunk_read,
-    serf_default_readline,
+    serf_dechunk_readline,
     serf_default_read_iovec,
     serf_default_read_for_sendfile,
     serf_buckets_are_v2,
-    serf_default_peek /* ### TODO */,
+    serf_dechunk_peek,
     serf_dechunk_destroy_and_data,
     serf_default_read_bucket,
-    serf_default_readline2,
+    serf_dechunk_readline2,
     serf_default_get_remaining,
     serf_dechunk_set_config,
 };

Modified: serf/trunk/serf.h
URL: http://svn.apache.org/viewvc/serf/trunk/serf.h?rev=1713817&r1=1713816&r2=1713817&view=diff
==============================================================================
--- serf/trunk/serf.h (original)
+++ serf/trunk/serf.h Wed Nov 11 12:01:19 2015
@@ -1061,7 +1061,7 @@ const serf_bucket_type_t *serf_get_type(
 #define serf_bucket_peek(b,d,l) ((b)->type->peek(b,d,l))
 #define serf_bucket_destroy(b) ((b)->type->destroy(b))
 #define serf_bucket_readline2(b,a,r,f,d,l) \
-    SERF__RECREAD(b, (b)->type->readline2(b,a,r,f,d,l))
+    SERF__RECREAD(b, serf_get_type(b, 2)->readline2(b,a,r,f,d,l))
 #define serf_bucket_get_remaining(b) (serf_get_type(b, 2)->get_remaining(b))
 #define serf_bucket_set_config(b,c) (serf_get_type(b, 2)->set_config(b, c))
 

Modified: serf/trunk/test/test_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/test/test_buckets.c?rev=1713817&r1=1713816&r2=1713817&view=diff
==============================================================================
--- serf/trunk/test/test_buckets.c (original)
+++ serf/trunk/test/test_buckets.c Wed Nov 11 12:01:19 2015
@@ -1575,9 +1575,10 @@ static void test_dechunk_buckets(CuTest
     const int nr_of_actions = sizeof(actions) / sizeof(mockbkt_action);
     apr_status_t status;
     const char *body = "blabla";
-    const char *expected = apr_psprintf(tb->pool, "%s%s%s%s%s%s%s%s%s", body,
-                                        body, body, body, body, body, body,
-                                        body, body);
+    const char *expected_data = apr_psprintf(tb->pool, "%s%s%s%s%s%s%s%s%s",
+                                             body, body, body, body, body,
+                                             body, body, body, body);
+    const char *expected = expected_data;
 
     mock_bkt = serf_bucket_mock_create(actions, nr_of_actions, alloc);
     bkt = serf_bucket_dechunk_create(mock_bkt, alloc);
@@ -1604,6 +1605,34 @@ static void test_dechunk_buckets(CuTest
     CuAssert(tc, "Read less data than expected.", strlen(expected) == 0);
 
     serf_bucket_destroy(bkt);
+
+    mock_bkt = serf_bucket_mock_create(actions, nr_of_actions, alloc);
+    bkt = serf_bucket_dechunk_create(mock_bkt, alloc);
+    expected = expected_data;
+    do
+    {
+        const char *data;
+        apr_size_t len;
+        int found;
+
+        status = serf_bucket_readline(bkt, SERF_NEWLINE_ANY, &found, &data,
+                                      &len);
+
+        CuAssert(tc, "Got error during bucket reading.",
+                 !SERF_BUCKET_READ_ERROR(status));
+        CuAssert(tc, "Read more data than expected.",
+                 strlen(expected) >= len);
+        CuAssert(tc, "Read data is not equal to expected.",
+                 strncmp(expected, data, len) == 0);
+
+        expected += len;
+
+        CuAssertIntEquals(tc, SERF_NEWLINE_NONE, found);
+
+        if (len == 0 && status == APR_EAGAIN)
+            serf_bucket_mock_more_data_arrived(mock_bkt);
+    }
+    while (!APR_STATUS_IS_EOF(status));
 }
 
 static apr_status_t deflate_compress(const char **data, apr_size_t *len,


Reply via email to