On Fri, Apr 03, 2020 at 12:24:50PM +0900, Masahiko Sawada wrote:
> On Sat, 28 Mar 2020 at 21:19, Julien Rouhaud <rjuju...@gmail.com> wrote:
> >
> The current patch still checks SearchSysCacheExists1() before
> relation_open. Why do we need to call SearchSysCacheExists1() here? I
> think if the given relation doesn't exist, relation_open() will raise
> an error "could not open relation with OID %u".
> 
> +   /* Open the relation if it exists.  */
> +   if (SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
> +   {
> +       relation = relation_open(relid, AccessShareLock);
> +   }

Oops yes sorry about that.  Fixed.


> > > 8.
> > > +       if (PageIsVerified(buffer, blkno))
> > > +       {
> > > +           /*
> > > +            * If the page is really new, there won't be any checksum to be
> > > +            * computed or expected.
> > > +            */
> > > +           *chk_expected = *chk_found = NoComputedChecksum;
> > > +           return true;
> > > +       }
> > > +       else
> > > +       {
> > > +           /*
> > > +            * There's a corruption, but since this affect PageIsNew, we
> > > +            * can't compute a checksum, so set NoComputedChecksum for the
> > > +            * expected checksum.
> > > +            */
> > > +           *chk_expected = NoComputedChecksum;
> > > +           *chk_found = hdr->pd_checksum;
> > > +       }
> > > +       return false;
> > >
> > > * I think the 'else' is not necessary here.
> >
> > AFAICT it is, see below.
> >
> > > * Setting *chk_expected and *chk_found seems useless when we return
> > > true. The caller doesn't use them.
> >
> > Indeed, fixed.
> 
> The patch still sets values to both?
> 
> +       if (PageIsVerified(buffer, blkno))
> +       {
> +           /* No corruption. */
> +           *chk_expected = *chk_found = NoComputedChecksum;
> +           return true;
> +       }


Sorry again, fixed.


> > > * Should we forcibly overwrite ignore_checksum_failure to off in
> > > pg_check_relation()? Otherwise, this logic seems not work fine.
> > >
> > > * I don't understand why we cannot compute a checksum in case where a
> > > page looks like a new page but is actually corrupted. Could you please
> > > elaborate on that?
> >
> > PageIsVerified has a different behavior depending on whether the page looks
> > new or not.  If the page looks new, it only checks that it's indeed a new
> > page, and otherwise tries to verify the checksum.
> >
> > Also, pg_checksum_page() has an assert to make sure that the page isn't (or
> > doesn't look like) new.
> >
> > So it seems to me that the 'else' is required to properly detect a real or
> > fake PageIsNew, and try to compute checksums only when required.
> 
> Thank you for your explanation! I understand.
> 
> I thought we can arrange the code to something like:
> 
> if (PageIsNew(hdr))
> {
>     if (PageIsVerified(hdr))
>     {
>         *chk_expected = *chk_found = NoComputedChecksum;
>         return true;
>     }
> 
>     *chk_expected = NoComputedChecksum;
>     *chk_found = hdr->pd_checksum;
>     return false;
> }
> 
> But since it's not a critical problem you can ignore it.


I like it, so done!
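
For readers following along, the control flow we settled on can be sketched
outside of the backend as below.  This is only an illustration: SketchPage and
its fields are hypothetical stand-ins for what PageIsNew(), PageIsVerified()
and pg_checksum_page() would report, not the real page layout or the code in
the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NO_COMPUTED_CHECKSUM 0	/* stand-in for NoComputedChecksum */

/* Hypothetical, simplified page; NOT PostgreSQL's real PageHeaderData. */
typedef struct SketchPage
{
	uint16_t	stored_checksum;	/* plays the role of pd_checksum */
	bool		looks_new;			/* what PageIsNew() would report */
	bool		really_new;			/* what PageIsVerified() would report for
									 * a page that looks new */
	uint16_t	computed;			/* what pg_checksum_page() would return */
} SketchPage;

/*
 * Returns true if the page is fine.  The output parameters are only set when
 * a problem is reported, as agreed in the review.
 */
static bool
sketch_check_buffer(const SketchPage *page, uint16_t *chk_expected,
					uint16_t *chk_found)
{
	assert(chk_expected && chk_found);

	if (page->looks_new)
	{
		/* Genuinely new page: no checksum to compute or expect. */
		if (page->really_new)
			return true;

		/* Corruption affecting the "new" detection: nothing to compute. */
		*chk_expected = NO_COMPUTED_CHECKSUM;
		*chk_found = page->stored_checksum;
		return false;
	}

	/* Regular page: compare the computed and stored checksums. */
	*chk_expected = page->computed;
	*chk_found = page->stored_checksum;
	return *chk_expected == *chk_found;
}
```

With this shape, the caller can treat an expected checksum equal to the
NoComputedChecksum stand-in as "no expected value" and report NULL for it.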


> > > 8.
> > > +   {
> > > +       {"checksum_cost_page_hit", PGC_USERSET, RESOURCES_CHECKSUM_DELAY,
> > > +           gettext_noop("Checksum cost for a page found in the buffer cache."),
> > > +           NULL
> > > +       },
> > > +       &ChecksumCostPageHit,
> > > +       1, 0, 10000,
> > > +       NULL, NULL, NULL
> > > +   },
> > >
> > > * There is no description about the newly added four GUC parameters in
> > > the doc.
> > >
> > > * We need to put new GUC parameters into postgresql.conf.sample as well.
> >
> > Fixed both.
> >
> > > * The patch doesn't use checksum_cost_page_hit at all.
> >
> > Indeed, I also realized that while working on previous issues.  I removed it
> > and renamed checksum_cost_page_miss to checksum_cost_page.
> 
> Perhaps we can use checksum_cost_page_hit when we found the page in
> the shared buffer but it's marked as dirty?


The thing is that when the buffer is dirty, we won't do any additional check,
thus not adding any overhead.  What may be needed here is to account for the
locking overhead (in all cases), so that if all (or almost all) the buffers are
dirty and in shared buffers the execution can be throttled.  I don't know how
much of an issue it can be, but if that's something to be fixed then page_hit
doesn't look like the right answer for that.
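
As a reminder of what the throttling does, here is a minimal standalone sketch
of the cost accounting, assuming the documented defaults (a page cost of 10
and a limit of 200).  SketchThrottle and both function names are made up for
the illustration; the sketch returns the nap duration instead of sleeping, and
it isn't the code from the patch.

```c
#include <assert.h>

/* Hypothetical container for the cost-based delay settings and balance. */
typedef struct SketchThrottle
{
	int			cost_page;		/* like checksum_cost_page */
	int			cost_limit;		/* like checksum_cost_limit */
	double		cost_delay_ms;	/* like checksum_cost_delay, 0 disables */
	int			balance;		/* accumulated cost since the last nap */
} SketchThrottle;

/* Charge the cost of one page, whatever its origin. */
static void
sketch_account_page(SketchThrottle *t)
{
	t->balance += t->cost_page;
}

/*
 * Return how long to nap, in milliseconds, and reset the balance when a nap
 * is due.  The nap is proportional to the accumulated balance and capped at
 * four times the configured delay.
 */
static double
sketch_delay_point(SketchThrottle *t)
{
	double		msec;

	if (t->cost_delay_ms <= 0 || t->balance < t->cost_limit)
		return 0.0;

	msec = t->cost_delay_ms * t->balance / t->cost_limit;
	if (msec > t->cost_delay_ms * 4)
		msec = t->cost_delay_ms * 4;

	t->balance = 0;
	return msec;
}
```

Since only pages that are actually read get charged, a relation whose buffers
are all dirty in shared buffers never reaches the limit, which is the case
described above.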


> I've read the latest patch and here is random comments:
> 
> 1.
> +       /*
> +        * Add a page miss cost, as we're always reading outside the shared
> +        * buffers.
> +        */
> +       /* Add a page cost. */
> +       ChecksumCostBalance += ChecksumCostPage;
> 
> There are duplicate comments.

Fixed.


> 2.
> +       /* Dirty pages are ignored as they'll be flushed soon. */
> +       if (buf_state & BM_DIRTY)
> +           checkit = false;
> 
> Should we check the buffer if it has BM_TAG_VALID as well here? I
> thought there might be a possibility that BufTableLookup() returns a
> buf_Id but its buffer tag is not valid for example when the previous
> read failed after inserting the buffer tag to the buffer table.


Good point, fixed.


> 3.
> +   /* Add a page cost. */
> +   ChecksumCostBalance += ChecksumCostPage;
> +
> +   return checkit;
> +}
> 
> The check_get_buffer() seems to be slightly complex to me but when we
> reached the end of this function we always return true. Similarly, in
> the case where we read the block while holding a partition lock we
> always return true as well. Is my understanding right? If so, it might
> be better to put some assertions.


Yes it's a little bit complex.  I used this approach to avoid the need to
release the locks all over the place, but maybe this doesn't really improve
things.  I added asserts and comments anyway as suggested, thanks.


> 4.
> @@ -10825,6 +10825,14 @@
>    proallargtypes => '{oid,text,int8,timestamptz}', proargmodes => '{i,o,o,o}',
>    proargnames => '{tablespace,name,size,modification}',
>    prosrc => 'pg_ls_tmpdir_1arg' },
> +{ oid => '9147', descr => 'check data integrity for one or all relations',
> +  proname => 'pg_check_relation', proisstrict => 'f', procost => '10000',
> +  prorows => '20', proretset => 't', proparallel => 'r',
> +  provolatile => 'v', prorettype => 'record', proargtypes => 'regclass text',
> +  proallargtypes => '{regclass,text,oid,int4,int8,int4,int4}',
> +  proargmodes => '{i,i,o,o,o,o,o}',
> +  proargnames => '{relation,fork,relid,forknum,failed_blocknum,expected_checksum,found_checksum}',
> +  prosrc => 'pg_check_relation' },
> 
> Why is the pg_check_relation() is not a strict function? I think
> prostrict can be 'true' for this function and we can drop checking if
> the first argument is NULL.


That's because the fork is still optional.  While this could be made mandatory
without much trouble, I think we'll eventually want to add a way to check only
a subset of a fork, so it seemed to me that it wasn't worth changing that now.


> 5.
> +       memset(values, 0, sizeof(values));
> +       memset(nulls, 0, sizeof(nulls));
> 
> I think we can do memset right before setting values to them, that is,
> after checking (!found_in_sb && !force_lock).


Indeed, done!


> 6.
> +static bool
> +check_buffer(char *buffer, uint32 blkno, uint16 *chk_expected,
> +                  uint16 *chk_found)
> +{
> +   PageHeader  hdr = (PageHeader) buffer;
> +
> +   Assert(chk_expected && chk_found);
> +
> +   if (PageIsNew(hdr))
> +   {
> +       /*
> +        * Check if the page is really new or if there's a corruption that
> +        * affected PageIsNew detection.  Note that PageIsVerified won't try to
> +        * detect checksum corruption in this case, so there's no risk of
> +        * duplicated corruption report.
> +        */
> +       if (PageIsVerified(buffer, blkno))
> 
> How about using Page instead of PageHeader? Looking at other codes,
> ISTM we usually pass Page to both PageIsNew() and PageIsVerified().


Agreed, done.


> 7.
> +       <entry>
> +        <literal><function>pg_check_relation(<parameter>relation</parameter> <type>oid</type>[, <parameter>fork</parameter> <type>text</type>])</function></literal>.
> +       </entry>
> 
> +{ oid => '9147', descr => 'check data integrity for one or all relations',
> +  proname => 'pg_check_relation', proisstrict => 'f', procost => '10000',
> +  prorows => '20', proretset => 't', proparallel => 'r',
> +  provolatile => 'v', prorettype => 'record', proargtypes => 'regclass text',
> +  proallargtypes => '{regclass,text,oid,int4,int8,int4,int4}',
> +  proargmodes => '{i,i,o,o,o,o,o}',
> +  proargnames => '{relation,fork,relid,forknum,failed_blocknum,expected_checksum,found_checksum}',
> +  prosrc => 'pg_check_relation' },
> 
> The function argument data types don't match in the doc and function
> declaration. relation is 'oid' in the doc but is 'regclass' in the
> function declaration.


Fixed.


> 8.
> +#define SRF_COLS           5   /* Number of output arguments in the SRF */
> 
> Looking at similar built-in functions that return set of records they
> use a more specific name for the number of returned columns such as
> PG_STAT_GET_WAL_SENDERS_COLS and PG_GET_SHMEM_SIZES_COLS. How about
> PG_CHECK_RELATION_COLS?
> 
> check_relation_fork() seems to depend quite heavily on pg_check_relation()
> because the returned tuplestore is specified by pg_check_relation().
> It's just an idea but to improve reusability, how about moving
> check_relation_fork() to checksumfuncs.c? That is, in checksumfuncs.c
> while iterating all blocks we call a new function in checksum.c, say
> check_one_block() function, which has the following part and is
> responsible for getting, checking the specified block and returning a
> boolean indicating whether the block has corruption or not, along with
> chk_found and chk_expected:
> 
>         /*
>          * To avoid too much overhead, the buffer will be first read without
>          * the locks that would guarantee the lack of false positive, as such
>          * events should be quite rare.
>          */
> Retry:
>         if (!check_get_buffer(relation, forknum, blkno, buffer, force_lock,
>                               &found_in_sb))
>             continue;
> 
>         if (check_buffer(buffer, blkno, &chk_expected, &chk_found))
>             continue;
> 
>         /*
>          * If we get a failure and the buffer wasn't found in shared buffers,
>          * reread the buffer with suitable lock to avoid false positive.  See
>          * check_get_buffer for more details.
>          */
>         if (!found_in_sb && !force_lock)
>         {
>             force_lock = true;
>             goto Retry;
>         }
> 
> A new function in checksumfuncs.c or pg_check_relation will be
> responsible for storing the result to the tuplestore. That way,
> check_one_block() will be useful for other use when we want to check
> if the particular block has corruption with low overhead.


Yes, I agree that passing the tuplestore isn't an ideal approach and some
refactoring should probably happen.  One thing is that this wouldn't be
"check_one_block()" but "check_one_block_on_disk()" (which could also be from
the OS cache).  I'm not sure how useful it is in itself.  It also raises some
concerns about the throttling.  I didn't change that for now, but I hope
there'll be some other feedback about it.
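
To make the discussion easier to follow, here is a rough standalone sketch of
what such a per-block helper could look like: an optimistic unlocked read,
followed by a single retry under lock when the block wasn't found in shared
buffers.  SketchBlock and the sketch_* functions are hypothetical stand-ins
for check_get_buffer() and check_buffer(); throttling and the tuplestore are
deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical description of one block, used to drive the sketch. */
typedef struct SketchBlock
{
	bool		in_sb;				/* present in shared buffers */
	bool		dirty;				/* dirty there, so skipped */
	bool		torn_without_lock;	/* an unlocked read sees a torn page */
	bool		corrupted;			/* really corrupted on storage */
	int			reads;				/* number of reads performed */
} SketchBlock;

/* Stand-in for check_get_buffer(): false means "nothing to check". */
static bool
sketch_get_buffer(SketchBlock *b, bool *found_in_sb)
{
	*found_in_sb = b->in_sb;
	if (b->in_sb && b->dirty)
		return false;
	b->reads++;
	return true;
}

/* Stand-in for check_buffer(): true means the page looks valid. */
static bool
sketch_page_ok(const SketchBlock *b, bool locked)
{
	if (b->corrupted)
		return false;
	return locked || !b->torn_without_lock;
}

/* Returns true only if the corruption is confirmed. */
static bool
sketch_block_is_corrupted(SketchBlock *b)
{
	bool		force_lock = false;

	for (;;)
	{
		bool		found_in_sb;

		if (!sketch_get_buffer(b, &found_in_sb))
			return false;
		if (sketch_page_ok(b, force_lock || found_in_sb))
			return false;

		/* Possible false positive: reread once with the lock held. */
		if (!found_in_sb && !force_lock)
		{
			force_lock = true;
			continue;
		}
		return true;
	}
}
```

The caller would then only have to store the confirmed failures, wherever it
wants to report them.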
From ba7d80691ef6f34b0771fc72f38da7fc8a9c0a72 Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <jrouh...@vmware.com>
Date: Mon, 4 Nov 2019 08:40:23 +0100
Subject: [PATCH v6] Add a pg_check_relation() function.

This function checks the validity of the checksums for all non-dirty blocks of
a given relation, and optionally a given fork, and returns the list of all
blocks that don't match, along with the expected and found checksums.

Author: Julien Rouhaud
Reviewed-by: Michael Paquier, Masahiko Sawada
Discussion: https://postgr.es/m/CAOBaU_aVvMjQn%3Dge5qPiJOPMmOj5%3Dii3st5Q0Y%2BWuLML5sR17w%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  85 ++++
 doc/src/sgml/func.sgml                        |  51 +++
 src/backend/catalog/system_views.sql          |   7 +
 src/backend/storage/page/checksum.c           | 418 +++++++++++++++++-
 src/backend/utils/adt/Makefile                |   1 +
 src/backend/utils/adt/checksumfuncs.c         |  96 ++++
 src/backend/utils/init/globals.c              |   7 +
 src/backend/utils/misc/guc.c                  |  33 ++
 src/backend/utils/misc/postgresql.conf.sample |   6 +
 src/include/catalog/pg_proc.dat               |   8 +
 src/include/miscadmin.h                       |   7 +
 src/include/storage/checksum.h                |   7 +
 src/include/utils/guc_tables.h                |   1 +
 src/test/Makefile                             |   3 +-
 src/test/check_relation/.gitignore            |   2 +
 src/test/check_relation/Makefile              |  23 +
 src/test/check_relation/README                |  23 +
 .../check_relation/t/01_checksums_check.pl    | 276 ++++++++++++
 18 files changed, 1050 insertions(+), 4 deletions(-)
 create mode 100644 src/backend/utils/adt/checksumfuncs.c
 create mode 100644 src/test/check_relation/.gitignore
 create mode 100644 src/test/check_relation/Makefile
 create mode 100644 src/test/check_relation/README
 create mode 100644 src/test/check_relation/t/01_checksums_check.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c4d6ed4bbc..259793650f 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2038,6 +2038,91 @@ include_dir 'conf.d'
      </note>
     </sect2>
 
+    <sect2 id="runtime-config-resource-checksum-verification-cost">
+     <title>Cost-based Checksum Verification Delay</title>
+
+     <para>
+      During the execution of <xref linkend="functions-check-relation-note"/>
+      function, the system maintains an internal counter that keeps track of
+      the estimated cost of the various I/O operations that are performed.
+      When the accumulated cost reaches a limit (specified by
+      <varname>checksum_cost_limit</varname>), the process performing the
+      operation will sleep for a short period of time, as specified by
+      <varname>checksum_cost_delay</varname>. Then it will reset the counter
+      and continue execution.
+     </para>
+
+     <para>
+      This feature is disabled by default. To enable it, set the
+      <varname>checksum_cost_delay</varname> variable to a nonzero
+      value.
+     </para>
+
+     <variablelist>
+      <varlistentry id="guc-checksum-cost-delay" xreflabel="checksum_cost_delay">
+       <term><varname>checksum_cost_delay</varname> (<type>floating point</type>)
+       <indexterm>
+        <primary><varname>checksum_cost_delay</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         The amount of time that the process will sleep
+         when the cost limit has been exceeded.
+         If this value is specified without units, it is taken as milliseconds.
+         The default value is zero, which disables the cost-based checksum
+         verification delay feature.  Positive values enable cost-based
+         checksum verification.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry id="guc-checksum-cost-page" xreflabel="checksum_cost_page">
+       <term><varname>checksum_cost_page</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>checksum_cost_page</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         The estimated cost for verifying a buffer, whether it's found in the
+         shared buffer cache or not. It represents the cost to lock the buffer
+         pool, look up the shared hash table, read the content of the page from
+         disk and compute its checksum.  The default value is 10.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry id="guc-checksum-cost-limit" xreflabel="checksum_cost_limit">
+       <term><varname>checksum_cost_limit</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>checksum_cost_limit</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         The accumulated cost that will cause the verification process to sleep.
+         The default value is 200.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+
+     <note>
+      <para>
+       There are certain operations that hold critical locks and should
+       therefore complete as quickly as possible.  Cost-based checksum
+       verification delays do not occur during such operations.  Therefore it
+       is possible that the cost accumulates far higher than the specified
+       limit.  To avoid uselessly long delays in such cases, the actual delay
+       is calculated as <varname>checksum_cost_delay</varname> *
+       <varname>accumulated_balance</varname> /
+       <varname>checksum_cost_limit</varname> with a maximum of
+       <varname>checksum_cost_delay</varname> * 4.
+      </para>
+     </note>
+    </sect2>
+
     <sect2 id="runtime-config-resource-background-writer">
      <title>Background Writer</title>
 
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 4d88b45e72..75bcf3484f 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -21896,6 +21896,57 @@ SELECT (pg_stat_file('filename')).modification;
 
   </sect2>
 
+  <sect2 id="functions-data-sanity">
+   <title>Data Sanity Functions</title>
+
+   <para>
+    The functions shown in <xref linkend="functions-data-sanity-table"/>
+    provide a means to check the health of the data files in a cluster.
+   </para>
+
+   <table id="functions-data-sanity-table">
+    <title>Data Sanity Functions</title>
+    <tgroup cols="3">
+     <thead>
+      <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry>
+      </row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry>
+        <literal><function>pg_check_relation(<parameter>relation</parameter> <type>regclass</type>[, <parameter>fork</parameter> <type>text</type>])</function></literal>
+       </entry>
+       <entry><type>setof record</type></entry>
+       <entry>Validate the checksums for all blocks of a given relation, and
+       optionally the given fork.</entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+   <indexterm>
+    <primary>pg_check_relation</primary>
+   </indexterm>
+   <para id="functions-check-relation-note" xreflabel="pg_check_relation">
+    <function>pg_check_relation</function> iterates over all the blocks of a
+    given relation and verifies their checksums.  If provided,
+    <replaceable>fork</replaceable> should be <literal>'main'</literal> for the
+    main data fork, <literal>'fsm'</literal> for the free space map,
+    <literal>'vm'</literal> for the visibility map, or
+    <literal>'init'</literal> for the initialization fork, and only this
+    specific fork will be verified; otherwise all forks will be.  The function
+    returns the list of blocks for which the found checksum doesn't match the
+    expected one.  See <xref
+    linkend="runtime-config-resource-checksum-verification-cost"/> for
+    information on how to configure cost-based verification delay.  You must be
+    a member of the <literal>pg_read_all_stats</literal> role to use this
+    function.  It can only be used if data checksums are enabled.  See <xref
+    linkend="app-initdb-data-checksums"/> for more information.
+   </para>
+
+  </sect2>
+
   </sect1>
 
   <sect1 id="functions-trigger">
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 813ea8bfc3..f289afda6d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1415,6 +1415,13 @@ LANGUAGE internal
 STRICT IMMUTABLE PARALLEL SAFE
 AS 'unicode_is_normalized';
 
+CREATE OR REPLACE FUNCTION pg_check_relation(
+  IN relation regclass, IN fork text DEFAULT NULL::text,
+  OUT relid oid, OUT forknum integer, OUT failed_blocknum bigint,
+  OUT expected_checksum integer, OUT found_checksum integer)
+  RETURNS SETOF record VOLATILE LANGUAGE internal AS 'pg_check_relation'
+  PARALLEL RESTRICTED;
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
diff --git a/src/backend/storage/page/checksum.c b/src/backend/storage/page/checksum.c
index e010691c9f..0529e8204a 100644
--- a/src/backend/storage/page/checksum.c
+++ b/src/backend/storage/page/checksum.c
@@ -15,8 +15,420 @@
 
 #include "storage/checksum.h"
 /*
- * The actual code is in storage/checksum_impl.h.  This is done so that
- * external programs can incorporate the checksum code by #include'ing
- * that file from the exported Postgres headers.  (Compare our CRC code.)
+ * The actual checksum computation code is in storage/checksum_impl.h.  This
+ * is done so that external programs can incorporate the checksum code by
+ * #include'ing that file from the exported Postgres headers.  (Compare our
+ * CRC code.)
  */
 #include "storage/checksum_impl.h"
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "catalog/pg_authid_d.h"
+#include "commands/dbcommands.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/buf_internals.h"
+#include "storage/checksum.h"
+#include "storage/lockdefs.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+
+/*
+ * A zero checksum can never be computed, see pg_checksum_page() */
+#define NoComputedChecksum     0
+
+/* ----------------
+ * The rest of this module provides a set of functions that can be used to
+ * safely check all checksums on a running cluster.
+ *
+ * Please note that those only perform standard buffered reads, and don't try
+ * to bypass or discard the operating system cache.  If you want to check the
+ * actual storage, you have to discard the operating system cache before
+ * running those functions.
+ *
+ * To avoid torn page and possible false positive when reading data, and
+ * keeping overhead as low as possible, the following heuristics are used:
+ *
+ * - a shared LWLock is taken on the target buffer pool partition mapping, and
+ *   we detect if a block is in shared_buffers or not.  See check_get_buffer()
+ *   comments for more details about the locking strategy.
+ *
+ * - if a block is dirty in shared_buffers, it's ignored as it'll be flushed to
+ *   disk either before the end of the next checkpoint or during recovery in
+ *   case of unsafe shutdown
+ *
+ * - if a block is otherwise found in shared_buffers, an IO lock is taken on
+ *   the block and the block is then read from storage, ignoring the block in
+ *   shared_buffers
+ *
+ * - if a block is not found in shared_buffers, the LWLock is released and the
+ *   block is read from disk without taking any lock.  If an error is detected,
+ *   the read block will be discarded and retrieved again while holding the
+ *   LWLock.  This is because an error due to concurrent write is possible but
+ *   very unlikely, so it's better to have an optimistic approach to limit
+ *   locking overhead
+ *
+ * The check can be performed using an SQL function, returning the list of
+ * problematic blocks.
+ * ----------------
+ */
+
+static bool check_buffer(char *buffer, uint32 blkno, uint16 *chk_expected,
+                                                          uint16 *chk_found);
+static void check_relation_fork(TupleDesc tupdesc, Tuplestorestate *tupstore,
+                                                                         Relation relation, ForkNumber forknum);
+static void check_delay_point(void);
+static bool check_get_buffer(Relation relation, ForkNumber forknum,
+                                                        BlockNumber blkno, char *buffer, bool needlock,
+                                                        bool *found_in_sb);
+
+/*
+ * Perform a checksum check on the passed page.  Returns whether the page is
+ * valid or not, and assigns the expected and found checksum in chk_expected and
+ * chk_found, respectively.  Note that a page can look like new but could be
+ * the result of a corruption.  We still check for this case, but we can't
+ * compute its checksum as pg_checksum_page() is explicitly checking for non-new
+ * pages, so NoComputedChecksum will be set in chk_expected.
+ */
+static bool
+check_buffer(char *buffer, uint32 blkno, uint16 *chk_expected,
+                                  uint16 *chk_found)
+{
+       Page            page = (Page) buffer;
+       PageHeader      hdr = (PageHeader) page;
+
+       Assert(chk_expected && chk_found);
+
+       if (PageIsNew(page))
+       {
+               /*
+                * Check if the page is really new or if there's a corruption that
+                * affected PageIsNew detection.  Note that PageIsVerified won't try to
+                * detect checksum corruption in this case, so there's no risk of
+                * duplicated corruption report.
+                */
+               if (PageIsVerified(page, blkno))
+               {
+                       /* No corruption. */
+                       return true;
+               }
+
+               /*
+                * There's a corruption, but since this affects PageIsNew, we
+                * can't compute a checksum, so set NoComputedChecksum for the
+                * expected checksum.
+                */
+               *chk_expected = NoComputedChecksum;
+               *chk_found = hdr->pd_checksum;
+               return false;
+       }
+
+       *chk_expected = pg_checksum_page(buffer, blkno);
+       *chk_found = hdr->pd_checksum;
+
+       return (*chk_expected == *chk_found);
+}
+
+/*
+ * Perform the check on a single relation, possibly filtered with a single
+ * fork.  The relation is opened with relation_open(), which raises an error
+ * if the relation doesn't exist, for instance because it was dropped
+ * concurrently.
+ */
+void
+check_one_relation(TupleDesc tupdesc, Tuplestorestate *tupstore,
+                                        Oid relid, ForkNumber single_forknum)
+{
+       Relation        relation = NULL;
+       ForkNumber      forknum;
+
+       relation = relation_open(relid, AccessShareLock);
+
+       /* sanity checks */
+       if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("relation \"%s\" does not have storage to be checked",
+                                quote_qualified_identifier(
+                                        get_namespace_name(get_rel_namespace(relid)),
+                                        get_rel_name(relid)))));
+
+       if (RELATION_IS_OTHER_TEMP(relation))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot verify temporary tables of other sessions")));
+
+       RelationOpenSmgr(relation);
+
+       for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
+       {
+               if (single_forknum != InvalidForkNumber && single_forknum != forknum)
+                       continue;
+
+               if (smgrexists(relation->rd_smgr, forknum))
+                       check_relation_fork(tupdesc, tupstore, relation, forknum);
+       }
+
+       relation_close(relation, AccessShareLock);      /* release the lock */
+}
+
+/*
+ * For a given relation and fork, do the real work of iterating over all pages
+ * and doing the check.  Caller must hold an AccessShareLock on the given
+ * relation.
+ */
+static void
+check_relation_fork(TupleDesc tupdesc, Tuplestorestate *tupstore,
+                                                 Relation relation, ForkNumber forknum)
+{
+       BlockNumber blkno,
+                               nblocks;
+       char            buffer[BLCKSZ];
+       bool            found_in_sb;
+
+       /*
+        * We remember the number of blocks here.  Since caller must hold a lock on
+        * the relation, we know that it won't be truncated while we're iterating
+        * over the blocks.  Any block added after this function started won't be
+        * checked, but this is out of scope as such pages will be flushed before
+        * the next checkpoint's completion.
+        */
+       nblocks = RelationGetNumberOfBlocksInFork(relation, forknum);
+
+#define PG_CHECK_RELATION_COLS                 5       /* Number of output arguments in the SRF */
+       for (blkno = 0; blkno < nblocks; blkno++)
+       {
+               bool            force_lock = false;
+               uint16          chk_expected,
+                                       chk_found;
+               Datum           values[PG_CHECK_RELATION_COLS];
+               bool            nulls[PG_CHECK_RELATION_COLS];
+               int                     i = 0;
+
+               /*
+                * To avoid too much overhead, the buffer will be first read without
+                * the locks that would guarantee the lack of false positive, as such
+                * events should be quite rare.
+                */
+Retry:
+               if (!check_get_buffer(relation, forknum, blkno, buffer, force_lock,
+                                                         &found_in_sb))
+                       continue;
+
+               if (check_buffer(buffer, blkno, &chk_expected, &chk_found))
+                       continue;
+
+               /*
+                * If we get a failure and the buffer wasn't found in shared buffers,
+                * reread the buffer with suitable lock to avoid false positive.  See
+                * check_get_buffer for more details.
+                */
+               if (!found_in_sb && !force_lock)
+               {
+                       force_lock = true;
+                       goto Retry;
+               }
+
+               memset(values, 0, sizeof(values));
+               memset(nulls, 0, sizeof(nulls));
+
+               values[i++] = ObjectIdGetDatum(relation->rd_id);
+               values[i++] = Int32GetDatum(forknum);
+               values[i++] = Int64GetDatum((int64) blkno);
+               /*
+                * This can happen if a corruption makes the block appear as
+                * PageIsNew() but isn't a new page.
+                */
+               if (chk_expected == NoComputedChecksum)
+                       nulls[i++] = true;
+               else
+                       values[i++] = UInt16GetDatum(chk_expected);
+               values[i++] = UInt16GetDatum(chk_found);
+
+               Assert(i == PG_CHECK_RELATION_COLS);
+
+               /* Report the failure to the stat collector and the logs. */
+               pgstat_report_checksum_failure();
+               ereport(WARNING,
+                               (errcode(ERRCODE_DATA_CORRUPTED),
+                                errmsg("invalid page in block %u of relation %s",
+                                               blkno,
+                                               relpath(relation->rd_smgr->smgr_rnode, forknum))));
+
+               /* Save the corrupted blocks in the tuplestore. */
+               tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+       }
+}
+
+/*
+ * Check for interrupts and cost-based delay.
+ */
+static void
+check_delay_point(void)
+{
+       /* Always check for interrupts */
+       CHECK_FOR_INTERRUPTS();
+
+       if (!ChecksumCostActive || InterruptPending)
+               return;
+
+       /* Nap if appropriate */
+       if (ChecksumCostBalance >= ChecksumCostLimit)
+       {
+               int                     msec;
+
+               msec = ChecksumCostDelay * ChecksumCostBalance / ChecksumCostLimit;
+               if (msec > ChecksumCostDelay * 4)
+                       msec = ChecksumCostDelay * 4;
+
+               pg_usleep(msec * 1000L);
+
+               ChecksumCostBalance = 0;
+
+               /* Might have gotten an interrupt while sleeping */
+               CHECK_FOR_INTERRUPTS();
+       }
+}
+
+/*
+ *-------------------------
+ * Safely read the wanted buffer from disk, dealing with possible concurrency
+ * issues.  Note that if a buffer is found dirty in shared_buffers, no read will
+ * be performed and the caller will be informed that no check should be done.
+ * We can safely ignore such buffers as they'll be written before the next
+ * checkpoint's completion.
+ *
+ * The following locks can be used in this function:
+ *
+ *   - shared LWLock on the target buffer pool partition mapping.
+ *   - IOLock on the buffer
+ *
+ * The IOLock is taken when reading the buffer from disk if it exists in
+ * shared_buffers, to avoid torn pages.
+ *
+ * If the buffer isn't in shared_buffers, it'll be read from disk without any
+ * lock unless the caller asked otherwise by setting needlock.  In that case,
+ * the read will be done while the buffer mapping partition LWLock is still
+ * held.  Reading with this lock avoids the unlikely but possible case where
+ * a buffer wasn't present in shared buffers when we checked but is then
+ * allocated in shared_buffers, modified and flushed concurrently while we
+ * read it, leading to a false positive due to a torn page.  The caller can
+ * first read the buffer without holding the target buffer mapping partition
+ * LWLock as an optimistic approach, and reread the buffer from disk with the
+ * lock held in case of error.
+ *
+ * The caller should hold an AccessShareLock on the relation.
+ *-------------------------
+ */
+static bool
+check_get_buffer(Relation relation, ForkNumber forknum,
+                                BlockNumber blkno, char *buffer, bool needlock,
+                                bool *found_in_sb)
+{
+       bool            checkit = true;
+       BufferTag       buf_tag;                /* identity of requested block */
+       uint32          buf_hash;               /* hash value for buf_tag */
+       LWLock     *partLock;           /* buffer partition lock for the buffer */
+       BufferDesc *bufdesc;
+       int                     buf_id;
+
+       *found_in_sb = false;
+
+       /* Check for interrupts and take throttling into account. */
+       check_delay_point();
+
+       /* create a tag so we can lookup the buffer */
+       INIT_BUFFERTAG(buf_tag, relation->rd_smgr->smgr_rnode.node, forknum, blkno);
+
+       /* determine its hash code and partition lock ID */
+       buf_hash = BufTableHashCode(&buf_tag);
+       partLock = BufMappingPartitionLock(buf_hash);
+
+       /* see if the block is in the buffer pool already */
+       LWLockAcquire(partLock, LW_SHARED);
+       buf_id = BufTableLookup(&buf_tag, buf_hash);
+       if (buf_id >= 0)
+       {
+               uint32          buf_state;
+
+               *found_in_sb = true;
+
+               /*
+                * Found it.  Now, retrieve its state to know what to do with it, and
+                * release the buffer header lock immediately.  We do so to limit
+                * overhead as much as possible.  We'll keep the shared lightweight lock
+                * on the target buffer mapping partition, so this buffer can't be
+                * evicted, and we'll acquire an IOLock on the buffer if we need to read
+                * the content on disk.
+                */
+               bufdesc = GetBufferDescriptor(buf_id);
+
+               buf_state = LockBufHdr(bufdesc);
+               UnlockBufHdr(bufdesc, buf_state);
+
+               /*
+                * Dirty pages are ignored as they'll be flushed soon. Invalid buffers
+                * are also skipped.
+                */
+               if ((buf_state & BM_DIRTY) || !(buf_state & BM_TAG_VALID))
+                       checkit = false;
+
+               /*
+                * Read the buffer from disk, taking an IO lock to prevent torn-page
+                * reads, in the unlikely event that it was concurrently dirtied and
+                * flushed.
+                */
+               if (checkit)
+               {
+                       LWLockAcquire(BufferDescriptorGetIOLock(bufdesc), LW_SHARED);
+                       smgrread(relation->rd_smgr, forknum, blkno, buffer);
+                       LWLockRelease(BufferDescriptorGetIOLock(bufdesc));
+
+                       /* Add a page cost. */
+                       ChecksumCostBalance += ChecksumCostPage;
+               }
+       }
+       else if (needlock)
+       {
+               /*
+                * Caller asked to read the buffer while we have a lock on the target
+                * partition.
+                */
+               smgrread(relation->rd_smgr, forknum, blkno, buffer);
+
+               /* The buffer will have to be checked. */
+               Assert(checkit);
+
+               /* Add a page cost. */
+               ChecksumCostBalance += ChecksumCostPage;
+       }
+
+       LWLockRelease(partLock);
+
+       if (*found_in_sb || needlock)
+               return checkit;
+
+       /* After this point the buffer will always be checked. */
+       Assert(checkit);
+
+       /*
+        * Didn't find it in the buffer pool and didn't read it while holding the
+        * buffer mapping partition lock.  We'll have to try to read it from
+        * disk, after releasing the target partition lock to avoid too much
+        * overhead.  It means that it's possible to get a torn page later, so
+        * we'll have to retry with a suitable lock in case of error to avoid
+        * false positive.
+        */
+       smgrread(relation->rd_smgr, forknum, blkno, buffer);
+
+       /* Add a page cost. */
+       ChecksumCostBalance += ChecksumCostPage;
+
+       return checkit;
+}
diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile
index 13efa9338c..27dc524e00 100644
--- a/src/backend/utils/adt/Makefile
+++ b/src/backend/utils/adt/Makefile
@@ -22,6 +22,7 @@ OBJS = \
        bool.o \
        cash.o \
        char.o \
+       checksumfuncs.o \
        cryptohashes.o \
        date.o \
        datetime.o \
diff --git a/src/backend/utils/adt/checksumfuncs.c b/src/backend/utils/adt/checksumfuncs.c
new file mode 100644
index 0000000000..b53b4c0bbf
--- /dev/null
+++ b/src/backend/utils/adt/checksumfuncs.c
@@ -0,0 +1,96 @@
+/*-------------------------------------------------------------------------
+ *
+ * checksumfuncs.c
+ *       Functions for checksum-related features such as online verification
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/utils/adt/checksumfuncs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "catalog/pg_authid_d.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/checksum.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+
+Datum
+pg_check_relation(PG_FUNCTION_ARGS)
+{
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       Tuplestorestate *tupstore;
+       MemoryContext per_query_ctx;
+       MemoryContext oldcontext;
+       ForkNumber      forknum = InvalidForkNumber;
+       Oid                     relid = InvalidOid;
+       const char *forkname;
+
+       if (!DataChecksumsEnabled())
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("data checksums are not enabled in cluster")));
+
+       if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("only superuser or a member of the pg_read_server_files role may use this function")));
+
+       if (PG_ARGISNULL(0))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("relation cannot be null")));
+
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context that cannot accept a set")));
+       if (!(rsinfo->allowedModes & SFRM_Materialize))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is not " \
+                                               "allowed in this context")));
+
+       /* Switch into long-lived context to construct returned data structures */
+       per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+       oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       tupstore = tuplestore_begin_heap(true, false, work_mem);
+       rsinfo->returnMode = SFRM_Materialize;
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
+
+       MemoryContextSwitchTo(oldcontext);
+
+       relid = PG_GETARG_OID(0);
+
+       if (PG_NARGS() == 2 && !PG_ARGISNULL(1))
+       {
+               forkname = TextDatumGetCString(PG_GETARG_TEXT_PP(1));
+               forknum = forkname_to_number(forkname);
+       }
+
+       /* Set cost-based checksum verification delay */
+       ChecksumCostActive = (ChecksumCostDelay > 0);
+       ChecksumCostBalance = 0;
+
+       check_one_relation(tupdesc, tupstore, relid, forknum);
+
+       tuplestore_donestoring(tupstore);
+
+       return (Datum) 0;
+}
+
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index eb19644419..8683343619 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -134,6 +134,13 @@ int                        max_worker_processes = 8;
 int                    max_parallel_workers = 8;
 int                    MaxBackends = 0;
 
+int                    ChecksumCostPage = 10;  /* GUC parameters for checksum check */
+int                    ChecksumCostLimit = 200;
+double         ChecksumCostDelay = 0;
+
+int                    ChecksumCostBalance = 0;        /* working state for checksums check */
+bool           ChecksumCostActive = false;
+
 int                    VacuumCostPageHit = 1;  /* GUC parameters for vacuum */
 int                    VacuumCostPageMiss = 10;
 int                    VacuumCostPageDirty = 20;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 64dc9fbd13..b3df5c6518 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -694,6 +694,8 @@ const char *const config_group_names[] =
        gettext_noop("Resource Usage / Disk"),
        /* RESOURCES_KERNEL */
        gettext_noop("Resource Usage / Kernel Resources"),
+       /* RESOURCES_CHECKSUM_DELAY */
+       gettext_noop("Resource Usage / Cost-Based Checksum Verification Delay"),
        /* RESOURCES_VACUUM_DELAY */
        gettext_noop("Resource Usage / Cost-Based Vacuum Delay"),
        /* RESOURCES_BGWRITER */
@@ -2388,6 +2390,26 @@ static struct config_int ConfigureNamesInt[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"checksum_cost_page", PGC_USERSET, RESOURCES_CHECKSUM_DELAY,
+                       gettext_noop("Checksum cost for verifying a page."),
+                       NULL
+               },
+               &ChecksumCostPage,
+               10, 0, 10000,
+               NULL, NULL, NULL
+       },
+
+       {
+               {"checksum_cost_limit", PGC_USERSET, RESOURCES_CHECKSUM_DELAY,
+                       gettext_noop("Checksum cost amount available before napping."),
+                       NULL
+               },
+               &ChecksumCostLimit,
+               200, 1, 10000,
+               NULL, NULL, NULL
+       },
+
        {
                {"vacuum_cost_page_hit", PGC_USERSET, RESOURCES_VACUUM_DELAY,
                        gettext_noop("Vacuum cost for a page found in the buffer cache."),
@@ -3541,6 +3563,17 @@ static struct config_real ConfigureNamesReal[] =
                check_random_seed, assign_random_seed, show_random_seed
        },
 
+       {
+               {"checksum_cost_delay", PGC_USERSET, RESOURCES_CHECKSUM_DELAY,
+                       gettext_noop("Checksum cost delay in milliseconds."),
+                       NULL,
+                       GUC_UNIT_MS
+               },
+               &ChecksumCostDelay,
+               0, 0, 100,
+               NULL, NULL, NULL
+       },
+
        {
                {"vacuum_cost_delay", PGC_USERSET, RESOURCES_VACUUM_DELAY,
                        gettext_noop("Vacuum cost delay in milliseconds."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e904fa7300..a112d010b3 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -156,6 +156,12 @@
 #max_files_per_process = 1000          # min 64
                                        # (change requires restart)
 
+# - Cost-Based Checksum Verification Delay -
+
+#checksum_cost_delay = 0                       # 0-100 milliseconds (0 disables)
+#checksum_cost_page = 10               # 0-10000 credits
+#checksum_cost_limit = 200             # 1-10000 credits
+
 # - Cost-Based Vacuum Delay -
 
 #vacuum_cost_delay = 0                 # 0-100 milliseconds (0 disables)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a649e44d08..e706df5f5f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10825,6 +10825,14 @@
   proallargtypes => '{oid,text,int8,timestamptz}', proargmodes => '{i,o,o,o}',
   proargnames => '{tablespace,name,size,modification}',
   prosrc => 'pg_ls_tmpdir_1arg' },
+{ oid => '9147', descr => 'check data integrity for one or all relations',
+  proname => 'pg_check_relation', proisstrict => 'f', procost => '10000',
+  prorows => '20', proretset => 't', proparallel => 'r',
+  provolatile => 'v', prorettype => 'record', proargtypes => 'regclass text',
+  proallargtypes => '{regclass,text,oid,int4,int8,int4,int4}',
+  proargmodes => '{i,i,o,o,o,o,o}',
+  proargnames => '{relation,fork,relid,forknum,failed_blocknum,expected_checksum,found_checksum}',
+  prosrc => 'pg_check_relation' },
 
 # hash partitioning constraint function
 { oid => '5028', descr => 'hash partition CHECK constraint',
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 14fa127ab1..8d4605efc8 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -246,6 +246,13 @@ extern PGDLLIMPORT int work_mem;
 extern PGDLLIMPORT int maintenance_work_mem;
 extern PGDLLIMPORT int max_parallel_maintenance_workers;
 
+extern int     ChecksumCostPage;
+extern int     ChecksumCostLimit;
+extern double ChecksumCostDelay;
+
+extern int     ChecksumCostBalance;
+extern bool ChecksumCostActive;
+
 extern int     VacuumCostPageHit;
 extern int     VacuumCostPageMiss;
 extern int     VacuumCostPageDirty;
diff --git a/src/include/storage/checksum.h b/src/include/storage/checksum.h
index 6e77744cbc..29ca52d8c9 100644
--- a/src/include/storage/checksum.h
+++ b/src/include/storage/checksum.h
@@ -13,8 +13,15 @@
 #ifndef CHECKSUM_H
 #define CHECKSUM_H
 
+#include "postgres.h"
+
+#include "access/tupdesc.h"
+#include "common/relpath.h"
 #include "storage/block.h"
+#include "utils/tuplestore.h"
 
+extern void check_one_relation(TupleDesc tupdesc, Tuplestorestate *tupstore,
+                                                          Oid relid, ForkNumber single_forknum);
 /*
  * Compute the checksum for a Postgres page.  The page must be aligned on a
  * 4-byte boundary.
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index 454c2df487..84965d3f09 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -62,6 +62,7 @@ enum config_group
        RESOURCES_MEM,
        RESOURCES_DISK,
        RESOURCES_KERNEL,
+       RESOURCES_CHECKSUM_DELAY,
        RESOURCES_VACUUM_DELAY,
        RESOURCES_BGWRITER,
        RESOURCES_ASYNCHRONOUS,
diff --git a/src/test/Makefile b/src/test/Makefile
index efb206aa75..832868099f 100644
--- a/src/test/Makefile
+++ b/src/test/Makefile
@@ -12,7 +12,8 @@ subdir = src/test
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS = perl regress isolation modules authentication recovery subscription
+SUBDIRS = perl regress isolation modules authentication check_relation \
+         recovery subscription
 
 # Test suites that are not safe by default but can be run if selected
 # by the user via the whitespace-separated list in variable
diff --git a/src/test/check_relation/.gitignore b/src/test/check_relation/.gitignore
new file mode 100644
index 0000000000..871e943d50
--- /dev/null
+++ b/src/test/check_relation/.gitignore
@@ -0,0 +1,2 @@
+# Generated by test suite
+/tmp_check/
diff --git a/src/test/check_relation/Makefile b/src/test/check_relation/Makefile
new file mode 100644
index 0000000000..792098fa65
--- /dev/null
+++ b/src/test/check_relation/Makefile
@@ -0,0 +1,23 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/test/check_relation
+#
+# Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/test/check_relation/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/test/check_relation
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+check:
+       $(prove_check)
+
+installcheck:
+       $(prove_installcheck)
+
+clean distclean maintainer-clean:
+       rm -rf tmp_check
diff --git a/src/test/check_relation/README b/src/test/check_relation/README
new file mode 100644
index 0000000000..415c4b21ad
--- /dev/null
+++ b/src/test/check_relation/README
@@ -0,0 +1,23 @@
+src/test/check_relation/README
+
+Regression tests for online checksums verification
+==================================================
+
+This directory contains a test suite for online checksums verification.
+
+Running the tests
+=================
+
+NOTE: You must have given the --enable-tap-tests argument to configure.
+
+Run
+    make check
+or
+    make installcheck
+You can use "make installcheck" if you previously did "make install".
+In that case, the code in the installation tree is tested.  With
+"make check", a temporary installation tree is built from the current
+sources and then tested.
+
+Either way, this test initializes, starts, and stops a test Postgres
+cluster.
diff --git a/src/test/check_relation/t/01_checksums_check.pl b/src/test/check_relation/t/01_checksums_check.pl
new file mode 100644
index 0000000000..af67252ea1
--- /dev/null
+++ b/src/test/check_relation/t/01_checksums_check.pl
@@ -0,0 +1,276 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 59;
+
+our $CHECKSUM_UINT16_OFFSET = 4;
+our $PD_UPPER_UINT16_OFFSET = 7;
+our $BLOCKSIZE;
+our $TOTAL_NB_ERR = 0;
+
+sub get_block
+{
+       my ($filename, $blkno) = @_;
+       my $block;
+
+       open(my $infile, '<', $filename) or die;
+       binmode($infile);
+
+       seek($infile, $blkno * $BLOCKSIZE, 0) or die($!);
+       defined(read($infile, $block, $BLOCKSIZE)) or die($!);
+
+       close($infile);
+
+       return($block);
+}
+
+sub overwrite_block
+{
+       my ($filename, $block, $blkno) = @_;
+
+       open(my $outfile, '+<', $filename) or die;
+       binmode ($outfile);
+
+       sysseek($outfile, $blkno * $BLOCKSIZE, 0) or die($!);
+       my $nb = syswrite($outfile, $block, $BLOCKSIZE);
+       die($!) if not defined $nb;
+       die("Write error") if ($nb != $BLOCKSIZE);
+
+       $outfile->flush();
+
+       close($outfile);
+}
+
+sub get_uint16_from_page
+{
+       my ($block, $offset) = @_;
+
+       return (unpack("S*", $block))[$offset];
+}
+
+sub set_uint16_to_page
+{
+       my ($block, $data, $offset) = @_;
+
+       my $pack = pack("S", $data);
+
+       # vec with 16 bits or more won't preserve native endianness
+       vec($block, 2*$offset, 8) = (unpack('C*', $pack))[0];
+       vec($block, (2*$offset) + 1, 8) = (unpack('C*', $pack))[1];
+
+       return $block;
+}
+
+sub check_checksums_call
+{
+       my ($node, $relname) = @_;
+
+       my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT COUNT(*)"
+               . " FROM pg_catalog.pg_check_relation('$relname')"
+       );
+
+        return ($stderr eq '');
+}
+
+sub check_checksums_nb_error
+{
+       my ($node, $nb, $pattern) = @_;
+
+       my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT COUNT(*)"
+        . " FROM (SELECT pg_catalog.pg_check_relation(oid)"
+        . "   FROM pg_class WHERE relkind in ('r', 'i', 'm')) AS s"
+       );
+
+       is($cmdret, 0, 'Function should run successfully');
+       like($stderr, $pattern, 'Error output should match expectations');
+       is($stdout, $nb, "Should have $nb error");
+
+       $TOTAL_NB_ERR += $nb;
+}
+
+sub check_pg_stat_database_nb_error
+{
+       my ($node) = @_;
+
+       my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT "
+               . " sum(checksum_failures)"
+               . " FROM pg_catalog.pg_stat_database"
+       );
+
+       is($cmdret, 0, 'Function should run successfully');
+       is($stderr, '', 'Function should run successfully');
+       is($stdout, $TOTAL_NB_ERR, "Should have $TOTAL_NB_ERR error");
+}
+
+sub get_checksums_errors
+{
+       my ($node, $nb, $pattern) = @_;
+
+       my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT"
+               . " relid::regclass::text, forknum, failed_blocknum,"
+               . " expected_checksum, found_checksum"
+               . " FROM (SELECT (pg_catalog.pg_check_relation(oid)).*"
+        . "   FROM pg_class WHERE relkind in ('r','i', 'm')) AS s"
+       );
+
+        is($cmdret, '0', 'Function should run successfully');
+        like($stderr, $pattern, 'Error output should match expectations');
+
+        $TOTAL_NB_ERR += $nb;
+
+        return $stdout;
+}
+
+# This function will perform various tests by modifying the specified block at
+# the specified uint16 offset, checking that the corruption is correctly
+# detected, and finally restoring the specified block to its original content.
+sub corrupt_and_test_block
+{
+       my ($node, $filename, $blkno, $offset, $fake_data) = @_;
+
+       check_checksums_nb_error($node, 0, qr/^$/);
+
+       check_pg_stat_database_nb_error($node);
+
+       $node->stop();
+
+       my $original_block = get_block($filename, 0);
+       my $original_data = get_uint16_from_page($original_block, $offset);
+
+       isnt($original_data, $fake_data,
+               "The fake data at offset $offset should be different"
+               . " from the existing one");
+
+       my $new_block = set_uint16_to_page($original_block, $fake_data, $offset);
+       isnt($original_data, get_uint16_from_page($new_block, $offset),
+               "The fake data at offset $offset should have been changed in memory");
+
+       overwrite_block($filename, $new_block, 0);
+
+       my $written_data = get_uint16_from_page(get_block($filename, 0), $offset);
+       isnt($original_data, $written_data,
+               "The data written at offset $offset should be different"
+               . " from the original one");
+       is(get_uint16_from_page($new_block, $offset), $written_data,
+               "The data written at offset $offset should be the same"
+               . " as the one in memory");
+       is($written_data, $fake_data,
+               "The data written at offset $offset should be the one"
+               . " we wanted to write");
+
+       $node->start();
+
+       check_checksums_nb_error($node, 1, qr/invalid page in block $blkno/);
+
+       my $expected_checksum;
+       my $found_checksum = get_uint16_from_page($new_block,
+               $CHECKSUM_UINT16_OFFSET);
+       if ($offset == $PD_UPPER_UINT16_OFFSET)
+       {
+               # A checksum can't be computed if it's detected as PageIsNew(), so the
+               # function returns NULL for the computed checksum
+               $expected_checksum = '';
+       }
+       else
+       {
+               $expected_checksum = get_uint16_from_page($original_block,
+                       $CHECKSUM_UINT16_OFFSET);
+       }
+
+       my $det = get_checksums_errors($node, 1, qr/invalid page in block $blkno/);
+       is($det, "t1|0|0|$expected_checksum|$found_checksum",
+               "The checksums error for modification at offset $offset"
+               . " should be detected");
+
+       $node->stop();
+
+       $new_block = set_uint16_to_page($original_block, $original_data, $offset);
+       is($original_data, get_uint16_from_page($new_block, $offset),
+               "The data at offset $offset should have been restored in memory");
+
+       overwrite_block($filename, $new_block, 0);
+       is($original_data, get_uint16_from_page(get_block($filename, $blkno),
+                       $offset),
+               "The data at offset $offset should have been restored on disk");
+
+       $node->start();
+
+       check_checksums_nb_error($node, 0, qr/^$/);
+}
+
+if (exists $ENV{MY_PG_REGRESS})
+{
+       $ENV{PG_REGRESS} = $ENV{MY_PG_REGRESS};
+}
+
+my $node = get_new_node('main');
+
+my %params;
+$params{'extra'} = ['--data-checksums'];
+$node->init(%params);
+
+$node->start();
+
+$ENV{PGOPTIONS} = '--client-min-messages=WARNING';
+
+my ($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT"
+       . " current_setting('data_checksums')");
+
+is($stdout, 'on', 'Data checksums should be enabled');
+
+($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT"
+       . " current_setting('block_size')");
+
+$BLOCKSIZE = $stdout;
+
+$node->safe_psql(
+       'postgres', q|
+       CREATE TABLE public.t1(id integer);
+       CREATE INDEX t1_id_idx ON public.t1 (id);
+       INSERT INTO public.t1 SELECT generate_series(1, 100);
+       CREATE VIEW public.v1 AS SELECT * FROM t1;
+       CREATE MATERIALIZED VIEW public.mv1 AS SELECT * FROM t1;
+       CREATE SEQUENCE public.s1;
+       CREATE UNLOGGED TABLE public.u_t1(id integer);
+       CREATE INDEX u_t1_id_idx ON public.u_t1 (id);
+       INSERT INTO public.u_t1 SELECT generate_series(1, 100);
+       CHECKPOINT;
+|);
+
+# Check sane behavior on various object types, including those that don't have
+# storage.
+is(check_checksums_call($node, 't1'), '1', 'Can check a table');
+is(check_checksums_call($node, 't1_id_idx'), '1', 'Can check an index');
+is(check_checksums_call($node, 'v1'), '', 'Cannot check a view');
+is(check_checksums_call($node, 'mv1'), '1', 'Can check a materialized view');
+is(check_checksums_call($node, 's1'), '1', 'Can check a sequence');
+is(check_checksums_call($node, 'u_t1'), '1', 'Can check an unlogged table');
+is(check_checksums_call($node, 'u_t1_id_idx'), '1', 'Can check an unlogged index');
+
+# get the underlying heap absolute path
+($cmdret, $stdout, $stderr) = $node->psql('postgres', "SELECT"
+       . " current_setting('data_directory') || '/' || pg_relation_filepath('t1')"
+);
+
+isnt($stdout, '', 'A relfilenode should be returned');
+
+my $filename = $stdout;
+
+check_checksums_nb_error($node, 0, qr/^$/);
+
+check_pg_stat_database_nb_error($node);
+
+my $fake_uint16 = hex '0x0000';
+
+# Test with a modified checksum.  We use a zero checksum here as it's the only
+# one that cannot exist on a checksummed page.  We also don't have an easy way
+# to compute what the checksum would be after a modification in a random place
+# in the block.
+corrupt_and_test_block($node, $filename, 0, $CHECKSUM_UINT16_OFFSET,
+       $fake_uint16);
+
+# Test corruption making the block look like it's PageIsNew().
+corrupt_and_test_block($node, $filename, 0, $PD_UPPER_UINT16_OFFSET,
+       $fake_uint16);
-- 
2.20.1
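
As a side note on the throttling arithmetic in check_delay_point() above,
here is a minimal standalone sketch of how the nap duration is derived.  It
is only an illustration and not part of the patch: the helper name
checksum_nap_msec is hypothetical, and the GUC values are passed as plain
parameters instead of being read from the globals.

```c
#include <assert.h>

/*
 * Hypothetical helper mirroring the arithmetic of check_delay_point():
 * returns how long to sleep, in milliseconds, for a given accumulated cost.
 */
static int
checksum_nap_msec(double cost_delay, int cost_balance, int cost_limit)
{
	int			msec;

	/* checksum_cost_limit has a minimum of 1 in the GUC definition */
	assert(cost_limit > 0);

	/* No nap while the accumulated balance is under the limit */
	if (cost_balance < cost_limit)
		return 0;

	/* Scale the delay by how far the balance overshoots the limit */
	msec = (int) (cost_delay * cost_balance / cost_limit);

	/* Never sleep more than four times the configured delay */
	if (msec > cost_delay * 4)
		msec = (int) (cost_delay * 4);

	return msec;
}
```

With checksum_cost_page = 10 and checksum_cost_limit = 200 (the defaults in
the patch), the balance reaches the limit after 20 pages read, so a non-zero
checksum_cost_delay yields one nap of roughly that duration every 20 pages.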
