Alvaro Herrera wrote:

> Yeah, seems a reasonable idea to me.  Here's a tidied-up version of your
> patch, minus the regression test changes (I may end up committing that
> one separately).

... and patch.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 7e14d8afb10744508313a56a76c86b2a4c6b38f0 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 14 Mar 2018 14:15:28 -0300
Subject: [PATCH] logical replication: change how datatype mapping is used

---
 src/backend/replication/logical/relation.c | 90 ++++++++++--------------------
 src/backend/replication/logical/worker.c   | 63 +++++++++++++--------
 src/include/replication/logicalproto.h     |  7 +--
 src/include/replication/logicalrelation.h  |  2 +-
 4 files changed, 74 insertions(+), 88 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e492d26d18..1f20df5680 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -35,8 +35,6 @@ static MemoryContext LogicalRepRelMapContext = NULL;
 static HTAB *LogicalRepRelMap = NULL;
 static HTAB *LogicalRepTypMap = NULL;
 
-static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
-                                                               uint32 hashvalue);
 
 /*
  * Relcache invalidation callback for our relation map cache.
@@ -115,8 +113,6 @@ logicalrep_relmap_init(void)
        /* Watch for invalidation events. */
        CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
                                                                  (Datum) 0);
-       CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
-                                                                 (Datum) 0);
 }
 
 /*
@@ -375,27 +371,6 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
        rel->localrel = NULL;
 }
 
-
-/*
- * Type cache invalidation callback for our type map cache.
- */
-static void
-logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
-{
-       HASH_SEQ_STATUS status;
-       LogicalRepTyp *entry;
-
-       /* Just to be sure. */
-       if (LogicalRepTypMap == NULL)
-               return;
-
-       /* invalidate all cache entries */
-       hash_seq_init(&status, LogicalRepTypMap);
-
-       while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
-               entry->typoid = InvalidOid;
-}
-
 /*
  * Free the type map cache entry data.
  */
@@ -404,8 +379,6 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
 {
        pfree(entry->nspname);
        pfree(entry->typname);
-
-       entry->typoid = InvalidOid;
 }
 
 /*
@@ -436,58 +409,53 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
        entry->nspname = pstrdup(remotetyp->nspname);
        entry->typname = pstrdup(remotetyp->typname);
        MemoryContextSwitchTo(oldctx);
-       entry->typoid = InvalidOid;
 }
 
 /*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
+ *
+ * Return a substitute value if we cannot find the data type; no message is
+ * sent to the log in that case, because this function is already called
+ * from an error context callback.
  */
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
 {
        LogicalRepTyp *entry;
        bool            found;
-       Oid                     nspoid;
 
        /* Internal types are mapped directly. */
        if (remoteid < FirstNormalObjectId)
        {
                if (!get_typisdefined(remoteid))
-                       ereport(ERROR,
-                                       (errmsg("built-in type %u not found", remoteid),
-                                        errhint("This can be caused by having a publisher with a higher PostgreSQL major version than the subscriber.")));
-               return remoteid;
+               {
+                       /*
+                        * This can be caused by having a publisher with a higher
+                        * PostgreSQL major version than the subscriber.
+                        */
+                       return psprintf("unrecognized %u", remoteid);
+               }
+
+               return format_type_be(remoteid);
        }
 
        if (LogicalRepTypMap == NULL)
-               logicalrep_relmap_init();
+       {
+               /*
+                * If the typemap is not initialized yet, we cannot possibly attempt
+                * to search the hash table; but there's no way we know the type
+                * locally yet, since we haven't received a message about this type,
+                * so this is the best we can do.
+                */
+               return psprintf("unrecognized %u", remoteid);
+       }
 
-       /* Try finding the mapping. */
+       /* search the mapping */
        entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
                                                HASH_FIND, &found);
-
        if (!found)
-               elog(ERROR, "no type map entry for remote type %u",
-                        remoteid);
+               return psprintf("unrecognized %u", remoteid);
 
-       /* Found and mapped, return the oid. */
-       if (OidIsValid(entry->typoid))
-               return entry->typoid;
-
-       /* Otherwise, try to map to local type. */
-       nspoid = LookupExplicitNamespace(entry->nspname, true);
-       if (OidIsValid(nspoid))
-               entry->typoid = GetSysCacheOid2(TYPENAMENSP,
-                                                                               PointerGetDatum(entry->typname),
-                                                                               ObjectIdGetDatum(nspoid));
-       else
-               entry->typoid = InvalidOid;
-
-       if (!OidIsValid(entry->typoid))
-               ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("data type \"%s.%s\" required for logical replication does not exist",
-                                               entry->nspname, entry->typname)));
-
-       return entry->typoid;
+       Assert(OidIsValid(entry->remoteid));
+       return psprintf("%s.%s", entry->nspname, entry->typname);
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04985c9f91..fdace7eea2 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
 
 typedef struct SlotErrCallbackArg
 {
-       LogicalRepRelation *rel;
-       int                     attnum;
+       LogicalRepRelMapEntry *rel;
+       int                     local_attnum;
+       int                     remote_attnum;
 } SlotErrCallbackArg;
 
 static MemoryContext ApplyMessageContext = NULL;
@@ -282,19 +283,29 @@ static void
 slot_store_error_callback(void *arg)
 {
        SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+       LogicalRepRelMapEntry *rel;
+       char       *remotetypname;
        Oid                     remotetypoid,
                                localtypoid;
 
-       if (errarg->attnum < 0)
+       /* Nothing to do if remote attribute number is not set */
+       if (errarg->remote_attnum < 0)
                return;
 
-       remotetypoid = errarg->rel->atttyps[errarg->attnum];
-       localtypoid = logicalrep_typmap_getid(remotetypoid);
+       rel = errarg->rel;
+       remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
+
+       /* Fetch remote type name from the LogicalRepTypMap cache */
+       remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+       /* Fetch local type OID from the local sys cache */
+       localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
+
        errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
                           "remote type %s, local type %s",
-                          errarg->rel->nspname, errarg->rel->relname,
-                          errarg->rel->attnames[errarg->attnum],
-                          format_type_be(remotetypoid),
+                          rel->remoterel.nspname, rel->remoterel.relname,
+                          rel->remoterel.attnames[errarg->remote_attnum],
+                          remotetypname,
                           format_type_be(localtypoid));
 }
 
@@ -315,8 +326,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
        ExecClearTuple(slot);
 
        /* Push callback + info on the error context stack */
-       errarg.rel = &rel->remoterel;
-       errarg.attnum = -1;
+       errarg.rel = rel;
+       errarg.local_attnum = -1;
+       errarg.remote_attnum = -1;
        errcallback.callback = slot_store_error_callback;
        errcallback.arg = (void *) &errarg;
        errcallback.previous = error_context_stack;
@@ -334,14 +346,17 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
                        Oid                     typinput;
                        Oid                     typioparam;
 
-                       errarg.attnum = remoteattnum;
+                       errarg.local_attnum = i;
+                       errarg.remote_attnum = remoteattnum;
 
                        getTypeInputInfo(att->atttypid, &typinput, &typioparam);
-                       slot->tts_values[i] = OidInputFunctionCall(typinput,
-                                                                                                          values[remoteattnum],
-                                                                                                          typioparam,
-                                                                                                          att->atttypmod);
+                       slot->tts_values[i] =
+                               OidInputFunctionCall(typinput, values[remoteattnum],
+                                                                        typioparam, att->atttypmod);
                        slot->tts_isnull[i] = false;
+
+                       errarg.local_attnum = -1;
+                       errarg.remote_attnum = -1;
                }
                else
                {
@@ -380,8 +395,9 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
        ExecClearTuple(slot);
 
        /* Push callback + info on the error context stack */
-       errarg.rel = &rel->remoterel;
-       errarg.attnum = -1;
+       errarg.rel = rel;
+       errarg.local_attnum = -1;
+       errarg.remote_attnum = -1;
        errcallback.callback = slot_store_error_callback;
        errcallback.arg = (void *) &errarg;
        errcallback.previous = error_context_stack;
@@ -404,14 +420,17 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
                        Oid                     typinput;
                        Oid                     typioparam;
 
-                       errarg.attnum = remoteattnum;
+                       errarg.local_attnum = i;
+                       errarg.remote_attnum = remoteattnum;
 
                        getTypeInputInfo(att->atttypid, &typinput, &typioparam);
-                       slot->tts_values[i] = OidInputFunctionCall(typinput,
-                                                                                                          values[remoteattnum],
-                                                                                                          typioparam,
-                                                                                                          att->atttypmod);
+                       slot->tts_values[i] =
+                               OidInputFunctionCall(typinput, values[remoteattnum],
+                                                                        typioparam, att->atttypmod);
                        slot->tts_isnull[i] = false;
+
+                       errarg.local_attnum = -1;
+                       errarg.remote_attnum = -1;
                }
                else
                {
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0eb21057c5..116f16f42d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -55,10 +55,9 @@ typedef struct LogicalRepRelation
 /* Type mapping info */
 typedef struct LogicalRepTyp
 {
-       Oid                     remoteid;               /* unique id of the type */
-       char       *nspname;            /* schema name */
-       char       *typname;            /* name of the type */
-       Oid                     typoid;                 /* local type Oid */
+       Oid                     remoteid;               /* unique id of the remote type */
+       char       *nspname;            /* schema name of remote type */
+       char       *typname;            /* name of the remote type */
 } LogicalRepTyp;
 
 /* Transaction info */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index d4250c2608..73e4805827 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
                                         LOCKMODE lockmode);
 
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid     logicalrep_typmap_getid(Oid remoteid);
+extern char *logicalrep_typmap_gettypname(Oid remoteid);
 
 #endif                                                 /* LOGICALRELATION_H */
-- 
2.11.0

Reply via email to