From 2b34be53d5d91920fa8b3366c73a4c624d98391c Mon Sep 17 00:00:00 2001
From: "kuroda.hayato%40jp.fujitsu.com" <kuroda.hayato@jp.fujitsu.com>
Date: Tue, 17 May 2022 08:03:31 +0000
Subject: [PATCH 1/2] (PoC) implement LRG

---
 src/Makefile                                |   1 +
 src/backend/catalog/Makefile                |   3 +-
 src/backend/postmaster/bgworker.c           |   7 +
 src/backend/postmaster/postmaster.c         |   3 +
 src/backend/replication/Makefile            |   4 +-
 src/backend/replication/libpqlrg/Makefile   |  38 ++
 src/backend/replication/libpqlrg/libpqlrg.c | 220 ++++++++
 src/backend/replication/lrg/Makefile        |  22 +
 src/backend/replication/lrg/lrg.c           | 417 ++++++++++++++
 src/backend/replication/lrg/lrg_launcher.c  | 323 +++++++++++
 src/backend/replication/lrg/lrg_worker.c    | 592 ++++++++++++++++++++
 src/backend/storage/ipc/ipci.c              |   2 +
 src/include/catalog/pg_lrg_info.h           |  47 ++
 src/include/catalog/pg_lrg_nodes.h          |  53 ++
 src/include/catalog/pg_lrg_pub.h            |  46 ++
 src/include/catalog/pg_lrg_sub.h            |  46 ++
 src/include/catalog/pg_proc.dat             |  25 +
 src/include/replication/libpqlrg.h          |  63 +++
 src/include/replication/lrg.h               |  67 +++
 src/test/regress/expected/oidjoins.out      |   6 +
 20 files changed, 1983 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/replication/libpqlrg/Makefile
 create mode 100644 src/backend/replication/libpqlrg/libpqlrg.c
 create mode 100644 src/backend/replication/lrg/Makefile
 create mode 100644 src/backend/replication/lrg/lrg.c
 create mode 100644 src/backend/replication/lrg/lrg_launcher.c
 create mode 100644 src/backend/replication/lrg/lrg_worker.c
 create mode 100644 src/include/catalog/pg_lrg_info.h
 create mode 100644 src/include/catalog/pg_lrg_nodes.h
 create mode 100644 src/include/catalog/pg_lrg_pub.h
 create mode 100644 src/include/catalog/pg_lrg_sub.h
 create mode 100644 src/include/replication/libpqlrg.h
 create mode 100644 src/include/replication/lrg.h

diff --git a/src/Makefile b/src/Makefile
index 79e274a476..75db706762 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -23,6 +23,7 @@ SUBDIRS = \
 	interfaces \
 	backend/replication/libpqwalreceiver \
 	backend/replication/pgoutput \
+	backend/replication/libpqlrg \
 	fe_utils \
 	bin \
 	pl \
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index 89a0221ec9..744fdf4fb8 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -72,7 +72,8 @@ CATALOG_HEADERS := \
 	pg_collation.h pg_parameter_acl.h pg_partitioned_table.h \
 	pg_range.h pg_transform.h \
 	pg_sequence.h pg_publication.h pg_publication_namespace.h \
-	pg_publication_rel.h pg_subscription.h pg_subscription_rel.h
+	pg_publication_rel.h pg_subscription.h pg_subscription_rel.h \
+	pg_lrg_info.h pg_lrg_nodes.h pg_lrg_pub.h pg_lrg_sub.h
 
 GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h
 
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 40601aefd9..49d8ff1878 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -20,6 +20,7 @@
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
+#include "replication/lrg.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "storage/dsm.h"
@@ -128,6 +129,12 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"lrg_launcher_main", lrg_launcher_main
+	},
+	{
+		"lrg_worker_main", lrg_worker_main
 	}
 };
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 3b73e26956..b900008cdd 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -118,6 +118,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "replication/logicallauncher.h"
+#include "replication/lrg.h"
 #include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -1020,6 +1021,8 @@ PostmasterMain(int argc, char *argv[])
 	 */
 	ApplyLauncherRegister();
 
+	LrgLauncherRegister();
+
 	/*
 	 * process any libraries that should be preloaded at postmaster start
 	 */
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 3d8fb70c0e..49ffc243f6 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -35,7 +35,9 @@ OBJS = \
 	walreceiverfuncs.o \
 	walsender.o
 
-SUBDIRS = logical
+SUBDIRS = \
+	logical \
+	lrg
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/replication/libpqlrg/Makefile b/src/backend/replication/libpqlrg/Makefile
new file mode 100644
index 0000000000..72d911a918
--- /dev/null
+++ b/src/backend/replication/libpqlrg/Makefile
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/lrg/libpqlrg
+#
+# IDENTIFICATION
+#    src/backend/replication/lrg/libpqlrg/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/lrg/libpqlrg
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = \
+	$(WIN32RES) \
+	libpqlrg.o
+
+SHLIB_LINK_INTERNAL = $(libpq)
+SHLIB_LINK = $(filter -lintl, $(LIBS))
+SHLIB_PREREQS = submake-libpq
+PGFILEDESC = "libpqlrg"
+NAME = libpqlrg
+
+all: all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean maintainer-clean: clean-lib
+	rm -f $(OBJS)
diff --git a/src/backend/replication/libpqlrg/libpqlrg.c b/src/backend/replication/libpqlrg/libpqlrg.c
new file mode 100644
index 0000000000..4bd8375be7
--- /dev/null
+++ b/src/backend/replication/libpqlrg/libpqlrg.c
@@ -0,0 +1,220 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqlrg.c
+ *		  functions for lrg worker
+ *
+ *-------------------------------------------------------------------------
+ */
+
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "funcapi.h"
+#include "libpq-fe.h"
+#include "lib/stringinfo.h"
+#include "replication/libpqlrg.h"
+#include "replication/lrg.h"
+#include "utils/snapmgr.h"
+
+PG_MODULE_MAGIC;
+
+void		_PG_init(void);
+
+static void libpqlrg_connect(const char *connstring, PGconn **conn);
+static bool libpqlrg_check_group(PGconn *conn, const char *group_name);
+static void libpqlrg_copy_lrg_nodes(PGconn *remoteconn, PGconn *localconn);
+static void libpqlrg_insert_into_lrg_nodes(PGconn *remoteconn,
+										   const char *node_id, LRG_NODE_STATE status,
+										   const char *node_name, const char *local_connstring,
+										   const char *upstream_connstring);
+
+static void libpqlrg_create_subscription(const char *group_name, const char *publisher_connstring,
+										 const char *publisher_node_id, const char *subscriber_node_id,
+										 PGconn *subscriberconn, const char *options);
+static void libpqlrg_disconnect(PGconn *conn);
+
+static lrg_function_types PQLrgFunctionTypes =
+{
+	libpqlrg_connect,
+	libpqlrg_check_group,
+	libpqlrg_copy_lrg_nodes,
+	libpqlrg_insert_into_lrg_nodes,
+	libpqlrg_create_subscription,
+	libpqlrg_disconnect
+};
+
+/*
+ * Just a wrapper for PQconnectdb() and PQstatus().
+ */
+static void
+libpqlrg_connect(const char *connstring, PGconn **conn)
+{
+	elog(LOG, "given connstring: %s", connstring);
+	*conn = PQconnectdb(connstring);
+	if (PQstatus(*conn) != CONNECTION_OK)
+		elog(ERROR, "failed to connect");
+}
+
+static bool
+libpqlrg_check_group(PGconn *conn, const char *group_name)
+{
+	PGresult *result;
+	StringInfoData query;
+	bool ret;
+
+	Assert(PQstatus(conn) == CONNECTION_OK);
+	initStringInfo(&query);
+	appendStringInfo(&query, "SELECT COUNT(*) FROM pg_lrg_info WHERE groupname = '%s'", group_name);
+
+	result = PQexec(conn, query.data);
+
+	ret = atoi(PQgetvalue(result, 0, 0));
+	pfree(query.data);
+
+	return ret != 0;
+}
+
+/*
+ * Copy pg_lrg_nodes from remoteconn
+ */
+static void
+libpqlrg_copy_lrg_nodes(PGconn *remoteconn, PGconn *localconn)
+{
+	PGresult *result;
+	StringInfoData query;
+	int i, num_tuples;
+
+	Assert(PQstatus(remoteconn) == CONNECTION_OK
+		   && PQstatus(localconn) == CONNECTION_OK);
+	initStringInfo(&query);
+
+
+	/*
+	 * Note that COPY command cannot be used here because group_oid
+	 * might be different between remote and local.
+	 */
+	appendStringInfo(&query, "SELECT nodeid, status, nodename, "
+							 "localconn, upstreamconn FROM pg_lrg_nodes");
+	result = PQexec(remoteconn, query.data);
+	if (PQresultStatus(result) != PGRES_TUPLES_OK)
+		elog(ERROR, "failed to read pg_lrg_nodes");
+
+	resetStringInfo(&query);
+
+	num_tuples = PQntuples(result);
+
+	for(i = 0; i < num_tuples; i++)
+	{
+		char *node_id;
+		char *status;
+		char *nodename;
+		char *localconn;
+		char *upstreamconn;
+
+		node_id = PQgetvalue(result, i, 0);
+		status = PQgetvalue(result, i, 1);
+		nodename = PQgetvalue(result, i, 2);
+		localconn = PQgetvalue(result, i, 3);
+		upstreamconn = PQgetvalue(result, i, 4);
+
+		StartTransactionCommand();
+		(void) GetTransactionSnapshot();
+		/*
+		 * group_oid is adjusted to local value
+		 */
+		lrg_add_nodes(node_id, get_group_oid(), atoi(status), nodename, localconn, upstreamconn);
+		CommitTransactionCommand();
+	}
+}
+
+static void
+libpqlrg_insert_into_lrg_nodes(PGconn *remoteconn,
+							   const char *node_id, LRG_NODE_STATE status,
+							   const char *node_name, const char *local_connstring,
+							   const char *upstream_connstring)
+{
+	StringInfoData query;
+	PGresult *result;
+
+	Assert(PQstatus(remoteconn) == CONNECTION_OK
+		   && node_id != NULL
+		   && node_name != NULL
+		   && local_connstring != NULL
+		   && upstream_connstring != NULL);
+
+	initStringInfo(&query);
+	appendStringInfo(&query, "SELECT lrg_insert_into_nodes('%s', %d, '%s', '%s', '%s')",
+					 node_id, status, node_name, local_connstring, upstream_connstring);
+
+	result = PQexec(remoteconn, query.data);
+	if (PQresultStatus(result) != PGRES_TUPLES_OK)
+		elog(ERROR, "failed to execute libpqlrg_insert_to_remote_lrg_nodes: %s", query.data);
+	PQclear(result);
+
+	pfree(query.data);
+}
+
+
+static void
+libpqlrg_create_subscription(const char *group_name, const char *publisher_connstring,
+							 const char *publisher_node_id, const char *subscriber_node_id,
+							 PGconn *subscriberconn, const char *options)
+{
+	StringInfoData query, sub_name;
+	PGresult *result;
+
+	Assert(publisher_connstring != NULL && subscriberconn != NULL);
+
+	/*
+	 * the name of subscriber is just concat of two node_id.
+	 */
+	initStringInfo(&query);
+	initStringInfo(&sub_name);
+
+	/*
+	 * construct the name of subscription and query.
+	 */
+	appendStringInfo(&sub_name, "sub_%s_%s", subscriber_node_id, publisher_node_id);
+	appendStringInfo(&query, "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION pub_for_%s",
+					 sub_name.data, publisher_connstring, group_name);
+
+	if (options)
+		appendStringInfo(&query, " WITH (%s)", options);
+
+	result = PQexec(subscriberconn, query.data);
+	if (PQresultStatus(result) != PGRES_COMMAND_OK)
+		elog(ERROR, "failed to create subscription: %s", query.data);
+	PQclear(result);
+
+	resetStringInfo(&query);
+	appendStringInfo(&query, "SELECT lrg_insert_into_sub('%s')", sub_name.data);
+	result = PQexec(subscriberconn, query.data);
+	if (PQresultStatus(result) != PGRES_TUPLES_OK)
+		elog(ERROR, "failed to execute lrg_insert_into_sub: %s", query.data);
+	PQclear(result);
+
+	pfree(sub_name.data);
+	pfree(query.data);
+}
+
+
+/*
+ * Just a wrapper for PQfinish()
+ */
+static void
+libpqlrg_disconnect(PGconn *conn)
+{
+	PQfinish(conn);
+}
+
+/*
+ * Module initialization function
+ */
+void
+_PG_init(void)
+{
+	if (LrgFunctionTypes != NULL)
+		elog(ERROR, "libpqlrg already loaded");
+	LrgFunctionTypes = &PQLrgFunctionTypes;
+}
diff --git a/src/backend/replication/lrg/Makefile b/src/backend/replication/lrg/Makefile
new file mode 100644
index 0000000000..4ce929b6a4
--- /dev/null
+++ b/src/backend/replication/lrg/Makefile
@@ -0,0 +1,22 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/lrg
+#
+# IDENTIFICATION
+#    src/backend/replication/lrg/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/lrg
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = \
+	lrg.o \
+	lrg_launcher.o \
+	lrg_worker.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/lrg/lrg.c b/src/backend/replication/lrg/lrg.c
new file mode 100644
index 0000000000..1580b9283f
--- /dev/null
+++ b/src/backend/replication/lrg/lrg.c
@@ -0,0 +1,417 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg.c
+ *		  Constructs a logical replication group
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/relscan.h"
+#include "access/table.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_lrg_info.h"
+#include "catalog/pg_lrg_nodes.h"
+#include "catalog/pg_lrg_sub.h"
+#include "catalog/pg_subscription.h"
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "replication/libpqlrg.h"
+#include "replication/logicallauncher.h"
+#include "replication/lrg.h"
+#include "storage/lock.h"
+#include "utils/builtins.h"
+#include "utils/fmgrprotos.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+
+LrgPerdbCtxStruct *LrgPerdbCtx;
+
+static Size lrg_worker_array_size(void);
+static Oid lrg_add_info(char *group_name, bool puballtables);
+static Oid find_subscription(const char *subname);
+
+/*
+ * Helpler function for LrgLauncherShmemInit.
+ */
+static Size
+lrg_worker_array_size(void)
+{
+	Size size;
+
+	size = sizeof(LrgPerdbCtxStruct);
+	size = MAXALIGN(size);
+	/* XXX: for simplify the size of the array is set to max_worker_processes */
+	size = add_size(size, mul_size(max_worker_processes, sizeof(LrgPerdbCtxStruct)));
+
+	return size;
+}
+
+/*
+ * Allocate LrgPerdbCtxStruct to the shared memory.
+ */
+void
+LrgLauncherShmemInit(void)
+{
+	bool		found;
+
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+	LrgPerdbCtx = (LrgPerdbCtxStruct *)
+		ShmemInitStruct("Lrg Launcher Data",
+						lrg_worker_array_size(),
+						&found);
+	if (!found)
+	{
+		MemSet(LrgPerdbCtx, 0, lrg_worker_array_size());
+		LWLockInitialize(&(LrgPerdbCtx->lock), LWLockNewTrancheId());
+	}
+	LWLockRelease(AddinShmemInitLock);
+	LWLockRegisterTranche(LrgPerdbCtx->lock.tranche, "lrg");
+}
+
+void
+LrgLauncherRegister(void)
+{
+	BackgroundWorker worker;
+
+	if (max_logical_replication_workers == 0)
+		return;
+
+	/*
+	 * Build struct BackgroundWorker for launcher.
+	 */
+	MemSet(&worker, 0, sizeof(BackgroundWorker));
+
+	snprintf(worker.bgw_name, BGW_MAXLEN, "lrg launcher");
+	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	snprintf(worker.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(worker.bgw_function_name, BGW_MAXLEN, "lrg_launcher_main");
+	RegisterBackgroundWorker(&worker);
+}
+
+/*
+ * construct node_id.
+ *
+ * TODO: construct proper node_id. Currently it is just concat of
+ * sytem identifier and dbid.
+ */
+void
+construct_node_id(char *out_node_id, int size)
+{
+	snprintf(out_node_id, size, UINT64_FORMAT "%u", GetSystemIdentifier(), MyDatabaseId);
+}
+
+/*
+ * Actual work for adding a tuple to pg_lrg_nodes.
+ */
+void
+lrg_add_nodes(char *node_id, Oid group_id, LRG_NODE_STATE status, char *node_name, char *local_connstring, char *upstream_connstring)
+{
+	Relation rel;
+	bool		nulls[Natts_pg_lrg_nodes];
+	Datum		values[Natts_pg_lrg_nodes];
+	HeapTuple tup;
+
+	Oid			lrgnodesoid;
+
+	rel = table_open(LrgNodesRelationId, ExclusiveLock);
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, 0, sizeof(nulls));
+
+	lrgnodesoid = GetNewOidWithIndex(rel, LrgNodesRelationIndexId, Anum_pg_lrg_nodes_oid);
+	values[Anum_pg_lrg_nodes_oid - 1] = ObjectIdGetDatum(lrgnodesoid);
+	values[Anum_pg_lrg_nodes_nodeid - 1] = CStringGetDatum(node_id);
+	values[Anum_pg_lrg_nodes_groupid - 1] = ObjectIdGetDatum(group_id);
+	values[Anum_pg_lrg_nodes_status - 1] = Int32GetDatum(status);
+	values[Anum_pg_lrg_nodes_dbid - 1] = ObjectIdGetDatum(MyDatabaseId);
+	values[Anum_pg_lrg_nodes_nodename - 1] = CStringGetDatum(node_name);
+	values[Anum_pg_lrg_nodes_localconn - 1] = CStringGetDatum(local_connstring);
+
+	if (upstream_connstring != NULL)
+		values[Anum_pg_lrg_nodes_upstreamconn - 1] = CStringGetDatum(upstream_connstring);
+	else
+		nulls[Anum_pg_lrg_nodes_upstreamconn - 1] = true;
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+}
+
+/*
+ * read pg_lrg_info and get oid.
+ *
+ * XXX: This function assumes that there is only one tuple
+ * in thepg_lrg_info.
+ */
+Oid
+get_group_oid(void)
+{
+	Relation	rel;
+	HeapTuple tup;
+	TableScanDesc scan;
+	Oid group_oid = InvalidOid;
+	Form_pg_lrg_info infoform;
+	bool is_opened = false;
+
+	if (!IsTransactionState())
+	{
+		is_opened = true;
+		StartTransactionCommand();
+		(void) GetTransactionSnapshot();
+	}
+
+	rel = table_open(LrgNodesRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+	tup = heap_getnext(scan, ForwardScanDirection);
+
+	if (tup != NULL)
+	{
+		infoform = (Form_pg_lrg_info) GETSTRUCT(tup);
+		group_oid = infoform->oid;
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	if (is_opened)
+		CommitTransactionCommand();
+
+	return group_oid;
+}
+
+/*
+ * Actual work for adding a tuple to pg_lrg_info.
+ */
+static Oid
+lrg_add_info(char *group_name, bool puballtables)
+{
+	Relation	rel;
+	bool		nulls[Natts_pg_lrg_info];
+	Datum		values[Natts_pg_lrg_info];
+	HeapTuple tup;
+	Oid			lrgoid;
+
+	rel = table_open(LrgInfoRelationId, ExclusiveLock);
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, 0, sizeof(nulls));
+
+	lrgoid = GetNewOidWithIndex(rel, LrgInfoRelationIndexId, Anum_pg_lrg_info_oid);
+	values[Anum_pg_lrg_info_oid - 1] = ObjectIdGetDatum(lrgoid);
+
+	values[Anum_pg_lrg_info_groupname - 1] = CStringGetDatum(group_name);
+
+	values[Anum_pg_lrg_info_puballtables - 1] = BoolGetDatum(puballtables);
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+
+	return lrgoid;
+}
+
+/*
+ * helper function for lrg_insert_into_sub
+ */
+static Oid
+find_subscription(const char *subname)
+{
+	/* for scannning */
+	Relation rel;
+	HeapTuple tup;
+	Form_pg_subscription form;
+
+	rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(subname));
+
+	if (!HeapTupleIsValid(tup))
+	{
+		table_close(rel, NoLock);
+		return InvalidOid;
+	}
+
+	form = (Form_pg_subscription) GETSTRUCT(tup);
+	table_close(rel, NoLock);
+
+	return form->oid;
+}
+
+/*
+ * ================================
+ * Public APIs
+ * ================================
+ */
+
+/*
+ * SQL function for creating a new logical replication group.
+ *
+ * This function adds a tuple to pg_lrg_info and pg_lrg_nodes,
+ * and after that kick lrg launcher.
+ */
+Datum
+lrg_create(PG_FUNCTION_ARGS)
+{
+	Oid			lrgoid;
+	char		*group_name;
+	char		*pub_type;
+	char		*local_connstring;
+	char		*node_name;
+
+	/* XXX: for simplify the fixed array is used */
+	char		node_id[64];
+
+	group_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	pub_type = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(1)));
+
+	if (pg_strcasecmp(pub_type, "FOR ALL TABLES") != 0)
+		elog(ERROR, "'only 'FOR ALL TABLES' is support");
+
+	lrgoid = lrg_add_info(group_name, true);
+
+	construct_node_id(node_id, sizeof(node_id));
+	local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2)));
+	node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3)));
+	lrg_add_nodes(node_id, lrgoid, LRG_STATE_INIT, node_name, local_connstring, NULL);
+
+	lrg_launcher_wakeup();
+	PG_RETURN_NULL();
+}
+
+
+/*
+ * SQL function for attaching to a specified group
+ *
+ * This function adds a tuple to pg_lrg_info and pg_lrg_nodes,
+ * and after that kick lrg launcher.
+ */
+Datum
+lrg_node_attach(PG_FUNCTION_ARGS)
+{
+	Oid			lrgoid;
+	char		*group_name;
+	char		*local_connstring;
+	char		*upstream_connstring;
+	char		*node_name;
+	PGconn		*upstreamconn = NULL;
+
+	/* XXX: for simplify the fixed array is used */
+	char		node_id[64];
+
+	group_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(1)));
+	upstream_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2)));
+	node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3)));
+
+	/*
+	 * For sanity check the backend process must connect to the upstream node.
+	 * libpqlrg shared library will be used for that.
+	 */
+	load_file("libpqlrg", false);
+	lrg_connect(upstream_connstring, &upstreamconn);
+	if (!lrg_check_group(upstreamconn, group_name))
+		elog(ERROR, "specified group is not exist");
+	lrg_disconnect(upstreamconn);
+
+	lrgoid = lrg_add_info(group_name, true);
+	construct_node_id(node_id, sizeof(node_id));
+	lrg_add_nodes(node_id, lrgoid, LRG_STATE_INIT, node_name, local_connstring, upstream_connstring);
+
+	lrg_launcher_wakeup();
+	PG_RETURN_NULL();
+}
+
+/*
+ * SQL function for detaching from a group
+ */
+Datum
+lrg_node_detach(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_NULL();
+}
+
+/*
+ * SQL function for dropping a group
+ */
+Datum
+lrg_drop(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_NULL();
+}
+
+/*
+ * This funciton is used internally: wrapper for adding a tuple into pg_lrg_sub
+ */
+Datum
+lrg_insert_into_sub(PG_FUNCTION_ARGS)
+{
+	char *sub_name;
+	Oid group_oid, sub_oid, lrgsub_oid;
+	Relation rel;
+	bool		nulls[Natts_pg_lrg_sub];
+	Datum		values[Natts_pg_lrg_sub];
+	HeapTuple tup;
+
+	sub_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+
+	group_oid = get_group_oid();
+	sub_oid = find_subscription(sub_name);
+
+	rel = table_open(LrgSubscriptionId, ExclusiveLock);
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, 0, sizeof(nulls));
+
+	lrgsub_oid = GetNewOidWithIndex(rel, LrgSubscriptionOidIndexId, Anum_pg_lrg_sub_oid);
+
+	values[Anum_pg_lrg_sub_oid - 1] = ObjectIdGetDatum(lrgsub_oid);
+	values[Anum_pg_lrg_sub_groupid - 1] = ObjectIdGetDatum(group_oid);
+	values[Anum_pg_lrg_sub_subid - 1] = ObjectIdGetDatum(sub_oid);
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+
+	PG_RETURN_NULL();
+}
+
+/*
+ * This funciton is used internally: wrapper for adding a tuple into pg_lrg_nodes
+ */
+Datum
+lrg_insert_into_nodes(PG_FUNCTION_ARGS)
+{
+	char *node_id;
+	LRG_NODE_STATE status;
+	char *node_name;
+	char *local_connstring;
+	char *upstream_connstring;
+	Oid group_oid;
+
+	node_id = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	status = DatumGetInt32(PG_GETARG_DATUM(1));
+	node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2)));
+	local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3)));
+	upstream_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(4)));
+
+	group_oid = get_group_oid();
+
+	lrg_add_nodes(node_id, group_oid, status, node_name, local_connstring, upstream_connstring);
+
+	PG_RETURN_NULL();
+}
diff --git a/src/backend/replication/lrg/lrg_launcher.c b/src/backend/replication/lrg/lrg_launcher.c
new file mode 100644
index 0000000000..d0cbe36515
--- /dev/null
+++ b/src/backend/replication/lrg/lrg_launcher.c
@@ -0,0 +1,323 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg_launcher.c
+ *		  functions for lrg launcher
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/heapam.h"
+#include "access/relscan.h"
+#include "access/table.h"
+#include "catalog/pg_database.h"
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/interrupt.h"
+#include "replication/logicallauncher.h"
+#include "replication/lrg.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "tcop/tcopprot.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+
+static void launch_lrg_worker(Oid dbid);
+static LrgPerdbWorker* find_perdb_worker(Oid dbid);
+static List* get_db_list(void);
+static void scan_and_launch(void);
+static void lrglauncher_worker_onexit(int code, Datum arg);
+
+static bool ishook_registered = false;
+static bool isworker_needed = false;
+
+typedef struct db_list_cell
+{
+	Oid dbid;
+	char *dbname;
+} db_list_cell;
+
+/*
+ * Launch a per-db worker that related with the given database
+ */
+static void
+launch_lrg_worker(Oid dbid)
+{
+	BackgroundWorker bgw;
+	LrgPerdbWorker *worker = NULL;
+	int slot = 0;
+
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+
+	/*
+	 * Find a free worker slot.
+	 */
+	for (int i = 0; i < max_logical_replication_workers; i++)
+	{
+		LrgPerdbWorker *pw = &LrgPerdbCtx->workers[i];
+
+		if (pw->dbid == InvalidOid)
+		{
+			worker = pw;
+			slot = i;
+			break;
+		}
+	}
+
+	/*
+	 * If there are no more free worker slots, raise an ERROR now.
+	 *
+	 * TODO: cleanup the array?
+	 */
+	if (worker == NULL)
+	{
+		LWLockRelease(&LrgPerdbCtx->lock);
+		ereport(ERROR,
+				errmsg("out of worker slots"));
+	}
+
+
+	/* Prepare the worker slot. */
+	worker->dbid = dbid;
+
+	LWLockRelease(&LrgPerdbCtx->lock);
+
+	MemSet(&bgw, 0, sizeof(BackgroundWorker));
+
+	snprintf(bgw.bgw_name, BGW_MAXLEN, "lrg worker for database %u", dbid);
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "lrg_worker_main");
+	bgw.bgw_main_arg = UInt32GetDatum(slot);
+
+	if (!RegisterDynamicBackgroundWorker(&bgw, NULL))
+	{
+		/* Failed to start worker, so clean up the worker slot. */
+		LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+		lrg_worker_cleanup(worker);
+		LWLockRelease(&LrgPerdbCtx->lock);
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("out of worker slots")));
+	}
+}
+
+/*
+ * Find a per-db worker that related with the given database
+ */
+static LrgPerdbWorker*
+find_perdb_worker(Oid dbid)
+{
+	int i;
+
+	Assert(LWLockHeldByMe(&LrgPerdbCtx->lock));
+
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LrgPerdbWorker *worker = &LrgPerdbCtx->workers[i];
+		if (worker->dbid == dbid)
+			return worker;
+	}
+	return NULL;
+}
+
+/*
+ * Load the list of databases.
+ */
+static List*
+get_db_list()
+{
+	List *res = NIL;
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tup;
+	/* We will allocate the output data in the current memory context */
+	MemoryContext resultcxt = CurrentMemoryContext;
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(DatabaseRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tup);
+		db_list_cell *cell;
+		MemoryContext oldcxt;
+
+		/* skip if connection is not allowed */
+		if (!dbform->datallowconn)
+			continue;
+
+		/*
+		 * Allocate our results in the caller's context, not the transaction's.
+		 */
+		oldcxt = MemoryContextSwitchTo(resultcxt);
+
+		cell = (db_list_cell *) palloc0(sizeof(db_list_cell));
+		cell->dbid = dbform->oid;
+		cell->dbname = pstrdup(NameStr(dbform->datname));
+		res = lappend(res, cell);
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+	CommitTransactionCommand();
+
+	return res;
+}
+
+/*
+ * Scan pg_lrg_nodes and launch or notify to per-db worker if needed.
+ */
+static void
+scan_and_launch(void)
+{
+	List *list;
+	ListCell   *lc;
+	MemoryContext subctx;
+	MemoryContext oldctx;
+
+	subctx = AllocSetContextCreate(TopMemoryContext,
+									"Lrg Launcher list",
+									ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+	/* search for lrg nodes to start */
+	list = get_db_list();
+
+	foreach(lc, list)
+	{
+		db_list_cell *cell = (db_list_cell *)lfirst(lc);
+		LrgPerdbWorker *worker;
+
+		LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+		worker = find_perdb_worker(cell->dbid);
+		LWLockRelease(&LrgPerdbCtx->lock);
+
+		if (worker != NULL)
+			continue;
+
+		launch_lrg_worker(cell->dbid);
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(subctx);
+}
+
+
+/*
+ * Callback for process exit. cleanup the controller
+ */
+static void
+lrglauncher_worker_onexit(int code, Datum arg)
+{
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	LrgPerdbCtx->launcher_pid = InvalidPid;
+	LrgPerdbCtx->launcher_latch = NULL;
+	LWLockRelease(&LrgPerdbCtx->lock);
+}
+
+/*
+ * Entry point for lrg launcher
+ */
+void
+lrg_launcher_main(Datum arg)
+{
+	Assert(LrgPerdbCtx->launcher_pid == 0);
+	LrgPerdbCtx->launcher_pid = MyProcPid;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Register my latch to the controller
+	 * for receiving notifications from per-db background worker.
+	 */
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	LrgPerdbCtx->launcher_latch = &MyProc->procLatch;
+	LrgPerdbCtx->launcher_pid = MyProcPid;
+	LWLockRelease(&LrgPerdbCtx->lock);
+	before_shmem_exit(lrglauncher_worker_onexit, (Datum) 0);
+	ResetLatch(&MyProc->procLatch);
+
+	/*
+	 * we did not connect specific database, because this
+	 * will read only pg_database
+	 */
+	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+
+	/*
+	 * main loop
+	 */
+	for (;;)
+	{
+		int rc = 0;
+
+		CHECK_FOR_INTERRUPTS();
+#define TEMPORARY_NAP_TIME 180000L
+
+		rc = WaitLatch(&MyProc->procLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   TEMPORARY_NAP_TIME, 0);
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(&MyProc->procLatch);
+			CHECK_FOR_INTERRUPTS();
+			scan_and_launch();
+		}
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+	}
+	/* Not reachable */
+}
+
+/*
+ * Launches per-db worker if needed.
+ */
+static void
+lrg_perdb_wakeup_callback(XactEvent event, void *arg)
+{
+	switch (event)
+	{
+		case XACT_EVENT_COMMIT:
+			if (isworker_needed)
+			{
+				LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+				SetLatch(LrgPerdbCtx->launcher_latch);
+				LWLockRelease(&LrgPerdbCtx->lock);
+			}
+			isworker_needed = false;
+			break;
+		default:
+			break;
+	}
+}
+
+/*
+ * Register a callback for notifying to launcher.
+ */
+void
+lrg_launcher_wakeup(void)
+{
+	if (!ishook_registered)
+	{
+		RegisterXactCallback(lrg_perdb_wakeup_callback, NULL);
+		ishook_registered = true;
+	}
+	isworker_needed = true;
+}
diff --git a/src/backend/replication/lrg/lrg_worker.c b/src/backend/replication/lrg/lrg_worker.c
new file mode 100644
index 0000000000..785ab851c8
--- /dev/null
+++ b/src/backend/replication/lrg/lrg_worker.c
@@ -0,0 +1,592 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg_worker.c
+ *		  functions for lrg worker
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/relscan.h"
+#include "access/table.h"
+#include "catalog/catalog.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_lrg_info.h"
+#include "catalog/pg_lrg_nodes.h"
+#include "catalog/pg_lrg_pub.h"
+#include "catalog/pg_publication.h"
+#include "executor/spi.h"
+#include "libpq-fe.h"
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/interrupt.h"
+#include "replication/libpqlrg.h"
+#include "replication/lrg.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "tcop/tcopprot.h"
+#include "utils/fmgroids.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+
+typedef struct LrgNode {
+	Oid	  group_oid;
+	char *node_id;
+	char *node_name;
+	char *local_connstring;
+	char *upstream_connstring;
+} LrgNode;
+
+lrg_function_types *LrgFunctionTypes = NULL;
+
+static LrgPerdbWorker* my_lrg_worker = NULL;
+
+static void lrg_worker_onexit(int code, Datum arg);
+static void do_node_management(void);
+
+static void get_node_information(LrgNode *node, LRG_NODE_STATE *status);
+static void advance_state_machine(LrgNode *node, LRG_NODE_STATE initial_status);
+
+static void create_publication(const char* group_name, const char* node_id, Oid group_oid);
+static Oid find_publication(const char *pubname);
+
+static List* get_lrg_nodes_list(const char *local_nodeid);
+
+static void synchronise_system_tables(PGconn *localconn, PGconn *upstreamconn, char *local_connstring);
+static void get_group_name(char **group_name, Oid group_oid);
+static void update_mynode(LRG_NODE_STATE state);
+
+void
+lrg_worker_cleanup(LrgPerdbWorker *worker)
+{
+	Assert(LWLockHeldByMeInMode(&LrgPerdbCtx->lock, LW_EXCLUSIVE));
+
+	worker->dbid = InvalidOid;
+	worker->worker_pid = InvalidPid;
+	worker->worker_latch = NULL;
+}
+
+/*
+ * Callback for process exit. cleanup the array.
+ */
+static void
+lrg_worker_onexit(int code, Datum arg)
+{
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	lrg_worker_cleanup(my_lrg_worker);
+	LWLockRelease(&LrgPerdbCtx->lock);
+}
+
+/*
+ * Synchronise system tables from upstream node.
+ *
+ * Currently it will read and insert pg_lrg_nodes only.
+ */
+static void
+synchronise_system_tables(PGconn *localconn, PGconn *upstreamconn, char *local_connstring)
+{
+	lrg_copy_lrg_nodes(upstreamconn, localconn);
+}
+
+/*
+ * Load the list of lrg_nodes.
+ */
+static List*
+get_lrg_nodes_list(const char *local_nodeid)
+{
+	List *res = NIL;
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tup;
+	/* We will allocate the output data in the current memory context */
+	MemoryContext resultcxt = CurrentMemoryContext;
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(LrgNodesRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_lrg_nodes nodesform = (Form_pg_lrg_nodes) GETSTRUCT(tup);
+		LrgNode *node;
+		MemoryContext oldcxt;
+
+		if (strcmp(NameStr(nodesform->nodeid), local_nodeid) == 0)
+			continue;
+		/*
+		 * Allocate our results in the caller's context, not the transaction's.
+		 */
+		oldcxt = MemoryContextSwitchTo(resultcxt);
+
+		node = (LrgNode *)palloc0(sizeof(LrgNode));
+		node->group_oid = nodesform->groupid;
+		node->node_id = NameStr(nodesform->nodeid);
+		node->node_name = NameStr(nodesform->nodename);
+		node->local_connstring = NameStr(nodesform->localconn);
+		node->upstream_connstring = NameStr(nodesform->upstreamconn);
+		res = lappend(res, node);
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+	CommitTransactionCommand();
+
+	return res;
+}
+
+/*
+ * get group_name from local pg_lrg_info.
+ * The second argument is used for the key.
+ *
+ * XXX: In this version this function may be not needed
+ * because one node can join only one group.
+ */
+static void
+get_group_name(char **group_name, Oid group_oid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	SysScanDesc scandesc;
+	ScanKeyData entry[1];
+	Form_pg_lrg_info infoform;
+	MemoryContext old;
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	Assert(group_name != NULL);
+	/*
+	 * Read a related tuple from pg_lrg_info.
+	 * TODO: use index scan instead?
+	 */
+	rel = table_open(LrgInfoRelationId, AccessShareLock);
+	ScanKeyInit(&entry[0],
+				Anum_pg_lrg_info_oid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				DatumGetObjectId(group_oid));
+
+	scandesc = systable_beginscan(rel, LrgInfoRelationIndexId, true,
+									NULL, 1, entry);
+	tup = systable_getnext(scandesc);
+
+	Assert(HeapTupleIsValid(tup));
+	infoform = (Form_pg_lrg_info) GETSTRUCT(tup);
+	old = MemoryContextSwitchTo(TopMemoryContext);
+	*group_name = pstrdup(NameStr(infoform->groupname));
+
+	MemoryContextSwitchTo(old);
+
+	systable_endscan(scandesc);
+	table_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+}
+
+/*
+ * Update the node information myself to specified state
+ */
+static void
+update_mynode(LRG_NODE_STATE state)
+{
+	StringInfoData query;
+	int ret;
+
+	initStringInfo(&query);
+	appendStringInfo(&query, "UPDATE pg_lrg_nodes SET status = ");
+
+	switch (state)
+	{
+		case LRG_STATE_CREATE_PUBLICATION:
+			appendStringInfo(&query, "%d ", LRG_STATE_CREATE_PUBLICATION);
+			break;
+		case LRG_STATE_CREATE_SUBSCRIPTION:
+			appendStringInfo(&query, "%d", LRG_STATE_CREATE_SUBSCRIPTION);
+			break;
+		case LRG_STATE_READY:
+			appendStringInfo(&query, "%d", LRG_STATE_READY);
+			break;
+		default:
+			elog(ERROR, "not implemented yet");
+	}
+
+	appendStringInfo(&query, " WHERE dbid = %d", my_lrg_worker->dbid);
+
+	StartTransactionCommand();
+	SPI_connect();
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	ret = SPI_execute(query.data, false, 0);
+	if (ret != SPI_OK_UPDATE)
+		elog(ERROR, "SPI error while updating a table");
+
+	PopActiveSnapshot();
+	SPI_finish();
+	CommitTransactionCommand();
+
+	pfree(query.data);
+}
+
+static Oid
+find_publication(const char *pubname)
+{
+	/* for scannning */
+	Relation rel;
+	HeapTuple tup;
+	Form_pg_publication pubform;
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	/* Check if name is used */
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(pubname));
+
+	if (!HeapTupleIsValid(tup))
+	{
+		table_close(rel, NoLock);
+		return InvalidOid;
+	}
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+	table_close(rel, NoLock);
+
+	return pubform->oid;
+}
+
+/*
+ * Create publication via SPI interface.
+ */
+static void
+create_publication(const char* group_name, const char* node_id, Oid group_oid)
+{
+	int ret;
+	StringInfoData query, pub_name;
+	Oid pub_oid;
+	Oid lrgpub_oid;
+	Relation rel;
+	bool		nulls[Natts_pg_lrg_pub];
+	Datum		values[Natts_pg_lrg_pub];
+	HeapTuple tup;
+
+	initStringInfo(&query);
+	initStringInfo(&pub_name);
+
+	StartTransactionCommand();
+	SPI_connect();
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+
+	appendStringInfo(&pub_name, "pub_for_%s", group_name);
+	appendStringInfo(&query, "CREATE PUBLICATION %s %s", pub_name.data, "FOR ALL TABLES");
+
+	ret = SPI_execute(query.data, false, 0);
+	if (ret != SPI_OK_UTILITY)
+		elog(ERROR, "SPI error while creating publication");
+
+	PopActiveSnapshot();
+	SPI_finish();
+	CommitTransactionCommand();
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	pub_oid = find_publication(pub_name.data);
+	if (pub_oid == InvalidOid)
+		elog(ERROR, "publication is not found");
+
+	rel = table_open(LrgPublicationId, ExclusiveLock);
+
+	memset(nulls, 0, sizeof(nulls));
+	memset(values, 0, sizeof(values));
+
+	lrgpub_oid = GetNewOidWithIndex(rel, LrgPublicationOidIndexId, Anum_pg_lrg_pub_oid);
+
+	values[Anum_pg_lrg_pub_oid - 1] = ObjectIdGetDatum(lrgpub_oid);
+	values[Anum_pg_lrg_pub_groupid - 1] = ObjectIdGetDatum(group_oid);
+	values[Anum_pg_lrg_pub_pubid - 1] = ObjectIdGetDatum(pub_oid);
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+
+	CommitTransactionCommand();
+
+	pfree(pub_name.data);
+	pfree(query.data);
+}
+
+
+/*
+ * advance the state machine
+ */
+static void
+advance_state_machine(LrgNode *local_node, LRG_NODE_STATE initial_status)
+{
+	PGconn *localconn = NULL;
+	PGconn *upstreamconn = NULL;
+	char *group_name = NULL;
+
+	LRG_NODE_STATE state = initial_status;
+
+	if (state == LRG_STATE_INIT)
+	{
+		/* Establish connection if we are in the attaching case */
+		if (local_node->upstream_connstring != NULL)
+		{
+			load_file("libpqlrg", false);
+			lrg_connect(local_node->upstream_connstring, &upstreamconn);
+			lrg_connect(local_node->local_connstring, &localconn);
+			synchronise_system_tables(localconn, upstreamconn, local_node->local_connstring);
+		}
+
+		get_group_name(&group_name, local_node->group_oid);
+		elog(LOG, "set_name: %s", group_name);
+
+		create_publication(group_name, local_node->node_id, local_node->group_oid);
+
+		state = LRG_STATE_CREATE_PUBLICATION;
+		update_mynode(LRG_STATE_CREATE_PUBLICATION);
+	}
+
+	if (state == LRG_STATE_CREATE_PUBLICATION)
+	{
+		if (local_node->upstream_connstring != NULL)
+		{
+			List *list;
+			ListCell   *lc;
+			MemoryContext subctx;
+			MemoryContext oldctx;
+
+			subctx = AllocSetContextCreate(TopMemoryContext,
+											"Lrg Launcher list",
+											ALLOCSET_DEFAULT_SIZES);
+			oldctx = MemoryContextSwitchTo(subctx);
+
+			list = get_lrg_nodes_list(local_node->node_id);
+
+			foreach(lc, list)
+			{
+				LrgNode *other_node = (LrgNode *)lfirst(lc);
+				PGconn *otherconn = NULL;
+				lrg_connect(other_node->local_connstring, &otherconn);
+				lrg_create_subscription(group_name, local_node->local_connstring,
+										local_node->node_id, other_node->node_id,
+										otherconn, "local_only = true, copy_data = false");
+				lrg_create_subscription(group_name, other_node->local_connstring,
+										other_node->node_id, local_node->node_id,
+										localconn, "local_only = true, copy_data = false");
+
+				/*
+				 * XXX: adding a tuple into remote's pg_lrg_nodes here,
+				 * but it is bad. it should be end of this function.
+				 */
+				if (local_node->upstream_connstring != NULL)
+					lrg_insert_into_lrg_nodes(otherconn, local_node->node_id,
+							LRG_STATE_READY, local_node->node_name,
+							local_node->local_connstring, local_node->upstream_connstring);
+				lrg_disconnect(otherconn);
+			}
+			MemoryContextSwitchTo(oldctx);
+			MemoryContextDelete(subctx);
+		}
+
+		state = LRG_STATE_CREATE_SUBSCRIPTION;
+		update_mynode(LRG_STATE_CREATE_SUBSCRIPTION);
+	}
+
+	state = LRG_STATE_READY;
+	update_mynode(LRG_STATE_READY);
+
+	/*
+	 * clean up phase
+	 */
+	if (localconn != NULL)
+		lrg_disconnect(localconn);
+	if (upstreamconn != NULL)
+		lrg_disconnect(upstreamconn);
+	if (group_name != NULL)
+		pfree(group_name);
+}
+
+/*
+ * Get node-specific information from pg_lrg_nodes.
+ */
+static void
+get_node_information(LrgNode *node, LRG_NODE_STATE *status)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool found = false;
+	char local_node_id[64];
+	SysScanDesc scandesc;
+	ScanKeyData entry[1];
+
+	construct_node_id(local_node_id, sizeof(local_node_id));
+
+	/*
+	 * Read a related tuple from pg_lrg_nodes.
+	 * TODO: use index scan instead
+	 */
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(LrgNodesRelationId, AccessShareLock);
+
+	ScanKeyInit(&entry[0],
+				Anum_pg_lrg_nodes_nodeid,
+				BTEqualStrategyNumber, F_NAMEEQ,
+				CStringGetDatum(local_node_id));
+
+	scandesc = systable_beginscan(rel, LrgNodeIdIndexId, true,
+								  NULL, 1, entry);
+
+	tup = systable_getnext(scandesc);
+
+	/* We assume that there can be at most one matching tuple */
+	if (HeapTupleIsValid(tup))
+	{
+		MemoryContext old;
+		Form_pg_lrg_nodes nodesform = (Form_pg_lrg_nodes) GETSTRUCT(tup);
+
+		old = MemoryContextSwitchTo(TopMemoryContext);
+		node->group_oid = nodesform->groupid;
+		node->node_id = pstrdup(NameStr(nodesform->nodeid));
+		node->node_name = pstrdup(NameStr(nodesform->nodename));
+		node->local_connstring = pstrdup(NameStr(nodesform->localconn));
+		if (strlen(NameStr(nodesform->upstreamconn)) != 0)
+			node->upstream_connstring = pstrdup(NameStr(nodesform->upstreamconn));
+		else
+			node->upstream_connstring = NULL;
+		*status = nodesform->status;
+		found = true;
+		MemoryContextSwitchTo(old);
+	}
+
+	systable_endscan(scandesc);
+
+	table_close(rel, AccessShareLock);
+	CommitTransactionCommand();
+
+	if (!found)
+		elog(ERROR, "no tuples found");
+
+}
+
+static void
+do_node_management(void)
+{
+	LrgNode node;
+	LRG_NODE_STATE status;
+	/*
+	 * read information from pg_lrg_nodes
+	 */
+	get_node_information(&node, &status);
+	elog(DEBUG3, "initial status of %u: %d", my_lrg_worker->dbid, status);
+
+	/*
+	 * advance the state machine for creating or attaching.
+	 *
+	 * TODO: consider detaching case
+	 */
+	advance_state_machine(&node, status);
+
+	pfree(node.node_id);
+	pfree(node.node_name);
+	pfree(node.local_connstring);
+	if (node.upstream_connstring != NULL)
+		pfree(node.upstream_connstring);
+}
+
+/*
+ * Entry point for lrg worker
+ */
+void
+lrg_worker_main(Datum arg)
+{
+	int slot = DatumGetInt32(arg);
+
+	/* Establish signal handlers. */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Get information from the controller. The idex
+	 * is given as the argument
+	 */
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_SHARED);
+	my_lrg_worker = &LrgPerdbCtx->workers[slot];
+	my_lrg_worker->worker_pid = MyProcPid;
+	my_lrg_worker->worker_latch = &MyProc->procLatch;
+	LWLockRelease(&LrgPerdbCtx->lock);
+
+	before_shmem_exit(lrg_worker_onexit, (Datum) 0);
+
+	BackgroundWorkerInitializeConnectionByOid(my_lrg_worker->dbid, 0, 0);
+
+	elog(DEBUG3, "per-db worker for %u was launched", my_lrg_worker->dbid);
+
+	/*
+	 * The launcher launches the worker without considering
+	 * the existence of lrg related data.
+	 * So firstly workers must check their catalogs, and exit
+	 * if there is no data.
+	 * In any cases pg_lrg_info will have tuples if
+	 * this node is in a node group, so we reads it.
+	 */
+	if (!get_group_oid())
+	{
+		elog(DEBUG3, "This database %u is not a member of lrg", MyDatabaseId);
+		proc_exit(0);
+	}
+
+	do_node_management();
+
+	ResetLatch(&MyProc->procLatch);
+
+	/*
+	 * Wait for detaching or removing.
+	 */
+	for (;;)
+	{
+		int rc;
+		bool is_latch_set = false;
+
+		CHECK_FOR_INTERRUPTS();
+
+#define TEMPORARY_NAP_TIME 180000L
+
+		rc = WaitLatch(&MyProc->procLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   TEMPORARY_NAP_TIME, 0);
+
+		if (rc & WL_LATCH_SET)
+		{
+			is_latch_set = true;
+			ResetLatch(&MyProc->procLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		if (is_latch_set)
+		{
+			do_node_management();
+			is_latch_set = false;
+		}
+	}
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 26372d95b3..15b77405bc 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -32,6 +32,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
+#include "replication/lrg.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -284,6 +285,7 @@ CreateSharedMemoryAndSemaphores(void)
 	WalRcvShmemInit();
 	PgArchShmemInit();
 	ApplyLauncherShmemInit();
+	LrgLauncherShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/include/catalog/pg_lrg_info.h b/src/include/catalog/pg_lrg_info.h
new file mode 100644
index 0000000000..0067aac389
--- /dev/null
+++ b/src/include/catalog/pg_lrg_info.h
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_info.h
+ *	  definition of the "logical replication group information" system
+ *	  catalog (pg_lrg_info)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_info.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_INFO_H
+#define PG_LRG_INFO_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_info_d.h"
+
+/* ----------------
+ *		pg_lrg_info definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_info
+ * ----------------
+ */
+CATALOG(pg_lrg_info,8337,LrgInfoRelationId)
+{
+	Oid			oid;			/* oid */
+
+	NameData	groupname;		/* name of the logical replication group */
+	bool		puballtables;
+} FormData_pg_lrg_info;
+
+/* ----------------
+ *		Form_pg_lrg_info corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_info relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_info *Form_pg_lrg_info;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_info_oid_index, 8338, LrgInfoRelationIndexId, on pg_lrg_info using btree(oid oid_ops));
+
+#endif							/* PG_LRG_INFO_H */
diff --git a/src/include/catalog/pg_lrg_nodes.h b/src/include/catalog/pg_lrg_nodes.h
new file mode 100644
index 0000000000..b4e4b290dc
--- /dev/null
+++ b/src/include/catalog/pg_lrg_nodes.h
@@ -0,0 +1,53 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_nodes.h
+ *	  definition of the "logical replication nodes" system
+ *	  catalog (pg_lrg_nodes)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_nodes.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_NODES_H
+#define PG_LRG_NODES_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_nodes_d.h"
+
+/* ----------------
+ *		pg_lrg_nodes definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_nodes
+ * ----------------
+ */
+CATALOG(pg_lrg_nodes,8339,LrgNodesRelationId)
+{
+	Oid			oid;			/* oid */
+
+	NameData	nodeid;		/* name of the logical replication group */
+	Oid			groupid BKI_LOOKUP(pg_lrg_info);
+	Oid 		dbid BKI_LOOKUP(pg_database);
+	int32		status;
+	NameData	nodename;
+	NameData	localconn;
+	NameData	upstreamconn BKI_FORCE_NULL;
+} FormData_pg_lrg_nodes;
+
+/* ----------------
+ *		Form_pg_lrg_nodes corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_nodes relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_nodes *Form_pg_lrg_nodes;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_nodes_oid_index, 8340, LrgNodesRelationIndexId, on pg_lrg_nodes using btree(oid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_lrg_nodes_name_index, 8346, LrgNodeIdIndexId, on pg_lrg_nodes using btree(nodeid name_ops));
+
+#endif							/* PG_LRG_NODES_H */
diff --git a/src/include/catalog/pg_lrg_pub.h b/src/include/catalog/pg_lrg_pub.h
new file mode 100644
index 0000000000..d65dc51d4d
--- /dev/null
+++ b/src/include/catalog/pg_lrg_pub.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_info.h
+ *	  definition of the "logical replication group publication" system
+ *	  catalog (pg_lrg_pub)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_pub.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_PUB_H
+#define PG_LRG_PUB_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_pub_d.h"
+
+/* ----------------
+ *		pg_lrg_pub definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_pub
+ * ----------------
+ */
+CATALOG(pg_lrg_pub,8341,LrgPublicationId)
+{
+	Oid			oid;
+	Oid 		groupid BKI_LOOKUP(pg_lrg_info);
+	Oid 		pubid BKI_LOOKUP(pg_publication);
+} FormData_pg_lrg_pub;
+
+/* ----------------
+ *		Form_pg_lrg_pub corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_pub relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_pub *Form_pg_lrg_pub;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_pub_oid_index, 8344, LrgPublicationOidIndexId, on pg_lrg_pub using btree(oid oid_ops));
+
+#endif							/* PG_LRG_PUB_H */
diff --git a/src/include/catalog/pg_lrg_sub.h b/src/include/catalog/pg_lrg_sub.h
new file mode 100644
index 0000000000..398c8e8971
--- /dev/null
+++ b/src/include/catalog/pg_lrg_sub.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_sub.h
+ *	  definition of the "logical replication group subscription" system
+ *	  catalog (pg_lrg_sub)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_sub.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_SUB_H
+#define PG_LRG_SUB_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_sub_d.h"
+
+/* ----------------
+ *		pg_lrg_sub definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_sub
+ * ----------------
+ */
+CATALOG(pg_lrg_sub,8343,LrgSubscriptionId)
+{
+	Oid			oid;
+	Oid 		groupid BKI_LOOKUP(pg_lrg_info);;
+	Oid 		subid BKI_LOOKUP(pg_subscription);
+} FormData_pg_lrg_sub;
+
+/* ----------------
+ *		Form_pg_lrg_sub corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_sub relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_sub *Form_pg_lrg_sub;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_sub_oid_index, 8345, LrgSubscriptionOidIndexId, on pg_lrg_sub using btree(oid oid_ops));
+
+#endif							/* PG_LRG_SUB_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index babe16f00a..8350934a77 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11885,4 +11885,29 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+# lrg
+{ oid => '8143', descr => 'create logical replication group',
+  proname => 'lrg_create', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_create' },
+{ oid => '8144', descr => 'attach to logical replication group',
+  proname => 'lrg_node_attach', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_node_attach' },
+{ oid => '8145', descr => 'detach from logical replication group',
+  proname => 'lrg_node_detach', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_node_detach' },
+{ oid => '8146', descr => 'delete logical replication group',
+  proname => 'lrg_drop', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_drop' },
+{ oid => '8147', descr => 'insert a tuple to pg_lrg_sub',
+  proname => 'lrg_insert_into_sub', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text',
+  prosrc => 'lrg_insert_into_sub' },
+{ oid => '8148', descr => 'insert a tuple to pg_lrg_nodes',
+  proname => 'lrg_insert_into_nodes', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text int4 text text text',
+  prosrc => 'lrg_insert_into_nodes' },
 ]
diff --git a/src/include/replication/libpqlrg.h b/src/include/replication/libpqlrg.h
new file mode 100644
index 0000000000..650715d40d
--- /dev/null
+++ b/src/include/replication/libpqlrg.h
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqlrg.h
+ *		  Constructs a logical replication group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LIBPQLIG_H
+#define LIBPQLIG_H
+
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "replication/lrg.h"
+
+typedef void (*libpqlrg_connect_fn) (const char *connstring, PGconn **conn);
+typedef bool (*libpqlrg_check_group_fn) (PGconn *conn, const char *group_name);
+typedef void (*libpqlrg_copy_lrg_nodes_fn) (PGconn *remoteconn, PGconn *localconn);
+typedef void (*libpqlrg_insert_into_lrg_nodes_fn) (PGconn *remoteconn,
+												   const char *node_id, LRG_NODE_STATE status,
+												   const char *node_name, const char *local_connstring,
+												   const char *upstream_connstring);
+typedef void (*libpqlrg_create_subscription_fn) (const char *group_name, const char *publisher_connstring,
+											  const char *publisher_node_id, const char *subscriber_node_id,
+											  PGconn *subscriberconn, const char *options);
+typedef void (*libpqlrg_disconnect_fn) (PGconn *conn);
+
+typedef struct lrg_function_types
+{
+	libpqlrg_connect_fn libpqlrg_connect;
+	libpqlrg_check_group_fn libpqlrg_check_group;
+	libpqlrg_copy_lrg_nodes_fn libpqlrg_copy_lrg_nodes;
+	libpqlrg_insert_into_lrg_nodes_fn libpqlrg_insert_into_lrg_nodes;
+	libpqlrg_create_subscription_fn libpqlrg_create_subscription;
+	libpqlrg_disconnect_fn libpqlrg_disconnect;
+} lrg_function_types;
+
+extern PGDLLIMPORT lrg_function_types *LrgFunctionTypes;
+
+#define lrg_connect(connstring, conn) \
+	LrgFunctionTypes->libpqlrg_connect(connstring, conn)
+#define lrg_check_group(conn, group_name) \
+	LrgFunctionTypes->libpqlrg_check_group(conn, group_name)
+#define lrg_copy_lrg_nodes(remoteconn, localconn) \
+	LrgFunctionTypes->libpqlrg_copy_lrg_nodes(remoteconn, localconn)
+
+#define lrg_insert_into_lrg_nodes(remoteconn, \
+								  node_id, status, \
+								  node_name, local_connstring, \
+								  upstream_connstring) \
+	LrgFunctionTypes->libpqlrg_insert_into_lrg_nodes(remoteconn, \
+													 node_id, status, \
+													 node_name, local_connstring, \
+													 upstream_connstring)
+#define lrg_create_subscription(group_name, publisher_connstring, \
+								publisher_node_id, subscriber_node_id, \
+								subscriberconn, options) \
+	LrgFunctionTypes->libpqlrg_create_subscription(group_name, publisher_connstring, \
+												publisher_node_id, subscriber_node_id, \
+												subscriberconn, options)
+#define lrg_disconnect(conn) \
+	LrgFunctionTypes->libpqlrg_disconnect(conn)
+
+#endif /* LIBPQLIG_H */
\ No newline at end of file
diff --git a/src/include/replication/lrg.h b/src/include/replication/lrg.h
new file mode 100644
index 0000000000..874cfe6477
--- /dev/null
+++ b/src/include/replication/lrg.h
@@ -0,0 +1,67 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg.h
+ *		  Constructs a logical replication group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LRG_H
+#define LRG_H
+
+#include "postgres.h"
+
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "storage/lwlock.h"
+
+/*
+ * enumeration for represents its status
+ */
+typedef enum
+{
+	LRG_STATE_INIT = 0,
+	LRG_STATE_CREATE_PUBLICATION,
+	LRG_STATE_CREATE_SUBSCRIPTION,
+	LRG_STATE_READY,
+	LRG_STATE_TO_BE_DETACHED,
+} LRG_NODE_STATE;
+
+/*
+ * working space for each lrg per-db worker.
+ */
+typedef struct LrgPerdbWorker {
+	pid_t worker_pid;
+	Oid dbid;
+	Latch *worker_latch;
+} LrgPerdbWorker;
+
+/*
+ * controller for lrg per-db worker.
+ * This will be hold by launcher.
+ */
+typedef struct LrgPerdbCtxStruct {
+	LWLock lock;
+	pid_t launcher_pid;
+	Latch *launcher_latch;
+	LrgPerdbWorker workers[FLEXIBLE_ARRAY_MEMBER];
+} LrgPerdbCtxStruct;
+
+extern LrgPerdbCtxStruct *LrgPerdbCtx;
+
+/* lrg.c */
+extern void LrgLauncherShmemInit(void);
+extern void LrgLauncherRegister(void);
+extern void lrg_add_nodes(char *node_id, Oid group_id, LRG_NODE_STATE status, char *node_name, char *local_connstring, char *upstream_connstring);
+extern Oid get_group_oid(void);
+extern void construct_node_id(char *out_node_id, int size);
+
+
+/* lrg_launcher.c */
+extern void lrg_launcher_main(Datum arg) pg_attribute_noreturn();
+extern void lrg_launcher_wakeup(void);
+
+/* *lrg_worker.c */
+extern void lrg_worker_main(Datum arg) pg_attribute_noreturn();
+extern void lrg_worker_cleanup(LrgPerdbWorker *worker);
+
+#endif /* LRG_H */
\ No newline at end of file
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be..da0ef150a2 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -266,3 +266,9 @@ NOTICE:  checking pg_subscription {subdbid} => pg_database {oid}
 NOTICE:  checking pg_subscription {subowner} => pg_authid {oid}
 NOTICE:  checking pg_subscription_rel {srsubid} => pg_subscription {oid}
 NOTICE:  checking pg_subscription_rel {srrelid} => pg_class {oid}
+NOTICE:  checking pg_lrg_nodes {groupid} => pg_lrg_info {oid}
+NOTICE:  checking pg_lrg_nodes {dbid} => pg_database {oid}
+NOTICE:  checking pg_lrg_pub {groupid} => pg_lrg_info {oid}
+NOTICE:  checking pg_lrg_pub {pubid} => pg_publication {oid}
+NOTICE:  checking pg_lrg_sub {groupid} => pg_lrg_info {oid}
+NOTICE:  checking pg_lrg_sub {subid} => pg_subscription {oid}
-- 
2.27.0

