Author: gstein
Date: Fri Jun 17 05:44:02 2011
New Revision: 1136777

URL: http://svn.apache.org/viewvc?rev=1136777&view=rev
Log:
Continued work on the pausing mechanic in the XML parser of ra_serf.

The in-memory blocks no longer use a bucket allocator, since that is
generally internal to serf's bucket types. Not a big deal, as we can just
use a simple linked-list of reusable blocks.
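A minimal sketch of such a list of reusable blocks, in plain C. It assumes malloc() in place of the APR pool that the real code allocates from (ctx->pool). The names block_t, block_list_t, acquire_block() and release_block() are hypothetical. Only the reuse pattern mirrors the AVAIL list added in util.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical fixed block size; stands in for PARSE_CHUNK_SIZE. */
#define CHUNK_SIZE 8000

typedef struct block_t {
  size_t size;               /* bytes of DATA in use */
  char data[CHUNK_SIZE];
  struct block_t *next;      /* link in either the pending or the avail list */
} block_t;

typedef struct block_list_t {
  block_t *avail;            /* consumed blocks, ready for reuse */
  size_t allocated;          /* how many blocks were ever malloc'd */
} block_list_t;

/* Take a block from the free list, or allocate a fresh one. */
static block_t *
acquire_block(block_list_t *list)
{
  block_t *b = list->avail;

  if (b != NULL)
    list->avail = b->next;
  else
    {
      b = malloc(sizeof(*b));
      if (b == NULL)
        return NULL;
      list->allocated++;
    }

  /* Reset bookkeeping fields; the caller fills in DATA and SIZE. */
  b->size = 0;
  b->next = NULL;
  return b;
}

/* Push a consumed block onto the free list for later reuse. */
static void
release_block(block_list_t *list, block_t *b)
{
  assert(b != NULL);
  b->next = list->avail;
  list->avail = b;
}
```

Because consumed blocks go back onto the free list, repeated pause/resume cycles reuse the same few blocks instead of allocating new ones each time. With an APR pool, the memory itself is only reclaimed when the pool is cleared or destroyed.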

This adds code for writing content into the "pending" structure, and
begins the code for injecting that content into the XML parser when ready.

The update editor now contains logic for pausing and un-pausing the parser
as certain conditions are met. It also makes the appropriate call to
process the pending content.

However: no logic is changed in the network reading portion, or the normal
parser injection. In essence, the "pause" state is ignored and the
"pending" content is never accumulated. Further development and testing is
needed. This revision is a continued preview of the direction, for
feedback and code review.

* subversion/libsvn_ra_serf/ra_serf.h:
  (svn_ra_serf__xml_parser_t): remove PAUSE_ALLOC. extend the
    documentation for the PAUSED flag, describing how the surrounding
    system should interact with it and with the PENDING content.
  (svn_ra_serf__process_pending): new declaration

* subversion/libsvn_ra_serf/util.c:
  (struct pending_buffer_t): fix type of NEXT link
  (svn_ra_serf__pending_t): add AVAIL member for tracking free blocks of
    buffers for in-memory content
  (write_to_pending): when the network needs to save content (that cannot
    go into the XML parser), this function will be used to write it into
    the parser context's PENDING structure.
  (inject_to_parser): allow NULL for the SL param, and adjust the returned
    error string appropriately.
  (svn_ra_serf__process_pending): new function to process the PENDING
    structure of a parser context. it currently handles in-memory content,
    but not (yet) files.
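The memory-versus-spill decision in write_to_pending() can be sketched with standard C I/O. This is an illustration under assumptions: tmpfile() stands in for svn_io_open_unique_file3(), malloc() stands in for the pool, and CHUNK_SIZE, SPILL_LIMIT, pending_t and write_pending() are hypothetical names with toy sizes. As written in this revision, write_to_pending() appears to fall through to the in-memory path after appending to the spill file, which would store the content twice. The sketch returns right after the file write.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CHUNK_SIZE 16    /* hypothetical, tiny for illustration */
#define SPILL_LIMIT 32   /* hypothetical memory threshold, in bytes */

typedef struct buf_t {
  size_t size;
  char data[CHUNK_SIZE];
  struct buf_t *next;
} buf_t;

typedef struct pending_t {
  buf_t *head, *tail;    /* FIFO of in-memory chunks */
  size_t memory_size;    /* bytes currently held in memory */
  FILE *spill;           /* once non-NULL, all new data goes here */
} pending_t;

/* Returns 0 on success, -1 on failure. */
static int
write_pending(pending_t *p, const char *data, size_t len)
{
  buf_t *b;

  assert(len <= CHUNK_SIZE);

  /* Too much is held in memory: start spilling to a temporary file. */
  if (p->spill == NULL && p->memory_size > SPILL_LIMIT)
    {
      p->spill = tmpfile();
      if (p->spill == NULL)
        return -1;
    }

  /* After spilling begins, append to the file and stop; do not also
     keep a copy in memory. */
  if (p->spill != NULL)
    return fwrite(data, 1, len, p->spill) == len ? 0 : -1;

  b = malloc(sizeof(*b));
  if (b == NULL)
    return -1;
  b->size = len;
  memcpy(b->data, data, len);
  b->next = NULL;

  /* Append at the tail so chunks are later replayed in arrival order. */
  if (p->tail == NULL)
    p->head = p->tail = b;
  else
    {
      p->tail->next = b;
      p->tail = b;
    }
  p->memory_size += len;
  return 0;
}
```

With these toy sizes, three 16-byte writes stay in memory (48 bytes), and the fourth write opens the spill file and goes there instead.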

* subversion/libsvn_ra_serf/update.c:
  (REQUEST_COUNT_TO_PAUSE, REQUEST_COUNT_TO_RESUME): new symbols (and
    docco) that provide bounds for pausing and un-pausing the XML parsing
    process from the network.
  (report_context_t): add PARSER_CTX to enable the callbacks to pause the
    parsing
  (fetch_file, end_report): if the outstanding requests are too high, then
    pause the XML parsing.
  (finish_report): store the PARSER_CTX into the report context. within
    the report (and network) processing loop, possibly re-enable XML
    parsing if the request count gets low enough. call over to the
    XML parsing code to process any pending content, as appropriate.
  (): remove two strange #undef's (MAX_NR_OF_CONNS, EXP_REQS_PER_CONN)
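The two thresholds form a simple hysteresis band. The hypothetical helper update_paused() below folds the pause test (done in fetch_file() and end_report()) and the resume test (done in finish_report()) into one function, purely for illustration. The real code keeps them in separate places and counts ACTIVE_FETCHES plus ACTIVE_PROPFINDS.

```c
#include <assert.h>
#include <stdbool.h>

/* Same values as the new update.c symbols. */
#define REQUEST_COUNT_TO_PAUSE 1000
#define REQUEST_COUNT_TO_RESUME 100

/* Hypothetical helper: recompute the paused flag from the number of
   outstanding GET + PROPFIND requests. Between the two thresholds the
   flag keeps its previous value. */
static bool
update_paused(bool paused, int outstanding)
{
  assert(outstanding >= 0);

  if (!paused && outstanding > REQUEST_COUNT_TO_PAUSE)
    return true;
  if (paused && outstanding < REQUEST_COUNT_TO_RESUME)
    return false;
  return paused;
}
```

Anywhere from 100 to 1000 outstanding requests the flag keeps its previous value, so the parser does not flip between paused and running every time a single request completes.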

Modified:
    subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h
    subversion/trunk/subversion/libsvn_ra_serf/update.c
    subversion/trunk/subversion/libsvn_ra_serf/util.c

Modified: subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h?rev=1136777&r1=1136776&r2=1136777&view=diff
==============================================================================
--- subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h (original)
+++ subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h Fri Jun 17 05:44:02 2011
@@ -591,21 +591,24 @@ struct svn_ra_serf__xml_parser_t {
   /* If an error occurred, this value will be non-NULL. */
   svn_error_t *error;
 
-  /* If "pausing" is to be allowed, then this must refer to the connection's
-     bucket allocator. It will be used to hold content in memory, then
-     return it once it has been consumed.
-
-     NOTE: if this field is NULL, then pausing is NOT allowed.  */
-  serf_bucket_alloc_t *pause_alloc;
-
-  /* If pausing is allowed, then callbacks can set this value to pause
-     the XML parsing. Note that additional elements may be parsed once
-     this value is set (as the current buffer is consumed; no further
-     buffers will be parsed until PAUSED is cleared).
-
-     At some point, the callbacks should clear this value. The main
-     serf_context_run() loop will notice the change and resume delivery
-     of content to the XML parser.  */
+  /* Deciding whether to pause, or not, is performed within the parsing
+     callbacks. If a callback decides to set this flag, then the loop
+     driving the parse (generally, a series of calls to serf_context_run())
+     is going to need to coordinate the un-pausing of the parser by
+     processing pending content. Thus, deciding to pause the parser is a
+     coordinated effort rather than merely setting this flag.
+
+     When an XML parsing callback sets this flag, note that additional
+     elements may be parsed (as the current buffer is consumed). At some
+     point, the flag will be recognized and arriving network content will
+     be stashed away in the PENDING structure (see below).
+
+     At some point, the controlling loop should clear this value. The
+     underlying network processing will note the change and begin passing
+     content into the XML callbacks.
+
+     Note that the controlling loop should also process pending content
+     since the arriving network content will typically finish first.  */
   svn_boolean_t paused;
 
   /* While the XML parser is paused, content arriving from the server
@@ -772,6 +775,12 @@ svn_ra_serf__xml_push_state(svn_ra_serf_
 void
 svn_ra_serf__xml_pop_state(svn_ra_serf__xml_parser_t *parser);
 
+
+svn_error_t *
+svn_ra_serf__process_pending(svn_ra_serf__xml_parser_t *parser,
+                             apr_pool_t *scratch_pool);
+
+
 /*
  * Add the appropriate serf buckets to @a agg_bucket represented by
  * the XML * @a tag and @a value.

Modified: subversion/trunk/subversion/libsvn_ra_serf/update.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_ra_serf/update.c?rev=1136777&r1=1136776&r2=1136777&view=diff
==============================================================================
--- subversion/trunk/subversion/libsvn_ra_serf/update.c (original)
+++ subversion/trunk/subversion/libsvn_ra_serf/update.c Fri Jun 17 05:44:02 2011
@@ -71,6 +71,25 @@ typedef enum report_state_e {
     NEED_PROP_NAME,
 } report_state_e;
 
+
+/* While we process the REPORT response, we will queue up GET and PROPFIND
+   requests. For a very large checkout, it is very easy to queue requests
+   faster than they are resolved. Thus, we need to pause the XML processing
+   (which queues more requests) to avoid queueing too many, with their
+   attendant memory costs. When the queue count drops low enough, we will
+   resume XML processing.
+
+   Note that we don't want the count to drop to zero. We have multiple
+   connections that we want to keep busy. These are also heuristic numbers
+   since network and parsing behavior (ie. it doesn't pause immediately)
+   can make the measurements quite imprecise.
+
+   We measure outstanding requests as the sum of ACTIVE_FETCHES and
+   ACTIVE_PROPFINDS in the report_context_t structure.  */
+#define REQUEST_COUNT_TO_PAUSE 1000
+#define REQUEST_COUNT_TO_RESUME 100
+
+
 /* Forward-declare our report context. */
 typedef struct report_context_t report_context_t;
 
@@ -330,6 +349,8 @@ struct report_context_t {
   /* Are we done parsing the REPORT response? */
   svn_boolean_t done;
 
+  /* The XML parser context for the REPORT response.  */
+  svn_ra_serf__xml_parser_t *parser_ctx;
 };
 
 
@@ -1276,6 +1297,9 @@ fetch_file(report_context_t *ctx, report
       SVN_ERR(handle_propchange_only(info, info->pool));
     }
 
+  if (ctx->active_fetches + ctx->active_propfinds > REQUEST_COUNT_TO_PAUSE)
+    ctx->parser_ctx->paused = TRUE;
+
   return SVN_NO_ERROR;
 }
 
@@ -1775,6 +1799,10 @@ end_report(svn_ra_serf__xml_parser_t *pa
           SVN_ERR_ASSERT(info->dir->propfind);
 
           ctx->active_propfinds++;
+
+          if (ctx->active_fetches + ctx->active_propfinds
+              > REQUEST_COUNT_TO_PAUSE)
+            ctx->parser_ctx->paused = TRUE;
         }
       else
         {
@@ -2331,6 +2359,8 @@ finish_report(void *report_baton,
   handler->response_handler = svn_ra_serf__handle_xml_parser;
   handler->response_baton = parser_ctx;
 
+  report->parser_ctx = parser_ctx;
+
   svn_ra_serf__request_create(handler);
 
   /* Open the first extra connection. */
@@ -2346,6 +2376,7 @@ finish_report(void *report_baton,
       int i;
       apr_status_t status;
 
+      /* Note: this throws out the old ITERPOOL_INNER.  */
       svn_pool_clear(iterpool);
 
       /* We need to be careful between the outer and inner ITERPOOLs,
@@ -2479,6 +2510,19 @@ finish_report(void *report_baton,
         }
       report->done_fetches = NULL;
 
+      /* If the parser is paused, and the number of active requests has
+         dropped far enough, then resume parsing.  */
+      if (parser_ctx->paused
+          && (report->active_fetches + report->active_propfinds
+              < REQUEST_COUNT_TO_RESUME))
+        parser_ctx->paused = FALSE;
+
+      /* If we have not paused the parser and it looks like data MAY be
+         present (we can't know for sure because of the private structure),
+         then go process the pending content.  */
+      if (!parser_ctx->paused && parser_ctx->pending != NULL)
+        SVN_ERR(svn_ra_serf__process_pending(parser_ctx, iterpool_inner));
+
       /* Debugging purposes only! */
       for (i = 0; i < sess->num_conns; i++)
         {
@@ -2498,8 +2542,7 @@ finish_report(void *report_baton,
   svn_pool_destroy(iterpool);
   return svn_error_return(err);
 }
-#undef MAX_NR_OF_CONNS
-#undef EXP_REQS_PER_CONN
+
 
 static svn_error_t *
 abort_report(void *report_baton,

Modified: subversion/trunk/subversion/libsvn_ra_serf/util.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_ra_serf/util.c?rev=1136777&r1=1136776&r2=1136777&view=diff
==============================================================================
--- subversion/trunk/subversion/libsvn_ra_serf/util.c (original)
+++ subversion/trunk/subversion/libsvn_ra_serf/util.c Fri Jun 17 05:44:02 2011
@@ -61,7 +61,8 @@
 struct pending_buffer_t {
   apr_size_t size;
   char data[PARSE_CHUNK_SIZE];
-  struct pending_memnode_t *next;
+
+  struct pending_buffer_t *next;
 };
 
 
@@ -77,6 +78,10 @@ struct svn_ra_serf__pending_t {
   struct pending_buffer_t *head;
   struct pending_buffer_t *tail;
 
+  /* Available blocks for storing pending data. These were allocated
+     previously, then the data consumed and returned to this list.  */
+  struct pending_buffer_t *avail;
+
   /* Once MEMORY_SIZE exceeds SPILL_SIZE, then arriving content will be
      appended to the (temporary) file indicated by SPILL.  */
   apr_file_t *spill;
@@ -1211,6 +1216,85 @@ add_done_item(svn_ra_serf__xml_parser_t 
 
 
 static svn_error_t *
+write_to_pending(svn_ra_serf__xml_parser_t *ctx,
+                 const char *data,
+                 apr_size_t len,
+                 apr_pool_t *scratch_pool)
+{
+  struct pending_buffer_t *pb;
+
+  /* The caller should not have provided us more than we can store into
+     a single memory block.  */
+  SVN_ERR_ASSERT(len <= PARSE_CHUNK_SIZE);
+
+  if (ctx->pending == NULL)
+    ctx->pending = apr_pcalloc(ctx->pool, sizeof(*ctx->pending));
+
+  /* We do not (yet) have a spill file, but the amount stored in memory
+     has grown too large. Create the file and place the pending data into
+     the temporary file.  */
+  if (ctx->pending->spill == NULL
+      && ctx->pending->memory_size > SPILL_SIZE)
+    {
+      SVN_ERR(svn_io_open_unique_file3(&ctx->pending->spill,
+                                       NULL /* temp_path */,
+                                       NULL /* dirpath */,
+                                       svn_io_file_del_on_pool_cleanup,
+                                       ctx->pool, scratch_pool));
+    }
+
+  /* Once a spill file has been constructed, then we need to put all
+     arriving data into the file. We will no longer attempt to hold it
+     in memory.  */
+  if (ctx->pending->spill != NULL)
+    {
+      /* NOTE: we assume the file position is at the END. The caller should
+         ensure this, so that we will append.  */
+      SVN_ERR(svn_io_file_write_full(ctx->pending->spill, data, len,
+                                     NULL, scratch_pool));
+    }
+
+  /* We're still within bounds of holding the pending information in
+     memory. Get a buffer (already available, or alloc it), copy the data
+     there, and link it into our pending data.  */
+  if (ctx->pending->avail != NULL)
+    {
+      pb = ctx->pending->avail;
+      ctx->pending->avail = pb->next;
+    }
+  else
+    {
+      pb = apr_palloc(ctx->pool, sizeof(*pb));
+    }
+  /* NOTE: *pb is uninitialized. All fields must be stored.  */
+
+  pb->size = len;
+  memcpy(pb->data, data, len);
+  pb->next = NULL;
+
+  /* Start a list of buffers, or append to the end of the linked list
+     of buffers.  */
+  if (ctx->pending->tail == NULL)
+    {
+      ctx->pending->head = pb;
+      ctx->pending->tail = pb;
+    }
+  else
+    {
+      ctx->pending->tail->next = pb;
+      ctx->pending->tail = pb;
+    }
+
+  /* We need to record how much is buffered in memory. Once we reach
+     SPILL_SIZE (or thereabouts, it doesn't have to be precise), then
+     we'll switch to putting the content into a file.  */
+  ctx->pending->memory_size += len;
+
+  return SVN_NO_ERROR;
+}
+
+
+static svn_error_t *
 inject_to_parser(svn_ra_serf__xml_parser_t *ctx,
                  const char *data,
                  apr_size_t len,
@@ -1220,9 +1304,15 @@ inject_to_parser(svn_ra_serf__xml_parser
 
   xml_status = XML_Parse(ctx->xmlp, data, len, 0);
   if (xml_status == XML_STATUS_ERROR && !ctx->ignore_errors)
-    return svn_error_createf(SVN_ERR_RA_DAV_MALFORMED_DATA, NULL,
-                             _("XML parsing failed: (%d %s)"),
-                             sl->code, sl->reason);
+    {
+      if (sl == NULL)
+        return svn_error_createf(SVN_ERR_RA_DAV_MALFORMED_DATA, NULL,
+                                 _("XML parsing failed"));
+
+      return svn_error_createf(SVN_ERR_RA_DAV_MALFORMED_DATA, NULL,
+                               _("XML parsing failed: (%d %s)"),
+                               sl->code, sl->reason);
+    }
 
   if (ctx->error && !ctx->ignore_errors)
     return svn_error_return(ctx->error);
@@ -1231,6 +1321,51 @@ inject_to_parser(svn_ra_serf__xml_parser
 }
 
 
+svn_error_t *
+svn_ra_serf__process_pending(svn_ra_serf__xml_parser_t *parser,
+                             apr_pool_t *scratch_pool)
+{
+  /* Fast path exit: already paused, or nothing to do.  */
+  if (parser->paused || parser->pending == NULL)
+    return SVN_NO_ERROR;
+
+  /* Empty out memory buffers until we run out, or we get paused again.  */
+  while (parser->pending->head != NULL)
+    {
+      struct pending_buffer_t *pb = parser->pending->head;
+      svn_error_t *err;
+
+      if (parser->pending->tail == pb)
+        parser->pending->head = parser->pending->tail = NULL;
+      else
+        parser->pending->head = pb->next;
+      parser->pending->memory_size -= pb->size;
+
+      err = inject_to_parser(parser, pb->data, pb->size, NULL);
+
+      /* Return the block to the "available" list.  */
+      pb->next = parser->pending->avail;
+      parser->pending->avail = pb;
+
+      if (err)
+        return svn_error_return(err);
+
+      /* If the callbacks paused us, then we're done for now.  */
+      if (parser->paused)
+        return SVN_NO_ERROR;
+    }
+
+  /* If we don't have a spill file, then we've exhausted all
+     pending content.  */
+  if (parser->pending->spill == NULL)
+    return SVN_NO_ERROR;
+
+  /* ### read the spill file...  */
+
+  return SVN_NO_ERROR;
+}
+
+
 /* Implements svn_ra_serf__response_handler_t */
 svn_error_t *
 svn_ra_serf__handle_xml_parser(serf_request_t *request,
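
The drain loop in svn_ra_serf__process_pending() can be exercised in isolation. In this sketch, toy_inject() is a hypothetical stand-in for inject_to_parser() plus an XML callback that decides to pause. The spill-file branch, still a placeholder in the diff, is omitted. All type and function names here are illustrative.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define CHUNK_SIZE 64   /* hypothetical */

typedef struct buf_t {
  size_t size;
  char data[CHUNK_SIZE];
  struct buf_t *next;
} buf_t;

typedef struct toy_parser_t {
  bool paused;
  buf_t *head, *tail, *avail;  /* pending FIFO and free list */
  size_t memory_size;          /* bytes still queued in memory */
  size_t consumed;             /* bytes "parsed" so far */
  size_t pause_after;          /* hypothetical: callback pauses past this */
} toy_parser_t;

/* Stand-in for inject_to_parser(): "parse" the bytes, and let a fake
   callback set PAUSED once enough content has been seen. */
static void
toy_inject(toy_parser_t *p, const char *data, size_t len)
{
  (void)data;
  p->consumed += len;
  if (p->consumed >= p->pause_after)
    p->paused = true;
}

/* Same shape as svn_ra_serf__process_pending(): drain in-memory chunks
   in order, recycle each one, and stop as soon as a callback pauses. */
static void
process_pending(toy_parser_t *p)
{
  while (!p->paused && p->head != NULL)
    {
      buf_t *b = p->head;

      p->head = b->next;
      if (p->head == NULL)
        p->tail = NULL;
      assert(p->memory_size >= b->size);
      p->memory_size -= b->size;

      toy_inject(p, b->data, b->size);

      /* Return the block to the free list, even if we just paused. */
      b->next = p->avail;
      p->avail = b;
    }
}
```

With three 10-byte chunks and a callback that pauses after 15 bytes, the first call consumes two chunks and leaves the third queued. A second call, made after the controlling loop clears PAUSED, finishes the queue.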

