Here's a version of this patch which I consider final.

Thanks for your tips on using DSA.  No crashes now.

I am confused about the advice that dsa_attach is not needed the second
time around.  If I skip it, the dsa handle has been 0x7f'd (that is, its
memory looks freed and overwritten with the 0x7f clobber pattern), which
I don't understand since it is supposed to be allocated in
TopMemoryContext.  I didn't dig too deep to find what is causing that
behavior.  Once we figure it out, it will be easy to remove the
dsa_detach/dsa_attach calls.

I added a new SQL-callable function to invoke summarization of an
individual page range.  That is what I wanted to do in vacuum (rather
than a scan of the complete index), and it seems independently useful.

I also removed the behavior that on index creation the final partial
block range is always summarized.  It's pointless.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/doc/src/sgml/brin.sgml b/doc/src/sgml/brin.sgml
index 5bf11dc..5140a38 100644
--- a/doc/src/sgml/brin.sgml
+++ b/doc/src/sgml/brin.sgml
@@ -74,9 +74,14 @@
    tuple; those tuples remain unsummarized until a summarization run is
    invoked later, creating initial summaries.
    This process can be invoked manually using the
-   <function>brin_summarize_new_values(regclass)</function> function,
-   or automatically when <command>VACUUM</command> processes the table.
+   <function>brin_summarize_range(regclass, bigint)</function> or
+   <function>brin_summarize_new_values(regclass)</function> functions;
+   automatically when <command>VACUUM</command> processes the table;
+   or by automatic summarization executed by autovacuum, as insertions
+   occur.  (This last trigger is disabled by default and can be enabled
+   with the <literal>autosummarize</literal> parameter.)
   </para>
+
  </sect2>
 </sect1>
 
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 6887eab..25c18d1 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19685,6 +19685,13 @@ postgres=# SELECT * FROM 
pg_walfile_name_offset(pg_stop_backup());
       </row>
       <row>
        <entry>
+        <literal><function>brin_summarize_range(<parameter>index</> 
<type>regclass</>, <parameter>blockNumber</> 
<type>bigint</type>)</function></literal>
+       </entry>
+       <entry><type>integer</type></entry>
+       <entry>summarize the page range covering the given block, if not 
already summarized</entry>
+      </row>
+      <row>
+       <entry>
         <literal><function>gin_clean_pending_list(<parameter>index</> 
<type>regclass</>)</function></literal>
        </entry>
        <entry><type>bigint</type></entry>
@@ -19700,7 +19707,8 @@ postgres=# SELECT * FROM 
pg_walfile_name_offset(pg_stop_backup());
     that are not currently summarized by the index; for any such range
     it creates a new summary index tuple by scanning the table pages.
     It returns the number of new page range summaries that were inserted
-    into the index.
+    into the index.  <function>brin_summarize_range</> does the same, except
+    it only summarizes the range that covers the given block number.
    </para>
 
    <para>
diff --git a/doc/src/sgml/ref/create_index.sgml 
b/doc/src/sgml/ref/create_index.sgml
index 7163b03..83ee7d3 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -382,7 +382,7 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS 
] <replaceable class=
    </variablelist>
 
    <para>
-    <acronym>BRIN</> indexes accept a different parameter:
+    <acronym>BRIN</> indexes accept different parameters:
    </para>
 
    <variablelist>
@@ -396,6 +396,16 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS 
] <replaceable class=
     </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><literal>autosummarize</></term>
+    <listitem>
+    <para>
+     Defines whether a summarization run is invoked for the previous page
+     range whenever an insertion is detected on the next one.
+    </para>
+    </listitem>
+   </varlistentry>
    </variablelist>
   </refsect2>
 
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index b22563b..707d04e 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -26,6 +26,7 @@
 #include "catalog/pg_am.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "utils/builtins.h"
@@ -60,10 +61,12 @@ typedef struct BrinOpaque
        BrinDesc   *bo_bdesc;
 } BrinOpaque;
 
+#define BRIN_ALL_BLOCKRANGES   InvalidBlockNumber
+
 static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
                                                   BrinRevmap *revmap, 
BlockNumber pagesPerRange);
 static void terminate_brin_buildstate(BrinBuildState *state);
-static void brinsummarize(Relation index, Relation heapRel,
+static void brinsummarize(Relation index, Relation heapRel, BlockNumber 
pageRange,
                          double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
@@ -126,8 +129,11 @@ brinhandler(PG_FUNCTION_ARGS)
  * with those of the new tuple.  If the tuple values are not consistent with
  * the summary tuple, we need to update the index tuple.
  *
+ * If autosummarization is enabled, check if we need to summarize the previous
+ * page range.
+ *
  * If the range is not currently summarized (i.e. the revmap returns NULL for
- * it), there's nothing to do.
+ * it), there's nothing to do for this tuple.
  */
 bool
 brininsert(Relation idxRel, Datum *values, bool *nulls,
@@ -136,30 +142,57 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
                   IndexInfo *indexInfo)
 {
        BlockNumber pagesPerRange;
+       BlockNumber     origHeapBlk;
+       BlockNumber     heapBlk;
        BrinDesc   *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
        BrinRevmap *revmap;
        Buffer          buf = InvalidBuffer;
        MemoryContext tupcxt = NULL;
        MemoryContext oldcxt = CurrentMemoryContext;
+       bool            autosummarize = BrinGetAutoSummarize(idxRel);
 
        revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
 
+       /*
+        * origHeapBlk is the block number where the insertion occurred.  
heapBlk
+        * is the first block in the corresponding page range.
+        */
+       origHeapBlk = ItemPointerGetBlockNumber(heaptid);
+       heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
+
        for (;;)
        {
                bool            need_insert = false;
                OffsetNumber off;
-               BrinTuple  *brtup;
+               BrinTuple  *brtup = NULL;
                BrinMemTuple *dtup;
-               BlockNumber heapBlk;
                int                     keyno;
 
                CHECK_FOR_INTERRUPTS();
 
-               heapBlk = ItemPointerGetBlockNumber(heaptid);
-               /* normalize the block number to be the first block in the 
range */
-               heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
-               brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, 
NULL,
-                                                                               
 BUFFER_LOCK_SHARE, NULL);
+               /*
+                * If auto-summarization is enabled and we just inserted the 
first
+                * tuple into the first block of a new non-first page range, 
request a
+                * summarization run of the previous range.
+                */
+               if (autosummarize &&
+                       heapBlk > 0 &&
+                       heapBlk == origHeapBlk &&
+                       ItemPointerGetOffsetNumber(heaptid) == 
FirstOffsetNumber)
+               {
+                       BlockNumber lastPageRange = heapBlk - 1;
+
+                       brtup = brinGetTupleForHeapBlock(revmap, lastPageRange, 
&buf, &off, NULL,
+                                                                               
         BUFFER_LOCK_SHARE, NULL);
+                       if (!brtup)
+                               AutoVacuumRequestWork(AVW_BRINSummarizeRange,
+                                                                         
RelationGetRelid(idxRel),
+                                                                         
lastPageRange);
+               }
+
+               if (!brtup)
+                       brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, 
&off,
+                                                                               
         NULL, BUFFER_LOCK_SHARE, NULL);
 
                /* if range is unsummarized, there's nothing to do */
                if (!brtup)
@@ -664,9 +697,6 @@ brinbuild(Relation heap, Relation index, IndexInfo 
*indexInfo)
        reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
                                                                   
brinbuildCallback, (void *) state);
 
-       /* process the final batch */
-       form_and_insert_tuple(state);
-
        /* release resources */
        idxtuples = state->bs_numtuples;
        brinRevmapTerminate(state->bs_rmAccess);
@@ -747,7 +777,7 @@ brinvacuumcleanup(IndexVacuumInfo *info, 
IndexBulkDeleteResult *stats)
 
        brin_vacuum_scan(info->index, info->strategy);
 
-       brinsummarize(info->index, heapRel,
+       brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES,
                                  &stats->num_index_tuples, 
&stats->num_index_tuples);
 
        heap_close(heapRel, AccessShareLock);
@@ -765,7 +795,8 @@ brinoptions(Datum reloptions, bool validate)
        BrinOptions *rdopts;
        int                     numoptions;
        static const relopt_parse_elt tab[] = {
-               {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, 
pagesPerRange)}
+               {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, 
pagesPerRange)},
+               {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, 
autosummarize)}
        };
 
        options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
@@ -792,12 +823,35 @@ brinoptions(Datum reloptions, bool validate)
 Datum
 brin_summarize_new_values(PG_FUNCTION_ARGS)
 {
+       Datum   relation = PG_GETARG_DATUM(0);
+
+       return DirectFunctionCall2(brin_summarize_range,
+                                                          relation,
+                                                          
Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
+}
+
+/*
+ * SQL-callable function to summarize the indicated page range, if not already
+ * summarized.  If the second argument is BRIN_ALL_BLOCKRANGES, all
+ * unsummarized ranges are summarized.
+ */
+Datum
+brin_summarize_range(PG_FUNCTION_ARGS)
+{
        Oid                     indexoid = PG_GETARG_OID(0);
+       int64           heapBlk64 = PG_GETARG_INT64(1);
+       BlockNumber     heapBlk;
        Oid                     heapoid;
        Relation        indexRel;
        Relation        heapRel;
        double          numSummarized = 0;
 
+       if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+                                errmsg("invalid block number " INT64_FORMAT, 
heapBlk64)));
+       heapBlk = (BlockNumber) heapBlk64;
+
        /*
         * We must lock table before index to avoid deadlocks.  However, if the
         * passed indexoid isn't an index then IndexGetRelation() will fail.
@@ -837,7 +891,7 @@ brin_summarize_new_values(PG_FUNCTION_ARGS)
                                                
RelationGetRelationName(indexRel))));
 
        /* OK, do it */
-       brinsummarize(indexRel, heapRel, &numSummarized, NULL);
+       brinsummarize(indexRel, heapRel, heapBlk, &numSummarized, NULL);
 
        relation_close(indexRel, ShareUpdateExclusiveLock);
        relation_close(heapRel, ShareUpdateExclusiveLock);
@@ -1063,17 +1117,17 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState 
*state, Relation heapRel,
 }
 
 /*
- * Scan a complete BRIN index, and summarize each page range that's not already
- * summarized.  The index and heap must have been locked by caller in at
- * least ShareUpdateExclusiveLock mode.
+ * Summarize page ranges that are not already summarized.  If pageRange is
+ * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
+ * page range containing the given heap page number is scanned.
  *
  * For each new index tuple inserted, *numSummarized (if not NULL) is
  * incremented; for each existing tuple, *numExisting (if not NULL) is
  * incremented.
  */
 static void
-brinsummarize(Relation index, Relation heapRel, double *numSummarized,
-                         double *numExisting)
+brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
+                         double *numSummarized, double *numExisting)
 {
        BrinRevmap *revmap;
        BrinBuildState *state = NULL;
@@ -1082,15 +1136,40 @@ brinsummarize(Relation index, Relation heapRel, double 
*numSummarized,
        BlockNumber heapBlk;
        BlockNumber pagesPerRange;
        Buffer          buf;
+       BlockNumber startBlk;
+       BlockNumber endBlk;
+
+       /* determine range of pages to process; nothing to do for an empty 
table */
+       heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
+       if (heapNumBlocks == 0)
+               return;
 
        revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 
+       if (pageRange == BRIN_ALL_BLOCKRANGES)
+       {
+               startBlk = 0;
+               endBlk = heapNumBlocks;
+       }
+       else
+       {
+               startBlk = (pageRange / pagesPerRange) * pagesPerRange;
+               /* Nothing to do if start point is beyond end of table */
+               if (startBlk > heapNumBlocks)
+               {
+                       brinRevmapTerminate(revmap);
+                       return;
+               }
+               endBlk = startBlk + pagesPerRange;
+               if (endBlk > heapNumBlocks)
+                       endBlk = heapNumBlocks;
+       }
+
        /*
         * Scan the revmap to find unsummarized items.
         */
        buf = InvalidBuffer;
-       heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
-       for (heapBlk = 0; heapBlk < heapNumBlocks; heapBlk += pagesPerRange)
+       for (heapBlk = startBlk; heapBlk < endBlk; heapBlk += pagesPerRange)
        {
                BrinTuple  *tup;
                OffsetNumber off;
diff --git a/src/backend/access/brin/brin_revmap.c 
b/src/backend/access/brin/brin_revmap.c
index 0de6999..3937ffd 100644
--- a/src/backend/access/brin/brin_revmap.c
+++ b/src/backend/access/brin/brin_revmap.c
@@ -205,7 +205,11 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber 
heapBlk,
        /* normalize the heap block number to be the first page in the range */
        heapBlk = (heapBlk / revmap->rm_pagesPerRange) * 
revmap->rm_pagesPerRange;
 
-       /* Compute the revmap page number we need */
+       /*
+        * Compute the revmap page number we need.  If Invalid is returned 
(i.e.,
+        * the revmap page hasn't been created yet), the requested page range is
+        * not summarized.
+        */
        mapBlk = revmap_get_blkno(revmap, heapBlk);
        if (mapBlk == InvalidBlockNumber)
        {
@@ -281,13 +285,13 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber 
heapBlk,
                        {
                                tup = (BrinTuple *) PageGetItem(page, lp);
 
-                               if (tup->bt_blkno == heapBlk)
-                               {
-                                       if (size)
-                                               *size = ItemIdGetLength(lp);
-                                       /* found it! */
-                                       return tup;
-                               }
+                               if (tup->bt_blkno != heapBlk)
+                                       elog(ERROR, "expected blkno %u, got 
%u", heapBlk, tup->bt_blkno);
+
+                               if (size)
+                                       *size = ItemIdGetLength(lp);
+                               /* found it! */
+                               return tup;
                        }
                }
 
diff --git a/src/backend/access/common/reloptions.c 
b/src/backend/access/common/reloptions.c
index 72e1253..9da287d 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -94,6 +94,15 @@ static relopt_bool boolRelOpts[] =
 {
        {
                {
+                       "autosummarize",
+                       "Enables automatic summarization on this BRIN index",
+                       RELOPT_KIND_BRIN,
+                       AccessExclusiveLock
+               },
+               false
+       },
+       {
+               {
                        "autovacuum_enabled",
                        "Enables autovacuum in this relation",
                        RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
diff --git a/src/backend/postmaster/autovacuum.c 
b/src/backend/postmaster/autovacuum.c
index 33ca749..6f4b6e8 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -92,7 +92,9 @@
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "tcop/tcopprot.h"
+#include "utils/dsa.h"
 #include "utils/fmgroids.h"
+#include "utils/fmgrprotos.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -252,9 +254,10 @@ typedef enum
  * av_runningWorkers the WorkerInfo non-free queue
  * av_startingWorker pointer to WorkerInfo currently being started (cleared by
  *                                     the worker itself as soon as it's up 
and running)
+ * av_dsa_handle       handle for allocatable shared memory
  *
  * This struct is protected by AutovacuumLock, except for av_signal and parts
- * of the worker list (see above).
+ * of the worker list (see above).  av_dsa_handle is readable unlocked.
  *-------------
  */
 typedef struct
@@ -264,6 +267,8 @@ typedef struct
        dlist_head      av_freeWorkers;
        dlist_head      av_runningWorkers;
        WorkerInfo      av_startingWorker;
+       dsa_handle      av_dsa_handle;
+       dsa_pointer     av_workitems;
 } AutoVacuumShmemStruct;
 
 static AutoVacuumShmemStruct *AutoVacuumShmem;
@@ -278,6 +283,32 @@ static MemoryContext DatabaseListCxt = NULL;
 /* Pointer to my own WorkerInfo, valid on each worker */
 static WorkerInfo MyWorkerInfo = NULL;
 
+/*
+ * Autovacuum workitem array, stored in AutoVacuumShmem->av_workitems.  This
+ * list is mostly protected by AutovacuumLock, except that if it's marked
+ * 'active' other processes must not modify the work-identifying members,
+ * though changing the list pointers is okay.
+ */
+typedef struct AutoVacuumWorkItem
+{
+       AutoVacuumWorkItemType avw_type;
+       Oid                     avw_database;
+       Oid                     avw_relation;
+       BlockNumber     avw_blockNumber;
+       bool            avw_active;
+       dsa_pointer     avw_next;       /* doubly linked list pointers */
+       dsa_pointer     avw_prev;
+} AutoVacuumWorkItem;
+
+#define NUM_WORKITEMS  256
+typedef struct
+{
+       dsa_pointer             avs_usedItems;
+       dsa_pointer             avs_freeItems;
+} AutovacWorkItems;
+
+static dsa_area        *AutoVacuumDSA = NULL;
+
 /* PID of launcher, valid only in worker while shutting down */
 int                    AutovacuumLauncherPid = 0;
 
@@ -316,11 +347,16 @@ static AutoVacOpts *extract_autovac_opts(HeapTuple tup,
 static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
                                                  PgStat_StatDBEntry *shared,
                                                  PgStat_StatDBEntry *dbentry);
+static void perform_work_item(AutoVacuumWorkItem *workitem);
 static void autovac_report_activity(autovac_table *tab);
+static void autovac_report_workitem(AutoVacuumWorkItem *workitem,
+                                               const char *nspname, const char 
*relname);
 static void av_sighup_handler(SIGNAL_ARGS);
 static void avl_sigusr2_handler(SIGNAL_ARGS);
 static void avl_sigterm_handler(SIGNAL_ARGS);
 static void autovac_refresh_stats(void);
+static void remove_wi_from_list(dsa_pointer *list, dsa_pointer wi_ptr);
+static void add_wi_to_list(dsa_pointer *list, dsa_pointer wi_ptr);
 
 
 
@@ -574,6 +610,28 @@ AutoVacLauncherMain(int argc, char *argv[])
         */
        rebuild_database_list(InvalidOid);
 
+       /*
+        * Set up our DSA so that backends can install work-item requests.  It 
may
+        * already exist as created by a previous launcher.
+        */
+       if (!AutoVacuumShmem->av_dsa_handle)
+       {
+               LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+               AutoVacuumDSA = dsa_create(LWTRANCHE_AUTOVACUUM);
+               /* make sure it doesn't go away even if we do */
+               dsa_pin(AutoVacuumDSA);
+               dsa_pin_mapping(AutoVacuumDSA);
+               AutoVacuumShmem->av_dsa_handle = dsa_get_handle(AutoVacuumDSA);
+               /* delay array allocation until first request */
+               AutoVacuumShmem->av_workitems = InvalidDsaPointer;
+               LWLockRelease(AutovacuumLock);
+       }
+       else
+       {
+               AutoVacuumDSA = dsa_attach(AutoVacuumShmem->av_dsa_handle);
+               dsa_pin_mapping(AutoVacuumDSA);
+       }
+
        /* loop until shutdown request */
        while (!got_SIGTERM)
        {
@@ -1617,6 +1675,14 @@ AutoVacWorkerMain(int argc, char *argv[])
        {
                char            dbname[NAMEDATALEN];
 
+               if (AutoVacuumShmem->av_dsa_handle)
+               {
+                       /* First use of DSA in this worker, so attach to it */
+                       Assert(!AutoVacuumDSA);
+                       AutoVacuumDSA = 
dsa_attach(AutoVacuumShmem->av_dsa_handle);
+                       dsa_pin_mapping(AutoVacuumDSA);
+               }
+
                /*
                 * Report autovac startup to the stats collector.  We 
deliberately do
                 * this before InitPostgres, so that the last_autovac_time will 
get
@@ -2467,6 +2533,69 @@ deleted:
        }
 
        /*
+        * Perform additional work items, as requested by backends.
+        */
+       if (AutoVacuumShmem->av_workitems)
+       {
+               dsa_pointer             wi_ptr;
+               AutovacWorkItems *workitems;
+
+               LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+               /*
+                * Scan the list of pending items, and process the inactive 
ones in our
+                * database.
+                */
+               workitems = (AutovacWorkItems *)
+                       dsa_get_address(AutoVacuumDSA, 
AutoVacuumShmem->av_workitems);
+               wi_ptr = workitems->avs_usedItems;
+
+               while (wi_ptr != InvalidDsaPointer)
+               {
+                       AutoVacuumWorkItem      *workitem;
+
+                       workitem = (AutoVacuumWorkItem *)
+                               dsa_get_address(AutoVacuumDSA, wi_ptr);
+
+                       if (workitem->avw_database == MyDatabaseId && 
!workitem->avw_active)
+                       {
+                               dsa_pointer             next_ptr;
+
+                               /* claim this one */
+                               workitem->avw_active = true;
+
+                               LWLockRelease(AutovacuumLock);
+
+                               perform_work_item(workitem);
+
+                               /*
+                                * Check for config changes before acquiring 
lock for further
+                                * jobs.
+                                */
+                               CHECK_FOR_INTERRUPTS();
+                               if (got_SIGHUP)
+                               {
+                                       got_SIGHUP = false;
+                                       ProcessConfigFile(PGC_SIGHUP);
+                               }
+
+                               LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+                               /* Put the array item back for the next user */
+                               next_ptr = workitem->avw_next;
+                               remove_wi_from_list(&workitems->avs_usedItems, 
wi_ptr);
+                               add_wi_to_list(&workitems->avs_freeItems, 
wi_ptr);
+                               wi_ptr = next_ptr;
+                       }
+                       else
+                               wi_ptr = workitem->avw_next;
+               }
+
+               /* all done */
+               LWLockRelease(AutovacuumLock);
+       }
+
+       /*
         * We leak table_toast_map here (among other things), but since we're
         * going away soon, it's not a problem.
         */
@@ -2499,6 +2628,103 @@ deleted:
 }
 
 /*
+ * Execute a previously registered work item.
+ */
+static void
+perform_work_item(AutoVacuumWorkItem *workitem)
+{
+       char       *cur_datname = NULL;
+       char       *cur_nspname = NULL;
+       char       *cur_relname = NULL;
+
+       /*
+        * Note we do not store table info in MyWorkerInfo, since this is not
+        * vacuuming proper.
+        */
+
+       /*
+        * Save the relation name for a possible error message, to avoid a
+        * catalog lookup in case of an error.  If any of these return NULL,
+        * then the relation has been dropped since last we checked; skip it.
+        * The strings are released at the bottom of this function, whether
+        * or not the work item was processed successfully.
+        */
+
+       cur_relname = get_rel_name(workitem->avw_relation);
+       cur_nspname = 
get_namespace_name(get_rel_namespace(workitem->avw_relation));
+       cur_datname = get_database_name(MyDatabaseId);
+       if (!cur_relname || !cur_nspname || !cur_datname)
+               goto deleted2;
+
+       autovac_report_workitem(workitem, cur_nspname, cur_relname);
+
+       /*
+        * We will abort the current work item if something errors out, and
+        * continue with the next one; in particular, this happens if we are
+        * interrupted with SIGINT.  Note that this means that the work item 
list
+        * can be lossy.
+        */
+       PG_TRY();
+       {
+               /* have at it */
+               MemoryContextSwitchTo(TopTransactionContext);
+
+               switch (workitem->avw_type)
+               {
+                       case AVW_BRINSummarizeRange:
+                               DirectFunctionCall2(brin_summarize_range,
+                                                                       
ObjectIdGetDatum(workitem->avw_relation),
+                                                                       
Int64GetDatum((int64) workitem->avw_blockNumber));
+                               break;
+                       default:
+                               elog(WARNING, "unrecognized work item found: 
type %d",
+                                        workitem->avw_type);
+                               break;
+               }
+
+               /*
+                * Clear a possible query-cancel signal, to avoid a late reaction
+                * to an automatically-sent signal because of processing the
+                * current work item (we're done with it, so it would make no
+                * sense to cancel at this point.)
+                */
+               QueryCancelPending = false;
+       }
+       PG_CATCH();
+       {
+               /*
+                * Abort the transaction, start a new one, and proceed with the
+                * next work item in our list.
+                */
+               HOLD_INTERRUPTS();
+               errcontext("processing work entry for relation \"%s.%s.%s\"",
+                                  cur_datname, cur_nspname, cur_relname);
+               EmitErrorReport();
+
+               /* this resets the PGXACT flags too */
+               AbortOutOfAnyTransaction();
+               FlushErrorState();
+               MemoryContextResetAndDeleteChildren(PortalContext);
+
+               /* restart our transaction for the following operations */
+               StartTransactionCommand();
+               RESUME_INTERRUPTS();
+       }
+       PG_END_TRY();
+
+       /* We intentionally do not set did_vacuum here */
+
+       /* be tidy */
+deleted2:
+       if (cur_datname)
+               pfree(cur_datname);
+       if (cur_nspname)
+               pfree(cur_nspname);
+       if (cur_relname)
+               pfree(cur_relname);
+}
+
+/*
  * extract_autovac_opts
  *
  * Given a relation's pg_class tuple, return the AutoVacOpts portion of
@@ -2946,6 +3172,45 @@ autovac_report_activity(autovac_table *tab)
 }
 
 /*
+ * autovac_report_workitem
+ *             Report to pgstat that autovacuum is processing a work item
+ */
+static void
+autovac_report_workitem(AutoVacuumWorkItem *workitem,
+                                               const char *nspname, const char 
*relname)
+{
+       char    activity[MAX_AUTOVAC_ACTIV_LEN + 12 + 2];
+       char    blk[12 + 2];
+       int             len;
+
+       switch (workitem->avw_type)
+       {
+               case AVW_BRINSummarizeRange:
+                       snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
+                                        "autovacuum: BRIN summarize");
+                       break;
+       }
+
+       /*
+        * Report the qualified name of the relation, and the block number if 
any
+        */
+       len = strlen(activity);
+
+       if (BlockNumberIsValid(workitem->avw_blockNumber))
+               snprintf(blk, sizeof(blk), " %u", workitem->avw_blockNumber);
+       else
+               blk[0] = '\0';
+
+       snprintf(activity + len, MAX_AUTOVAC_ACTIV_LEN - len,
+                        " %s.%s%s", nspname, relname, blk);
+
+       /* Set statement_timestamp() to current time for pg_stat_activity */
+       SetCurrentStatementStartTimestamp();
+
+       pgstat_report_activity(STATE_RUNNING, activity);
+}
+
+/*
  * AutoVacuumingActive
  *             Check GUC vars and report whether the autovacuum process should 
be
  *             running.
@@ -2959,6 +3224,113 @@ AutoVacuumingActive(void)
 }
 
 /*
+ * Request one work item to the next autovacuum run processing our database.
+ * The request is silently dropped if the launcher has not published a DSA
+ * handle, if the work-item array cannot be allocated, or if it is full.
+ */
+void
+AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId,
+                                         BlockNumber blkno)
+{
+       AutovacWorkItems *workitems;
+       dsa_pointer             wi_ptr;
+       AutoVacuumWorkItem *workitem;
+
+       LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+       /*
+        * It may be useful to de-duplicate the list upon insertion.  For the only
+        * currently existing caller, this is not necessary.
+        */
+
+       /* First use in this process?  Set up DSA */
+       if (!AutoVacuumDSA)
+       {
+               if (!AutoVacuumShmem->av_dsa_handle)
+               {
+                       /* autovacuum launcher not started; nothing can be done */
+                       LWLockRelease(AutovacuumLock);
+                       return;
+               }
+               AutoVacuumDSA = dsa_attach(AutoVacuumShmem->av_dsa_handle);
+               dsa_pin_mapping(AutoVacuumDSA);
+       }
+
+       /* First use overall?  Allocate work items array */
+       if (AutoVacuumShmem->av_workitems == InvalidDsaPointer)
+       {
+               int             i;
+
+               AutoVacuumShmem->av_workitems =
+                       dsa_allocate_extended(AutoVacuumDSA,
+                                                                 sizeof(AutovacWorkItems) +
+                                                                 NUM_WORKITEMS * sizeof(AutoVacuumWorkItem),
+                                                                 DSA_ALLOC_NO_OOM);
+               /* if out of memory, silently disregard the request */
+               if (AutoVacuumShmem->av_workitems == InvalidDsaPointer)
+               {
+                       LWLockRelease(AutovacuumLock);
+                       dsa_detach(AutoVacuumDSA);
+                       AutoVacuumDSA = NULL;
+                       return;
+               }
+
+               /* Start with both lists empty, then add every entry to the free list */
+               workitems = dsa_get_address(AutoVacuumDSA, AutoVacuumShmem->av_workitems);
+               workitems->avs_usedItems = InvalidDsaPointer;
+               workitems->avs_freeItems = InvalidDsaPointer;
+               for (i = 0; i < NUM_WORKITEMS; i++)
+               {
+                       /* XXX surely there is a simpler way to do this */
+                       wi_ptr = AutoVacuumShmem->av_workitems + sizeof(AutovacWorkItems) +
+                               sizeof(AutoVacuumWorkItem) * i;
+                       workitem = (AutoVacuumWorkItem *) dsa_get_address(AutoVacuumDSA, wi_ptr);
+                       /* allocated without DSA_ALLOC_ZERO, so set every field */
+                       workitem->avw_type = 0;
+                       workitem->avw_database = InvalidOid;
+                       workitem->avw_relation = InvalidOid;
+                       workitem->avw_blockNumber = InvalidBlockNumber;
+                       workitem->avw_active = false;
+                       workitem->avw_next = workitem->avw_prev = InvalidDsaPointer;
+
+                       /* put this item in the free list */
+                       add_wi_to_list(&workitems->avs_freeItems, wi_ptr);
+               }
+       }
+
+       workitems = dsa_get_address(AutoVacuumDSA, AutoVacuumShmem->av_workitems);
+
+       /* If array is full, disregard the request */
+       if (workitems->avs_freeItems == InvalidDsaPointer)
+       {
+               LWLockRelease(AutovacuumLock);
+               dsa_detach(AutoVacuumDSA);
+               AutoVacuumDSA = NULL;
+               return;
+       }
+
+       /* remove workitem struct from free list ... */
+       wi_ptr = workitems->avs_freeItems;
+       remove_wi_from_list(&workitems->avs_freeItems, wi_ptr);
+
+       /* ... initialize it ... */
+       workitem = dsa_get_address(AutoVacuumDSA, wi_ptr);
+       workitem->avw_type = type;
+       workitem->avw_database = MyDatabaseId;
+       workitem->avw_relation = relationId;
+       workitem->avw_blockNumber = blkno;
+       workitem->avw_active = false;
+
+       /* ... and put it on autovacuum's to-do list */
+       add_wi_to_list(&workitems->avs_usedItems, wi_ptr);
+
+       LWLockRelease(AutovacuumLock);
+
+       dsa_detach(AutoVacuumDSA);
+       AutoVacuumDSA = NULL;
+}
+
+/*
  * autovac_init
  *             This is called at postmaster initialization.
  *
@@ -3079,3 +3451,59 @@ autovac_refresh_stats(void)
 
        pgstat_clear_snapshot();
 }
+
+/*
+ * Simplistic open-coded list implementation for objects stored in DSA.
+ * Each item is doubly linked, but we have no tail pointer, and the "prev"
+ * link of the first item is invalid rather than pointing back at the list.
+ */
+
+/*
+ * Unlink the work item at wi_ptr from the list headed by *list.
+ */
+static void
+remove_wi_from_list(dsa_pointer *list, dsa_pointer wi_ptr)
+{
+       AutoVacuumWorkItem *item = dsa_get_address(AutoVacuumDSA, wi_ptr);
+       AutoVacuumWorkItem *neighbor;
+       dsa_pointer             succ = item->avw_next;
+       dsa_pointer             pred = item->avw_prev;
+
+       item->avw_prev = item->avw_next = InvalidDsaPointer;    /* fully unlinked */
+
+       if (pred == InvalidDsaPointer)
+               *list = succ;                   /* item was first: advance the list head */
+       else
+       {
+               neighbor = dsa_get_address(AutoVacuumDSA, pred);
+               neighbor->avw_next = succ;      /* predecessor skips over the item */
+       }
+       if (succ != InvalidDsaPointer)
+       {
+               neighbor = dsa_get_address(AutoVacuumDSA, succ);
+               neighbor->avw_prev = pred;      /* successor points back past the item */
+       }
+}
+
+/*
+ * Add a workitem to the head of the given list.  The item's own links are
+ * overwritten here, so stale values left in them cannot corrupt the list.
+ */
+static void
+add_wi_to_list(dsa_pointer *list, dsa_pointer wi_ptr)
+{
+       AutoVacuumWorkItem *workitem = dsa_get_address(AutoVacuumDSA, wi_ptr);
+
+       /* the new head has no predecessor and is followed by the old head, if any */
+       workitem->avw_prev = InvalidDsaPointer;
+       workitem->avw_next = *list;
+       if (*list != InvalidDsaPointer)
+       {
+               AutoVacuumWorkItem *old = dsa_get_address(AutoVacuumDSA, *list);
+
+               /* the old head now points back at the new one */
+               old->avw_prev = wi_ptr;
+       }
+
+       *list = wi_ptr;
+}
diff --git a/src/backend/storage/lmgr/lwlock.c 
b/src/backend/storage/lmgr/lwlock.c
index 3e13394..c4313a5 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
 
        if (LWLockTrancheArray == NULL)
        {
-               LWLockTranchesAllocated = 64;
+               LWLockTranchesAllocated = 72;
                LWLockTrancheArray = (char **)
                        MemoryContextAllocZero(TopMemoryContext,
                                                  LWLockTranchesAllocated * 
sizeof(char *));
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index 896824a..3f4c29b 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -22,6 +22,7 @@ typedef struct BrinOptions
 {
        int32           vl_len_;                /* varlena header (do not touch 
directly!) */
        BlockNumber pagesPerRange;
+       bool            autosummarize;
 } BrinOptions;
 
 #define BRIN_DEFAULT_PAGES_PER_RANGE   128
@@ -29,5 +30,9 @@ typedef struct BrinOptions
        ((relation)->rd_options ? \
         ((BrinOptions *) (relation)->rd_options)->pagesPerRange : \
          BRIN_DEFAULT_PAGES_PER_RANGE)
+#define BrinGetAutoSummarize(relation) \
+       ((relation)->rd_options ? \
+        ((BrinOptions *) (relation)->rd_options)->autosummarize : \
+         false)                /* rd_options unset: autosummarize is off */
 
 #endif   /* BRIN_H */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 1132a60..1b7ab2a 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -564,6 +564,8 @@ DATA(insert OID = 335 (  brinhandler        PGNSP PGUID 12 
1 0 0 0 f f f f t f v s 1 0
 DESCR("brin index access method handler");
 DATA(insert OID = 3952 (  brin_summarize_new_values PGNSP PGUID 12 1 0 0 0 f f 
f f t f v s 1 0 23 "2205" _null_ _null_ _null_ _null_ _null_ 
brin_summarize_new_values _null_ _null_ _null_ ));
 DESCR("brin: standalone scan new table pages");
+DATA(insert OID = 3999 (  brin_summarize_range PGNSP PGUID 12 1 0 0 0 f f f f 
t f v s 2 0 23 "2205 20" _null_ _null_ _null_ _null_ _null_ 
brin_summarize_range _null_ _null_ _null_ ));
+DESCR("brin: standalone scan new table pages");
 
 DATA(insert OID = 338 (  amvalidate            PGNSP PGUID 12 1 0 0 0 f f f f 
t f v s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_   amvalidate _null_ 
_null_ _null_ ));
 DESCR("validate an operator class");
diff --git a/src/include/postmaster/autovacuum.h 
b/src/include/postmaster/autovacuum.h
index 99d7f09..174e91a 100644
--- a/src/include/postmaster/autovacuum.h
+++ b/src/include/postmaster/autovacuum.h
@@ -14,6 +14,15 @@
 #ifndef AUTOVACUUM_H
 #define AUTOVACUUM_H
 
+/*
+ * Kinds of work that other processes can request from autovacuum through
+ * AutoVacuumRequestWork(); each request is stored as an AutoVacuumWorkItem.
+ */
+typedef enum
+{
+       AVW_BRINSummarizeRange          /* summarize a BRIN index page range */
+} AutoVacuumWorkItemType;
+
 
 /* GUC variables */
 extern bool autovacuum_start_daemon;
@@ -60,6 +69,9 @@ extern void AutovacuumWorkerIAm(void);
 extern void AutovacuumLauncherIAm(void);
 #endif
 
+extern void AutoVacuumRequestWork(AutoVacuumWorkItemType type,
+                                         Oid relationId, BlockNumber blkno);
+
 /* shared memory stuff */
 extern Size AutoVacuumShmemSize(void);
 extern void AutoVacuumShmemInit(void);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 0cd45bb..9105f3d 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -211,6 +211,7 @@ typedef enum BuiltinTrancheIds
        LWTRANCHE_BUFFER_MAPPING,
        LWTRANCHE_LOCK_MANAGER,
        LWTRANCHE_PREDICATE_LOCK_MANAGER,
+       LWTRANCHE_AUTOVACUUM,
        LWTRANCHE_PARALLEL_QUERY_DSA,
        LWTRANCHE_TBM,
        LWTRANCHE_FIRST_USER_DEFINED
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to