On Sat, Oct 08, 2022 at 10:37:41AM -0700, Nathan Bossart wrote:
> Yeah, that makes more sense.  It actually simplifies things a bit, too.

Sorry for the noise.  There was an extra #include in v4 that I've removed
in v5.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 6e80d41135b8b21f9b06e09a7e85069acc8e57a8 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v5 1/1] Support COPY TO callback functions.

---
 src/backend/commands/copy.c                   |  2 +-
 src/backend/commands/copyto.c                 | 18 ++++++--
 src/include/commands/copy.h                   |  3 +-
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 .../modules/test_copy_callbacks/.gitignore    |  4 ++
 src/test/modules/test_copy_callbacks/Makefile | 23 ++++++++++
 .../expected/test_copy_callbacks.out          | 12 +++++
 .../modules/test_copy_callbacks/meson.build   | 34 ++++++++++++++
 .../sql/test_copy_callbacks.sql               |  4 ++
 .../test_copy_callbacks--1.0.sql              |  8 ++++
 .../test_copy_callbacks/test_copy_callbacks.c | 46 +++++++++++++++++++
 .../test_copy_callbacks.control               |  4 ++
 src/tools/pgindent/typedefs.list              |  1 +
 14 files changed, 156 insertions(+), 5 deletions(-)
 create mode 100644 src/test/modules/test_copy_callbacks/.gitignore
 create mode 100644 src/test/modules/test_copy_callbacks/Makefile
 create mode 100644 src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
 create mode 100644 src/test/modules/test_copy_callbacks/meson.build
 create mode 100644 src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.c
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..db4c9dbc23 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 
 		cstate = BeginCopyTo(pstate, rel, query, relid,
 							 stmt->filename, stmt->is_program,
-							 stmt->attlist, stmt->options);
+							 NULL, stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
 	}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
 {
 	COPY_FILE,					/* to file (or a piped program) */
 	COPY_FRONTEND,				/* to frontend */
+	COPY_CALLBACK,				/* to callback function */
 } CopyDest;
 
 /*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
+	copy_data_dest_cb data_dest_cb; /* function for writing data */
 
 	CopyFormatOptions opts;
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
+		case COPY_CALLBACK:
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+			break;
 	}
 
 	/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
 	CopyToState cstate;
-	bool		pipe = (filename == NULL);
+	bool		pipe = (filename == NULL && data_dest_cb == NULL);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
 
 	cstate->copy_dest = COPY_FILE;	/* default */
 
-	if (pipe)
+	if (data_dest_cb)
+	{
+		progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
+	else if (pipe)
 	{
 		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
 
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
 uint64
 DoCopyTo(CopyToState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 3f6677b132..b77b935005 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
 typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *data, int len);
 
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
 				   int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
  */
 extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
 							   Oid queryRelId, const char *filename, bool is_program,
-							   List *attnamelist, List *options);
+							   copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
 extern void EndCopyTo(CopyToState cstate);
 extern uint64 DoCopyTo(CopyToState cstate);
 extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6c31c8707c..7b3f292965 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
 		  snapshot_too_old \
 		  spgist_name_ops \
 		  test_bloomfilter \
+		  test_copy_callbacks \
 		  test_ddl_deparse \
 		  test_extensions \
 		  test_ginpostinglist \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index a80e6e2ce2..c2e5f5ffd5 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -9,6 +9,7 @@ subdir('snapshot_too_old')
 subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
+subdir('test_copy_callbacks')
 subdir('test_ddl_deparse')
 subdir('test_extensions')
 subdir('test_ginpostinglist')
diff --git a/src/test/modules/test_copy_callbacks/.gitignore b/src/test/modules/test_copy_callbacks/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_callbacks/Makefile b/src/test/modules/test_copy_callbacks/Makefile
new file mode 100644
index 0000000000..6b0a0efc37
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_callbacks/Makefile
+
+MODULE_big = test_copy_callbacks
+OBJS = \
+	$(WIN32RES) \
+	test_copy_callbacks.o
+PGFILEDESC = "test_copy_callbacks - example use of COPY callbacks"
+
+EXTENSION = test_copy_callbacks
+DATA = test_copy_callbacks--1.0.sql
+
+REGRESS = test_copy_callbacks
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_callbacks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
new file mode 100644
index 0000000000..3c4c504ef8
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
@@ -0,0 +1,12 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
+NOTICE:  COPY TO callback called with data "1	2	3" and length 5
+NOTICE:  COPY TO callback called with data "12	34	56" and length 8
+NOTICE:  COPY TO callback called with data "123	456	789" and length 11
+ test_copy_to_callback 
+-----------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/test_copy_callbacks/meson.build b/src/test/modules/test_copy_callbacks/meson.build
new file mode 100644
index 0000000000..0f1ec47951
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/meson.build
@@ -0,0 +1,34 @@
+# FIXME: prevent install during main install, but not during test :/
+
+test_copy_callbacks_sources = files(
+  'test_copy_callbacks.c',
+)
+
+if host_system == 'windows'
+  test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_copy_callbacks',
+    '--FILEDESC', 'test_copy_callbacks - example use of COPY callbacks',])
+endif
+
+test_copy_callbacks = shared_module('test_copy_callbacks',
+  test_copy_callbacks_sources,
+  kwargs: pg_mod_args,
+)
+testprep_targets += test_copy_callbacks
+
+install_data(
+  'test_copy_callbacks.control',
+  'test_copy_callbacks--1.0.sql',
+  kwargs: contrib_data_args,
+)
+
+tests += {
+  'name': 'test_copy_callbacks',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_copy_callbacks',
+    ],
+  },
+}
diff --git a/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
new file mode 100644
index 0000000000..2deffba635
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
@@ -0,0 +1,4 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
new file mode 100644
index 0000000000..215cf3fad6
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit
+
+CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass)
+	RETURNS pg_catalog.void
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
new file mode 100644
index 0000000000..54de3fc5ab
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
@@ -0,0 +1,46 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_callbacks.c
+ *		Code for testing COPY callbacks.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_copy_callbacks/test_copy_callbacks.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "commands/copy.h"
+#include "fmgr.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+static void
+to_cb(void *data, int len)
+{
+	ereport(NOTICE,
+			(errmsg("COPY TO callback called with data \"%.*s\" and length %d",
+					len, (char *) data, len)));	/* bounded by len; no NUL assumed */
+}
+
+PG_FUNCTION_INFO_V1(test_copy_to_callback);
+Datum
+test_copy_to_callback(PG_FUNCTION_ARGS)
+{
+	Relation	rel = table_open(PG_GETARG_OID(0), AccessShareLock);
+	CopyToState cstate;
+
+	cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, false,
+						 to_cb, NIL, NIL);	/* no file, not a program: use to_cb */
+	(void) DoCopyTo(cstate);	/* returned row count is not needed here */
+	EndCopyTo(cstate);
+
+	table_close(rel, AccessShareLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.control b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
new file mode 100644
index 0000000000..b7ce3f12ff
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
@@ -0,0 +1,4 @@
+comment = 'Test code for COPY callbacks'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_callbacks'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 97c9bc1861..d9b839c979 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3177,6 +3177,7 @@ compare_context
 config_var_value
 contain_aggs_of_level_context
 convert_testexpr_context
+copy_data_dest_cb
 copy_data_source_cb
 core_YYSTYPE
 core_yy_extra_type
-- 
2.25.1

Reply via email to