On 2020/04/28 1:24, Robert Haas wrote:
On Sun, Apr 26, 2020 at 9:41 PM Kyotaro Horiguchi
<horikyota....@gmail.com> wrote:
+1.  I actually sometimes need it.

y the way, -(pg_lsn, pg_lsn) yields a numeric.

It might be a good idea to use numeric here, too. Because int8 is
signed, it's not big enough to cover the whole range of LSNs.

Yes. Attached is the updated version of the patch, which introduces
+(pg_lsn, numeric) and -(pg_lsn, numeric) operators.
To implement them, I added also numeric_pg_lsn() function that converts
numeric to pg_lsn.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/doc/src/sgml/datatype.sgml b/doc/src/sgml/datatype.sgml
index 22eda0f4e9..b747571af5 100644
--- a/doc/src/sgml/datatype.sgml
+++ b/doc/src/sgml/datatype.sgml
@@ -4782,7 +4782,10 @@ SELECT * FROM pg_attribute
     standard comparison operators, like <literal>=</literal> and
     <literal>&gt;</literal>.  Two LSNs can be subtracted using the
     <literal>-</literal> operator; the result is the number of bytes separating
-    those write-ahead log locations.
+    those write-ahead log locations. Also the number of bytes can be added
+    into and substracted from LSN using the <literal>+</literal> and
+    <literal>-</literal> operators, respectively.
+
    </para>
   </sect1>
 
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 9986132b45..19f300205b 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -41,6 +41,7 @@
 #include "utils/guc.h"
 #include "utils/int8.h"
 #include "utils/numeric.h"
+#include "utils/pg_lsn.h"
 #include "utils/sortsupport.h"
 
 /* ----------
@@ -472,6 +473,7 @@ static void apply_typmod(NumericVar *var, int32 typmod);
 static bool numericvar_to_int32(const NumericVar *var, int32 *result);
 static bool numericvar_to_int64(const NumericVar *var, int64 *result);
 static void int64_to_numericvar(int64 val, NumericVar *var);
+static bool numericvar_to_uint64(const NumericVar *var, uint64 *result);
 #ifdef HAVE_INT128
 static bool numericvar_to_int128(const NumericVar *var, int128 *result);
 static void int128_to_numericvar(int128 val, NumericVar *var);
@@ -3688,6 +3690,31 @@ numeric_float4(PG_FUNCTION_ARGS)
 }
 
 
+Datum
+numeric_pg_lsn(PG_FUNCTION_ARGS)
+{
+       Numeric         num = PG_GETARG_NUMERIC(0);
+       NumericVar      x;
+       XLogRecPtr              result;
+
+       /* XXX would it be better to return NULL? */
+       if (NUMERIC_IS_NAN(num))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot convert NaN to pg_lsn")));
+
+       /* Convert to variable format and thence to pg_lsn */
+       init_var_from_num(num, &x);
+
+       if (!numericvar_to_uint64(&x, (uint64 *) &result))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("pg_lsn out of range")));
+
+       PG_RETURN_LSN(result);
+}
+
+
 /* ----------------------------------------------------------------------
  *
  * Aggregate functions
@@ -6739,6 +6766,78 @@ int64_to_numericvar(int64 val, NumericVar *var)
        var->weight = ndigits - 1;
 }
 
+/*
+ * Convert numeric to uint64, rounding if needed.
+ *
+ * If overflow, return false (no error is raised).  Return true if okay.
+ */
+static bool
+numericvar_to_uint64(const NumericVar *var, uint64 *result)
+{
+       NumericDigit *digits;
+       int                     ndigits;
+       int                     weight;
+       int                     i;
+       uint64          val;
+       NumericVar      rounded;
+
+       /* Round to nearest integer */
+       init_var(&rounded);
+       set_var_from_var(var, &rounded);
+       round_var(&rounded, 0);
+
+       /* Check for zero input */
+       strip_var(&rounded);
+       ndigits = rounded.ndigits;
+       if (ndigits == 0)
+       {
+               *result = 0;
+               free_var(&rounded);
+               return true;
+       }
+
+       /* Check for negative input */
+       if (rounded.sign == NUMERIC_NEG)
+       {
+               free_var(&rounded);
+               return false;
+       }
+
+       /*
+        * For input like 10000000000, we must treat stripped digits as real. So
+        * the loop assumes there are weight+1 digits before the decimal point.
+        */
+       weight = rounded.weight;
+       Assert(weight >= 0 && ndigits <= weight + 1);
+
+       /* Construct the result */
+       digits = rounded.digits;
+       val = digits[0];
+       for (i = 1; i <= weight; i++)
+       {
+               if (unlikely(pg_mul_u64_overflow(val, NBASE, &val)))
+               {
+                       free_var(&rounded);
+                       return false;
+               }
+
+               if (i < ndigits)
+               {
+                       if (unlikely(pg_add_u64_overflow(val, digits[i], &val)))
+                       {
+                               free_var(&rounded);
+                               return false;
+                       }
+               }
+       }
+
+       free_var(&rounded);
+
+       *result = val;
+
+       return true;
+}
+
 #ifdef HAVE_INT128
 /*
  * Convert numeric to int128, rounding if needed.
diff --git a/src/backend/utils/adt/pg_lsn.c b/src/backend/utils/adt/pg_lsn.c
index d9754a7778..d3e289964c 100644
--- a/src/backend/utils/adt/pg_lsn.c
+++ b/src/backend/utils/adt/pg_lsn.c
@@ -16,6 +16,7 @@
 #include "funcapi.h"
 #include "libpq/pqformat.h"
 #include "utils/builtins.h"
+#include "utils/numeric.h"
 #include "utils/pg_lsn.h"
 
 #define MAXPG_LSNLEN                   17
@@ -248,3 +249,59 @@ pg_lsn_mi(PG_FUNCTION_ARGS)
 
        return result;
 }
+
+/*
+ * Add the number of bytes to pg_lsn, giving a new pg_lsn.
+ * Must handle both positive and negative numbers of bytes.
+ */
+Datum
+pg_lsn_pli(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      lsn = PG_GETARG_LSN(0);
+       Datum   nbytes = PG_GETARG_DATUM(1);
+       Datum   num;
+       Datum   res;
+       char            buf[256];
+
+       /* Convert to numeric */
+       snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn);
+       num = DirectFunctionCall3(numeric_in,
+                                                         CStringGetDatum(buf),
+                                                         ObjectIdGetDatum(0),
+                                                         Int32GetDatum(-1));
+
+       /* Add two numerics */
+       res = DirectFunctionCall2(numeric_add,
+                                                          
NumericGetDatum(num), nbytes);
+
+       /* Convert to pg_lsn */
+       return DirectFunctionCall1(numeric_pg_lsn, res);
+}
+
+/*
+ * Substract the number of bytes from pg_lsn, giving a new pg_lsn.
+ * Must handle both positive and negative numbers of bytes.
+ */
+Datum
+pg_lsn_mii(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      lsn = PG_GETARG_LSN(0);
+       Datum   nbytes = PG_GETARG_DATUM(1);
+       Datum   num;
+       Datum   res;
+       char            buf[256];
+
+       /* Convert to numeric */
+       snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn);
+       num = DirectFunctionCall3(numeric_in,
+                                                         CStringGetDatum(buf),
+                                                         ObjectIdGetDatum(0),
+                                                         Int32GetDatum(-1));
+
+       /* Add two numerics */
+       res = DirectFunctionCall2(numeric_sub,
+                                                          
NumericGetDatum(num), nbytes);
+
+       /* Convert to pg_lsn */
+       return DirectFunctionCall1(numeric_pg_lsn, res);
+}
diff --git a/src/include/catalog/pg_operator.dat 
b/src/include/catalog/pg_operator.dat
index 00ada7e48f..15dfa1497a 100644
--- a/src/include/catalog/pg_operator.dat
+++ b/src/include/catalog/pg_operator.dat
@@ -2909,6 +2909,17 @@
 { oid => '3228', descr => 'minus',
   oprname => '-', oprleft => 'pg_lsn', oprright => 'pg_lsn',
   oprresult => 'numeric', oprcode => 'pg_lsn_mi' },
+{ oid => '5025', descr => 'add',
+  oprname => '+', oprleft => 'pg_lsn', oprright => 'numeric',
+  oprresult => 'pg_lsn', oprcom => '+(numeric,pg_lsn)',
+  oprcode => 'pg_lsn_pli' },
+{ oid => '5026', descr => 'add',
+  oprname => '+', oprleft => 'numeric', oprright => 'pg_lsn',
+  oprresult => 'pg_lsn', oprcom => '+(pg_lsn,numeric)',
+  oprcode => 'numeric_pl_pg_lsn' },
+{ oid => '5027', descr => 'subtract',
+  oprname => '-', oprleft => 'pg_lsn', oprright => 'numeric',
+  oprresult => 'pg_lsn', oprcode => 'pg_lsn_mii' },
 
 # enum operators
 { oid => '3516', descr => 'equal',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4bce3ad8de..0eb94d92e3 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -4398,6 +4398,9 @@
 { oid => '1783', descr => 'convert numeric to int2',
   proname => 'int2', prorettype => 'int2', proargtypes => 'numeric',
   prosrc => 'numeric_int2' },
+{ oid => '6103', descr => 'convert numeric to pg_lsn',
+  proname => 'pg_lsn', prorettype => 'pg_lsn', proargtypes => 'numeric',
+  prosrc => 'numeric_pg_lsn' },
 
 { oid => '3556', descr => 'convert jsonb to boolean',
   proname => 'bool', prorettype => 'bool', proargtypes => 'jsonb',
@@ -8578,6 +8581,15 @@
 { oid => '4188', descr => 'smaller of two',
   proname => 'pg_lsn_smaller', prorettype => 'pg_lsn',
   proargtypes => 'pg_lsn pg_lsn', prosrc => 'pg_lsn_smaller' },
+{ oid => '5022',
+  proname => 'pg_lsn_pli', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_pli' },
+{ oid => '5023',
+  proname => 'numeric_pl_pg_lsn', prolang => 'sql', prorettype => 'pg_lsn',
+  proargtypes => 'numeric pg_lsn', prosrc => 'select $2 + $1' },
+{ oid => '5024',
+  proname => 'pg_lsn_mii', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_mii' },
 
 # enum related procs
 { oid => '3504', descr => 'I/O',
diff --git a/src/test/regress/expected/pg_lsn.out 
b/src/test/regress/expected/pg_lsn.out
index 64d41dfdad..7eb6a387b1 100644
--- a/src/test/regress/expected/pg_lsn.out
+++ b/src/test/regress/expected/pg_lsn.out
@@ -71,6 +71,52 @@ SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn;
         1
 (1 row)
 
+SELECT '0/16AE7F7'::pg_lsn + 16::numeric;
+ ?column?  
+-----------
+ 0/16AE807
+(1 row)
+
+SELECT 16::numeric + '0/16AE7F7'::pg_lsn;
+ ?column?  
+-----------
+ 0/16AE807
+(1 row)
+
+SELECT '0/16AE7F7'::pg_lsn - 16::numeric;
+ ?column?  
+-----------
+ 0/16AE7E7
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric;
+     ?column?      
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error
+ERROR:  pg_lsn out of range
+SELECT '0/1'::pg_lsn - 1::numeric;
+ ?column? 
+----------
+ 0/0
+(1 row)
+
+SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error
+ERROR:  pg_lsn out of range
+SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+     ?column?      
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - 
'0/0'::pg_lsn);
+ ?column? 
+----------
+ 0/0
+(1 row)
+
 -- Check btree and hash opclasses
 EXPLAIN (COSTS OFF)
 SELECT DISTINCT (i || '/' || j)::pg_lsn f
diff --git a/src/test/regress/sql/pg_lsn.sql b/src/test/regress/sql/pg_lsn.sql
index 2c143c82ff..74d9fb4e6a 100644
--- a/src/test/regress/sql/pg_lsn.sql
+++ b/src/test/regress/sql/pg_lsn.sql
@@ -27,6 +27,15 @@ SELECT '0/16AE7F7' < '0/16AE7F8'::pg_lsn;
 SELECT '0/16AE7F8' > pg_lsn '0/16AE7F7';
 SELECT '0/16AE7F7'::pg_lsn - '0/16AE7F8'::pg_lsn;
 SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn;
+SELECT '0/16AE7F7'::pg_lsn + 16::numeric;
+SELECT 16::numeric + '0/16AE7F7'::pg_lsn;
+SELECT '0/16AE7F7'::pg_lsn - 16::numeric;
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric;
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error
+SELECT '0/1'::pg_lsn - 1::numeric;
+SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error
+SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - 
'0/0'::pg_lsn);
 
 -- Check btree and hash opclasses
 EXPLAIN (COSTS OFF)

Reply via email to