On 2017/12/08 13:18, Huong Dangminh wrote:

Hi Sawada-san,

On Thu, Dec 7, 2017 at 11:07 AM, Masahiko Sawada <sawada.m...@gmail.com>
wrote:
On Thu, Dec 7, 2017 at 12:23 AM, Dang Minh Huong <kakalo...@gmail.com>
wrote:
Hi Sawada-san,

Sorry for my late response.

On 2017/12/05 0:11, Masahiko Sawada wrote:

There is one more case that user-defined data type is not supported
in Logical Replication.
That is when remote data type's name does not exist in SUBSCRIBE.

In relation.c:logicalrep_typmap_gettypname
We search OID in syscache by remote's data type name and mapping it,
if it does not exist in syscache We will be faced with the bellow
error.

         if (!OidIsValid(entry->typoid))
                 ereport(ERROR,

(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                  errmsg("data type \"%s.%s\" required
for logical replication does not exist",
                                                 entry->nspname,
entry->typname)));

I think, it is not necessary to check typoid here in order to avoid
above case, is that right?

I think it's not right. We should end up with an error in the case
where the same type name doesn't exist on subscriber. With your
proposed patch,
logicalrep_typmap_gettypname() can return an invalid string
(entry->typname) in that case, which can be a cause of SEGV.

Thanks, I think you are right.
# I thought that entry->typname was received from walsender, and it
is already be qualified in logicalrep_write_typ.
# But we also need check it in subscriber, because it could be lost
info in transmission.
Oops, the last sentence of my previous mail was wrong.
logicalrep_typmap_gettypname() doesn't return an invalid string since
entry->typname is initialized with a type name got from wal sender.
Yeah, so we do not need to check the existing of publisher's type name in subscriber.
After more thought, we might not need to raise an error even if there
is not the same data type on both publisher and subscriber. Because
data is sent after converted to the text representation and is
converted to a data type according to the local table definition
subscribers don't always need to have the same data type. If it's
right, slot_store_error_callback() doesn't need to find a
corresponding local data type OID but just finds the corresponding
type name by remote data type OID. What do you think?
I totally agree. It will make logical replication more flexible with data type.
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,9 @@ static dlist_head lsn_mapping =
DLIST_STATIC_INIT(lsn_mapping);

  typedef struct SlotErrCallbackArg
  {
-    LogicalRepRelation *rel;
-    int            attnum;
+    LogicalRepRelMapEntry *rel;
+    int            remote_attnum;
+    int            local_attnum;
  } SlotErrCallbackArg;

Since LogicalRepRelMapEntry has a map of local attributes to remote
ones we don't need to have two attribute numbers.

Sorry for the late reply.

Attached the patch incorporated what I have on mind. Please review it.
Thanks for the patch, I will do it at this weekend.
Your patch is fine for me.
But logicalrep_typmap_getid will be unused.
I attached the patch with removing of it.


---
Thanks and best regards,
Dang Minh Huong
diff --git a/src/backend/replication/logical/relation.c 
b/src/backend/replication/logical/relation.c
index 46e515e4b6..7858cecbb6 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -439,14 +439,13 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
 }
 
 /*
- * Fetch type info from the cache.
+ * Fetch type name from the cache by remote type OID.
  */
-Oid
-logicalrep_typmap_getid(Oid remoteid)
+char *
+logicalrep_typmap_gettypname(Oid remoteid)
 {
        LogicalRepTyp *entry;
        bool            found;
-       Oid                     nspoid;
 
        /* Internal types are mapped directly. */
        if (remoteid < FirstNormalObjectId)
@@ -455,7 +454,7 @@ logicalrep_typmap_getid(Oid remoteid)
                        ereport(ERROR,
                                        (errmsg("built-in type %u not found", 
remoteid),
                                         errhint("This can be caused by having 
a publisher with a higher PostgreSQL major version than the subscriber.")));
-               return remoteid;
+               return format_type_be(remoteid);
        }
 
        if (LogicalRepTypMap == NULL)
@@ -469,24 +468,7 @@ logicalrep_typmap_getid(Oid remoteid)
                elog(ERROR, "no type map entry for remote type %u",
                         remoteid);
 
-       /* Found and mapped, return the oid. */
-       if (OidIsValid(entry->typoid))
-               return entry->typoid;
+       Assert(OidIsValid(entry->remoteid));
 
-       /* Otherwise, try to map to local type. */
-       nspoid = LookupExplicitNamespace(entry->nspname, true);
-       if (OidIsValid(nspoid))
-               entry->typoid = GetSysCacheOid2(TYPENAMENSP,
-                                                                               
PointerGetDatum(entry->typname),
-                                                                               
ObjectIdGetDatum(nspoid));
-       else
-               entry->typoid = InvalidOid;
-
-       if (!OidIsValid(entry->typoid))
-               ereport(ERROR,
-                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("data type \"%s.%s\" required for 
logical replication does not exist",
-                                               entry->nspname, 
entry->typname)));
-
-       return entry->typoid;
+       return entry->typname;
 }
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index eedc3a8816..8c96982608 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -100,8 +100,8 @@ static dlist_head lsn_mapping = 
DLIST_STATIC_INIT(lsn_mapping);
 
 typedef struct SlotErrCallbackArg
 {
-       LogicalRepRelation *rel;
-       int                     attnum;
+       LogicalRepRelMapEntry *rel;
+       int                     local_attnum;
 } SlotErrCallbackArg;
 
 static MemoryContext ApplyMessageContext = NULL;
@@ -279,20 +279,32 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState 
*estate,
 static void
 slot_store_error_callback(void *arg)
 {
-       SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+       SlotErrCallbackArg      *errarg = (SlotErrCallbackArg *) arg;
+       LogicalRepRelMapEntry   *rel;
+       char                    *remotetypname;
+       int                     remote_attnum;
        Oid                     remotetypoid,
                                localtypoid;
 
-       if (errarg->attnum < 0)
+       rel = errarg->rel;
+       remote_attnum = rel->attrmap[errarg->local_attnum];
+
+       if (remote_attnum < 0)
                return;
 
-       remotetypoid = errarg->rel->atttyps[errarg->attnum];
-       localtypoid = logicalrep_typmap_getid(remotetypoid);
+       remotetypoid = rel->remoterel.atttyps[remote_attnum];
+
+       /* Fetch remote type name from the LogicalRepTypMap cache */
+       remotetypname = logicalrep_typmap_gettypname(remotetypoid);
+
+       /* Fetch local type OID from the local sys cache */
+       localtypoid = get_atttype(errarg->rel->localreloid, 
errarg->local_attnum + 1);
+
        errcontext("processing remote data for replication target relation 
\"%s.%s\" column \"%s\", "
-                          "remote type %s, local type %s",
-                          errarg->rel->nspname, errarg->rel->relname,
-                          errarg->rel->attnames[errarg->attnum],
-                          format_type_be(remotetypoid),
+                          "remote type \"%s\", local type \"%s\"",
+                          rel->remoterel.nspname, rel->remoterel.relname,
+                          rel->remoterel.attnames[remote_attnum],
+                          remotetypname,
                           format_type_be(localtypoid));
 }
 
@@ -313,8 +325,8 @@ slot_store_cstrings(TupleTableSlot *slot, 
LogicalRepRelMapEntry *rel,
        ExecClearTuple(slot);
 
        /* Push callback + info on the error context stack */
-       errarg.rel = &rel->remoterel;
-       errarg.attnum = -1;
+       errarg.rel = rel;
+       errarg.local_attnum = -1;
        errcallback.callback = slot_store_error_callback;
        errcallback.arg = (void *) &errarg;
        errcallback.previous = error_context_stack;
@@ -332,7 +344,7 @@ slot_store_cstrings(TupleTableSlot *slot, 
LogicalRepRelMapEntry *rel,
                        Oid                     typinput;
                        Oid                     typioparam;
 
-                       errarg.attnum = remoteattnum;
+                       errarg.local_attnum = i;
 
                        getTypeInputInfo(att->atttypid, &typinput, &typioparam);
                        slot->tts_values[i] = OidInputFunctionCall(typinput,
@@ -378,8 +390,8 @@ slot_modify_cstrings(TupleTableSlot *slot, 
LogicalRepRelMapEntry *rel,
        ExecClearTuple(slot);
 
        /* Push callback + info on the error context stack */
-       errarg.rel = &rel->remoterel;
-       errarg.attnum = -1;
+       errarg.rel = rel;
+       errarg.local_attnum = -1;
        errcallback.callback = slot_store_error_callback;
        errcallback.arg = (void *) &errarg;
        errcallback.previous = error_context_stack;
@@ -402,7 +414,7 @@ slot_modify_cstrings(TupleTableSlot *slot, 
LogicalRepRelMapEntry *rel,
                        Oid                     typinput;
                        Oid                     typioparam;
 
-                       errarg.attnum = remoteattnum;
+                       errarg.local_attnum = i;
 
                        getTypeInputInfo(att->atttypid, &typinput, &typioparam);
                        slot->tts_values[i] = OidInputFunctionCall(typinput,
diff --git a/src/include/replication/logicalrelation.h 
b/src/include/replication/logicalrelation.h
index 8352705650..563bb4f37d 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
                                         LOCKMODE lockmode);
 
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
-extern Oid     logicalrep_typmap_getid(Oid remoteid);
+extern char    *logicalrep_typmap_gettypname(Oid remoteid);
 
 #endif                                                 /* LOGICALRELATION_H */

Reply via email to