Author: rhuijben
Date: Fri Oct 30 00:40:16 2015
New Revision: 1711394
URL: http://svn.apache.org/viewvc?rev=1711394&view=rev
Log:
Complete the implementation of the copy buckets as noted in the TODO
comments.
* buckets/copy_buckets.c
(serf_copy_read): Add implementation.
(serf_copy_read_iovec): Add the missing pieces.
(serf_copy_get_remaining): New function.
(serf_copy_destroy): Destroy hold buffer.
(serf_bucket_type_copy): Add get_remaining.
* test/test_buckets.c
(discard_data): Move helper function here.
(test_serf_default_read_iovec): Rename this function to...
(test_copy_bucket): ... this. Remove '#if 0' and extend.
(discard_data): Move up.
(test_buckets): Re-add item in order.
Modified:
serf/trunk/buckets/copy_buckets.c
serf/trunk/test/test_buckets.c
Modified: serf/trunk/buckets/copy_buckets.c
URL:
http://svn.apache.org/viewvc/serf/trunk/buckets/copy_buckets.c?rev=1711394&r1=1711393&r2=1711394&view=diff
==============================================================================
--- serf/trunk/buckets/copy_buckets.c (original)
+++ serf/trunk/buckets/copy_buckets.c Fri Oct 30 00:40:16 2015
@@ -62,12 +62,60 @@ static apr_status_t serf_copy_read(serf_
apr_size_t requested,
const char **data, apr_size_t *len)
{
- /* ### peek to see how much is easily available. if it is MIN_SIZE,
- ### then a read() would (likely) get that same amount. otherwise,
- ### we should read an iovec and concatenate the result. */
+ copy_context_t *ctx = bucket->data;
+ apr_status_t status;
+ const char *wdata;
+ apr_size_t peek_len;
+ struct iovec vecs[16];
+ int vecs_used;
+ apr_size_t fetched;
+
+ status = serf_bucket_peek(ctx->wrapped, &wdata, &peek_len);
+
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ *len = 0;
+ return status;
+ }
+
+ /* Can we just return the peeked result? */
+ if (status || requested <= peek_len || ctx->min_size <= peek_len) {
- /* ### fix this return code */
- return APR_SUCCESS;
+ return serf_bucket_read(ctx->wrapped, requested, data, len);
+ }
+
+ if (! ctx->hold_buf)
+ ctx->hold_buf = serf_bucket_mem_alloc(bucket->allocator,
+ ctx->min_size);
+
+ /* Reduce requested to fit in our buffer if necessary. */
+ if (requested > ctx->min_size)
+ requested = ctx->min_size;
+
+ fetched = 0;
+ while (fetched < requested) {
+ int i;
+ status = serf_bucket_read_iovec(ctx->wrapped, requested - fetched,
+ 16, vecs, &vecs_used);
+
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ if (fetched > 0)
+ status = APR_EAGAIN;
+ break;
+ }
+
+ for (i = 0; i < vecs_used; i++) {
+ memcpy(ctx->hold_buf + fetched, vecs[i].iov_base, vecs[i].iov_len);
+ fetched += vecs[i].iov_len;
+ }
+
+ if (status)
+ break;
+ }
+
+ *data = ctx->hold_buf;
+ *len = fetched;
+
+ return status;
}
@@ -93,9 +141,26 @@ static apr_status_t serf_copy_read_iovec
copy_context_t *ctx = bucket->data;
apr_status_t status;
apr_size_t total;
+ apr_size_t fetched;
int i;
- status = serf_bucket_read_iovec(bucket, requested,
+ /* If somebody really wants to call us for 1 iovec, call the function
+ that already implements the copying for this */
+ if (vecs_size == 1) {
+ *vecs_used = 1;
+ const char *data;
+ apr_size_t len;
+
+ status = serf_copy_read(bucket, requested, &data, &len);
+
+ vecs[0].iov_base = (void*)data;
+ vecs[0].iov_len = len;
+ *vecs_used = 1;
+
+ return status;
+ }
+
+ status = serf_bucket_read_iovec(ctx->wrapped, requested,
vecs_size, vecs, vecs_used);
/* There are four possible results:
@@ -125,12 +190,49 @@ static apr_status_t serf_copy_read_iovec
/* Copy what we have into our buffer. Then continue reading to get at
least MIN_SIZE or REQUESTED bytes of data. */
+ if (! ctx->hold_buf)
+ ctx->hold_buf = serf_bucket_mem_alloc(bucket->allocator,
+ ctx->min_size);
/* ### copy into HOLD_BUF. then read/append some more. */
+ fetched = 0;
+ for (i = 0; i < *vecs_used; i++) {
+ memcpy(ctx->hold_buf + fetched, vecs[i].iov_base, vecs[i].iov_len);
+ fetched += vecs[i].iov_len;
+ }
+
/* ### point vecs[0] at HOLD_BUF. */
+ vecs[0].iov_base = ctx->hold_buf;
+ vecs[0].iov_len = fetched;
- return status;
+ while (TRUE) {
+ int v_used;
+
+ status = serf_bucket_read_iovec(ctx->wrapped, requested - fetched,
+ vecs_size - 1, &vecs[1], &v_used);
+
+ if (SERF_BUCKET_READ_ERROR(status)) {
+ *vecs_used = 1;
+ return APR_EAGAIN;
+ }
+
+ for (i = 1; i <= v_used; i++)
+ total += vecs[i].iov_len;
+
+ if (status || total >= ctx->min_size || total == requested) {
+ *vecs_used = v_used + 1;
+ return status;
+ }
+
+ for (i = 1; i <= v_used; i++) {
+ memcpy(ctx->hold_buf + fetched, vecs[i].iov_base,
+ vecs[i].iov_len);
+ fetched += vecs[i].iov_len;
+ }
+
+ vecs[0].iov_len = fetched;
+ }
}
@@ -168,12 +270,20 @@ static apr_status_t serf_copy_peek(serf_
return serf_bucket_peek(ctx->wrapped, data, len);
}
+static apr_uint64_t serf_copy_get_remaining(serf_bucket_t *bucket)
+{
+ copy_context_t *ctx = bucket->data;
+
+ return serf_bucket_get_remaining(ctx->wrapped);
+}
+
static void serf_copy_destroy(serf_bucket_t *bucket)
{
-/* copy_context_t *ctx = bucket->data;*/
+ copy_context_t *ctx = bucket->data;
- /* ### kill the HOLD_BUF */
+ if (ctx->hold_buf)
+ serf_bucket_mem_free(bucket->allocator, ctx->hold_buf);
serf_default_destroy_and_data(bucket);
}
@@ -198,6 +308,6 @@ const serf_bucket_type_t serf_bucket_typ
serf_copy_peek,
serf_copy_destroy,
serf_copy_read_bucket,
- serf_default_get_remaining,
+ serf_copy_get_remaining,
serf_copy_set_config,
};
Modified: serf/trunk/test/test_buckets.c
URL:
http://svn.apache.org/viewvc/serf/trunk/test/test_buckets.c?rev=1711394&r1=1711393&r2=1711394&view=diff
==============================================================================
--- serf/trunk/test/test_buckets.c (original)
+++ serf/trunk/test/test_buckets.c Fri Oct 30 00:40:16 2015
@@ -161,6 +161,28 @@ void readlines_and_check_bucket(CuTest *
CuAssert(tc, "Read less data than expected.", strlen(expected) == 0);
}
+static apr_status_t discard_data(serf_bucket_t *bkt,
+ apr_size_t *read_len)
+{
+ const char *data;
+ apr_size_t data_len;
+ apr_status_t status;
+ apr_size_t read;
+
+ read = 0;
+
+ do
+ {
+ status = serf_bucket_read(bkt, SERF_READ_ALL_AVAIL, &data, &data_len);
+
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ read += data_len;
+ }
+ } while(status == APR_SUCCESS);
+
+ *read_len = read;
+ return status;
+}
/******************************** TEST CASES
**********************************/
@@ -981,20 +1003,18 @@ static void test_response_bucket_peek_at
}
#undef EXP_RESPONSE
-/* ### this test is useful, but needs to switch to the new COPY bucket
- ### to test the behavior. */
-#if 0
-
/* Test that the internal function serf_default_read_iovec, used by many
bucket types, groups multiple buffers in one iovec. */
-static void test_serf_default_read_iovec(CuTest *tc)
+static void test_copy_bucket(CuTest *tc)
{
test_baton_t *tb = tc->testBaton;
apr_status_t status;
- serf_bucket_t *bkt, *aggbkt;
- struct iovec tgt_vecs[32];
+ serf_bucket_t *bkt, *aggbkt, *copybkt;
+ struct iovec tgt_vecs[2];
int vecs_used, i;
apr_size_t actual_len = 0;
+ const char *data;
+ apr_size_t len;
serf_bucket_alloc_t *alloc = serf_bucket_allocator_create(tb->pool, NULL,
NULL);
@@ -1005,6 +1025,7 @@ static void test_serf_default_read_iovec
/* Test 1: multiple children, should be read in one iovec. */
aggbkt = serf_bucket_aggregate_create(alloc);
+ copybkt = serf_bucket_copy_create(aggbkt, 1024, alloc);
bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY, 20, alloc);
serf_bucket_aggregate_append(aggbkt, bkt);
@@ -1013,15 +1034,61 @@ static void test_serf_default_read_iovec
bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY+40, strlen(BODY)-40, alloc);
serf_bucket_aggregate_append(aggbkt, bkt);
- status = serf_default_read_iovec(aggbkt, SERF_READ_ALL_AVAIL, 32, tgt_vecs,
- &vecs_used);
+ CuAssertIntEquals(tc, strlen(BODY),
+ (int)serf_bucket_get_remaining(aggbkt));
+ CuAssertIntEquals(tc, strlen(BODY),
+ (int)serf_bucket_get_remaining(copybkt));
+
+ /* When < min_size, everything should be read in one go */
+ status = serf_bucket_read(copybkt, SERF_READ_ALL_AVAIL, &data, &len);
+ CuAssertIntEquals(tc, APR_EOF, status);
+ CuAssertIntEquals(tc, strlen(BODY), len);
+
+
+ /* Just reuse the existing aggbkt. Fill again */
+ copybkt = serf_bucket_copy_create(aggbkt, 35, alloc);
+ bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY, 20, alloc);
+ serf_bucket_aggregate_append(aggbkt, bkt);
+ bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY + 20, 20, alloc);
+ serf_bucket_aggregate_append(aggbkt, bkt);
+ bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY + 40, strlen(BODY) - 40, alloc);
+ serf_bucket_aggregate_append(aggbkt, bkt);
+
+ CuAssertIntEquals(tc, strlen(BODY),
+ (int)serf_bucket_get_remaining(copybkt));
+
+ /* When requesting more than min_size while the first chunk holds less
+ than min_size, we will get exactly min_size bytes */
+ status = serf_bucket_read(copybkt, SERF_READ_ALL_AVAIL, &data, &len);
+ CuAssertIntEquals(tc, APR_SUCCESS, status);
+ CuAssertIntEquals(tc, 35, len);
+
+ /* We can read the rest */
+ CuAssertIntEquals(tc, APR_EOF, discard_data(copybkt, &len));
+ CuAssertIntEquals(tc, strlen(BODY) - 35, len);
+
+ /* Just reuse the existing aggbkt. Fill again */
+ copybkt = serf_bucket_copy_create(aggbkt, 45, alloc);
+ bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY, 20, alloc);
+ serf_bucket_aggregate_append(aggbkt, bkt);
+ bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY + 20, 20, alloc);
+ serf_bucket_aggregate_append(aggbkt, bkt);
+ bkt = SERF_BUCKET_SIMPLE_STRING_LEN(BODY + 40, strlen(BODY) - 40, alloc);
+ serf_bucket_aggregate_append(aggbkt, bkt);
+
+ CuAssertIntEquals(tc, strlen(BODY),
+ (int)serf_bucket_get_remaining(copybkt));
+
+ /* Now test if we can read everything as two vecs */
+ status = serf_bucket_read_iovec(copybkt, SERF_READ_ALL_AVAIL,
+ 2, tgt_vecs, &vecs_used);
+
CuAssertIntEquals(tc, APR_EOF, status);
for (i = 0; i < vecs_used; i++)
actual_len += tgt_vecs[i].iov_len;
CuAssertIntEquals(tc, strlen(BODY), actual_len);
}
-#endif
/* Test that serf doesn't hang in an endless loop when a linebuf is in
split-CRLF state. */
@@ -1537,29 +1604,6 @@ static void test_deflate_buckets(CuTest
apr_pool_destroy(iterpool);
}
-static apr_status_t discard_data(serf_bucket_t *bkt,
- apr_size_t *read_len)
-{
- const char *data;
- apr_size_t data_len;
- apr_status_t status;
- apr_size_t read;
-
- read = 0;
-
- do
- {
- status = serf_bucket_read(bkt, SERF_READ_ALL_AVAIL, &data, &data_len);
-
- if (!SERF_BUCKET_READ_ERROR(status)) {
- read += data_len;
- }
- } while(status == APR_SUCCESS);
-
- *read_len = read;
- return status;
-}
-
static apr_status_t hold_open(void *baton, serf_bucket_t *aggbkt)
{
test_baton_t *tb = baton;
@@ -2176,8 +2220,6 @@ static void test_http2_frame_bucket_basi
{
test_baton_t *tb = tc->testBaton;
serf_bucket_alloc_t *alloc;
- serf_bucket_t *hpack;
- apr_size_t sz;
serf_bucket_t *body_in;
serf_bucket_t *frame_in;
serf_bucket_t *frame_out;
@@ -2238,9 +2280,11 @@ CuSuite *test_buckets(void)
SUITE_ADD_TEST(suite, test_aggregate_buckets);
SUITE_ADD_TEST(suite, test_aggregate_bucket_readline);
SUITE_ADD_TEST(suite, test_header_buckets);
+ SUITE_ADD_TEST(suite, test_copy_bucket);
SUITE_ADD_TEST(suite, test_linebuf_crlf_split);
SUITE_ADD_TEST(suite, test_response_no_body_expected);
SUITE_ADD_TEST(suite, test_random_eagain_in_response);
+ SUITE_ADD_TEST(suite, test_linebuf_fetch_crlf);
SUITE_ADD_TEST(suite, test_dechunk_buckets);
SUITE_ADD_TEST(suite, test_deflate_buckets);
SUITE_ADD_TEST(suite, test_http2_unframe_buckets);
@@ -2255,11 +2299,7 @@ CuSuite *test_buckets(void)
SUITE_ADD_TEST(suite, test_deflate_4GBplus_buckets);
#endif
-#if 0
- SUITE_ADD_TEST(suite, test_serf_default_read_iovec);
-#endif
-
- SUITE_ADD_TEST(suite, test_linebuf_fetch_crlf);
+
return suite;
}