Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-10 Thread Nathan Bossart
On Tue, Oct 11, 2022 at 11:52:03AM +0900, Michael Paquier wrote:
> Okay.  So, I have reviewed the whole thing, added a description of all
> the fields of BeginCopyTo() in its top comment, tweaked a few things
> and added in the module an extra NOTICE with the number of processed
> rows.  The result seemed fine, so applied.

Thanks!

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-10 Thread Michael Paquier
On Mon, Oct 10, 2022 at 05:06:39PM -0700, Nathan Bossart wrote:
> Yeah, I named it that way because I figured we might want a test for the
> COPY FROM callback someday.

Okay.  So, I have reviewed the whole thing, added a description of all
the fields of BeginCopyTo() in its top comment, tweaked a few things
and added in the module an extra NOTICE with the number of processed
rows.  The result seemed fine, so applied.
--
Michael




Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-10 Thread Michael Paquier
On Wed, Sep 30, 2020 at 01:48:12PM +0500, Andrey V. Lepikhov wrote:
> Your code is almost exactly the same as proposed in [1] as part of 'Fast COPY
> FROM' command. But it seems there are differences.
> 
> [1] 
> https://www.postgresql.org/message-id/flat/3d0909dc-3691-a576-208a-90986e55489f%40postgrespro.ru

I have been looking at what you have here while reviewing the contents
of this thread, and it seems to me that you should basically be able
to achieve the row-level control that your patch is doing with the
callback to do the per-row processing posted here.  The main
difference, though, is that you want to have more control at the
beginning and the end of the COPY TO processing which explains the
split of DoCopyTo().  I am a bit surprised to see this much footprint
in the backend code once there are two FDW callbacks to control the
beginning and the end of the COPY TO, to be honest, sacrificing a lot
of the existing symmetry between the COPY TO and COPY FROM code paths
where there is currently a strict control on the pre-row and post-row
processing like the per-row memory context.
--
Michael




Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-10 Thread Nathan Bossart
On Tue, Oct 11, 2022 at 09:01:41AM +0900, Michael Paquier wrote:
> I like these toy modules, they provide test coverage while acting as a
> template for new developers.  I am wondering whether it should have
> something for the copy from callback, actually, as it is named
> "test_copy_callbacks" but I see no need to extend the module more than
> necessary in the context of this thread (logical decoding uses it,
> anyway).

Yeah, I named it that way because I figured we might want a test for the
COPY FROM callback someday.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-10 Thread Michael Paquier
On Mon, Oct 10, 2022 at 09:38:59AM -0700, Nathan Bossart wrote:
> This new callback allows the use of COPY TO's machinery in extensions.  A
> couple of generic use-cases are listed upthread [0], and one concrete
> use-case is the aws_s3 extension [1].

FWIW, I understand that the proposal is to have an easier control of
how, what and where the data is processed.  COPY TO PROGRAM
provides that with exactly the same kind of interface (data input, its
length) once you have a program able to process the data piped out the
same way.  However, it is in the shape of an external process that
receives the data through a pipe hence it provides a much wider attack
surface which is something that all cloud providers care about.  The
thing is that this allows extension developers to avoid arbitrary
commands on the backend as the OS user running the Postgres instance,
while still being able to process the data the way they want
(auditing, analytics, whatever) within the strict context of the
process running an extension code.  I'd say that this is a very cheap
change to allow people to have more fun with the backend engine
(similar to the recent changes with archive libraries for
archive_command, but much less complex):
 src/backend/commands/copy.c   |  2 +-
 src/backend/commands/copyto.c | 18 +++---
 2 files changed, 16 insertions(+), 4 deletions(-)

(Not to mention that we've had our share of CVEs regarding COPY
PROGRAM even if it is superuser-only).
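
To make the contrast with COPY TO PROGRAM concrete, here is a minimal
standalone sketch of per-row processing done in-process through a callback
rather than through a pipe to an external program.  Only the callback type
mirrors the typedef in the patch; audit_cb, audit_rows, audit_bytes and
mock_copy_to are hypothetical names, and mock_copy_to is a stand-in for the
COPY TO row loop, not PostgreSQL code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Same shape as the copy_data_dest_cb typedef added by the patch. */
typedef void (*copy_data_dest_cb) (void *data, int len);

/* Hypothetical audit state; a real extension would choose its own. */
static long audit_rows = 0;
static long audit_bytes = 0;

/* Hypothetical callback: counts rows and bytes, no external process. */
static void
audit_cb(void *data, int len)
{
	assert(data != NULL && len >= 0);
	audit_rows++;
	audit_bytes += len;
}

/*
 * Stand-in for the COPY TO row loop (an assumption, not backend code):
 * hands each already-formatted row to the callback, one call per row.
 */
static void
mock_copy_to(const char *const *rows, size_t nrows, copy_data_dest_cb cb)
{
	for (size_t i = 0; i < nrows; i++)
		cb((void *) rows[i], (int) strlen(rows[i]));
}
```

Calling mock_copy_to() with an array of row strings and audit_cb leaves the
totals in audit_rows and audit_bytes; nothing is forked or piped along the
way.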

> I really doubt that this small test case is going to cause anything
> approaching undue maintenance burden.  I think it's important to ensure
> this functionality continues to work as expected long into the future.

I like these toy modules, they provide test coverage while acting as a
template for new developers.  I am wondering whether it should have
something for the copy from callback, actually, as it is named
"test_copy_callbacks" but I see no need to extend the module more than
necessary in the context of this thread (logical decoding uses it,
anyway).
--
Michael




Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-10 Thread Nathan Bossart
On Mon, Oct 10, 2022 at 12:41:40PM +0530, Bharath Rupireddy wrote:
> IIUC, COPY TO callback helps move a table's data out of postgres
> server. Just wondering, how is it different from existing solutions
> like COPY TO ... PROGRAM/FILE, logical replication, pg_dump etc. that
> can move a table's data out? I understand that the COPY FROM callback
> was needed for logical replication 7c4f52409. Mentioning a concrete
> use-case helps here.

This new callback allows the use of COPY TO's machinery in extensions.  A
couple of generic use-cases are listed upthread [0], and one concrete
use-case is the aws_s3 extension [1].

> I'm not quite sure if we need a separate module to just tell how to
> use this new callback. I strongly feel that it's not necessary. It
> unnecessarily creates extra code (actual code is 25 LOC with v1 patch
> but 150 LOC with v5 patch) and can cause maintenance burden. These
> callback APIs are simple enough to understand for those who know
> BeginCopyTo() or BeginCopyFrom() and especially for those who know how
> to write extensions. These are not APIs that an end-user uses. The
> best would be to document both COPY FROM and COPY TO callbacks,
> perhaps with a pseudo code specifying just the essence [1], and their
> possible usages somewhere here
> https://www.postgresql.org/docs/devel/sql-copy.html.
> 
> The order of below NOTICE messages isn't guaranteed and it can change
> depending on platforms. Previously, we've had to suppress such
> messages in the test output 6adc5376d.

I really doubt that this small test case is going to cause anything
approaching undue maintenance burden.  I think it's important to ensure
this functionality continues to work as expected long into the future.

[0] https://postgr.es/m/253C21D1-FCEB-41D9-A2AF-E6517015B7D7%40amazon.com
[1] 
https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/postgresql-s3-export.html#aws_s3.export_query_to_s3
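
As a rough illustration of the kind of extension use-case meant here, the
sketch below buffers rows in memory so they could be shipped elsewhere later.
It is standalone C, not extension code: RowSink, sink and sink_cb are
hypothetical names, and it assumes each call delivers one row without a
trailing newline (which the lengths in the test output quoted elsewhere in
this thread seem to suggest).  A real extension would more likely allocate in
a PostgreSQL memory context than with realloc().

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical in-memory sink that an extension might later upload. */
typedef struct RowSink
{
	char	   *buf;
	size_t		used;
	size_t		cap;
} RowSink;

static RowSink sink = {NULL, 0, 0};

/* Callback with the copy_data_dest_cb shape: append the row plus '\n'. */
static void
sink_cb(void *data, int len)
{
	size_t		need;

	assert(data != NULL && len >= 0);
	need = sink.used + (size_t) len + 1;
	if (need > sink.cap)
	{
		size_t		newcap = sink.cap ? sink.cap : 64;
		char	   *newbuf;

		/* Double the capacity until the row and its newline fit. */
		while (newcap < need)
			newcap *= 2;
		newbuf = realloc(sink.buf, newcap);
		if (newbuf == NULL)
			abort();			/* a real extension would report an error */
		sink.buf = newbuf;
		sink.cap = newcap;
	}
	memcpy(sink.buf + sink.used, data, (size_t) len);
	sink.used += (size_t) len;
	sink.buf[sink.used++] = '\n';
}
```

After the copy finishes, sink.buf holds sink.used bytes of newline-separated
rows that the caller owns and must free().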

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-10 Thread Bharath Rupireddy
On Sun, Oct 9, 2022 at 2:44 AM Nathan Bossart  wrote:
>
> Sorry for the noise.  There was an extra #include in v4 that I've removed
> in v5.

IIUC, COPY TO callback helps move a table's data out of postgres
server. Just wondering, how is it different from existing solutions
like COPY TO ... PROGRAM/FILE, logical replication, pg_dump etc. that
can move a table's data out? I understand that the COPY FROM callback
was needed for logical replication 7c4f52409. Mentioning a concrete
use-case helps here.

I'm not quite sure if we need a separate module to just tell how to
use this new callback. I strongly feel that it's not necessary. It
unnecessarily creates extra code (actual code is 25 LOC with v1 patch
but 150 LOC with v5 patch) and can cause maintenance burden. These
callback APIs are simple enough to understand for those who know
BeginCopyTo() or BeginCopyFrom() and especially for those who know how
to write extensions. These are not APIs that an end-user uses. The
best would be to document both COPY FROM and COPY TO callbacks,
perhaps with a pseudo code specifying just the essence [1], and their
possible usages somewhere here
https://www.postgresql.org/docs/devel/sql-copy.html.

The order of below NOTICE messages isn't guaranteed and it can change
depending on platforms. Previously, we've had to suppress such
messages in the test output 6adc5376d.

+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
+NOTICE:  COPY TO callback called with data "123" and length 5
+NOTICE:  COPY TO callback called with data "123456" and length 8
+NOTICE:  COPY TO callback called with data "123456789" and length 11
+ test_copy_to_callback

[1]
+Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock);
+CopyToState cstate;
+
+cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, NULL,
+ to_cb, NIL, NIL);
+(void) DoCopyTo(cstate);
+EndCopyTo(cstate);
+
+table_close(rel, AccessShareLock);
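
For readers following the pseudo code above, here is a minimal standalone
sketch of what a to_cb-style callback could look like.  The module reports
through ereport(NOTICE) according to the review elsewhere in this thread;
snprintf() stands in here only so the sketch builds without PostgreSQL
headers.  format_row, last_notice and last_len are hypothetical.  The sketch
also assumes rows in the default text format with tab-separated columns,
which would explain the reported lengths (5, 8, 11) if the archive dropped
the tabs from the NOTICE lines.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical record of the most recent callback invocation. */
static char last_notice[128];
static int	last_len = -1;

/* Standalone stand-in for a to_cb-style callback (not the module's code). */
static void
to_cb(void *data, int len)
{
	assert(data != NULL && len >= 0);
	last_len = len;
	/* "%.*s" prints exactly len bytes, so no NUL terminator is assumed. */
	snprintf(last_notice, sizeof(last_notice),
			 "COPY TO callback called with data \"%.*s\" and length %d",
			 len, (char *) data, len);
}

/* Hypothetical helper: join three integers with tabs, return the length. */
static int
format_row(char *out, size_t outsz, int a, int b, int c)
{
	return snprintf(out, outsz, "%d\t%d\t%d", a, b, c);
}
```

With this helper, the row (1, 2, 3) formats to five bytes: three digits plus
two tab separators and no trailing newline.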

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com




Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-08 Thread Nathan Bossart
On Sat, Oct 08, 2022 at 10:37:41AM -0700, Nathan Bossart wrote:
> Yeah, that makes more sense.  It actually simplifies things a bit, too.

Sorry for the noise.  There was an extra #include in v4 that I've removed
in v5.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 6e80d41135b8b21f9b06e09a7e85069acc8e57a8 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v5 1/1] Support COPY TO callback functions.

---
 src/backend/commands/copy.c   |  2 +-
 src/backend/commands/copyto.c | 18 ++--
 src/include/commands/copy.h   |  3 +-
 src/test/modules/Makefile |  1 +
 src/test/modules/meson.build  |  1 +
 .../modules/test_copy_callbacks/.gitignore|  4 ++
 src/test/modules/test_copy_callbacks/Makefile | 23 ++
 .../expected/test_copy_callbacks.out  | 12 +
 .../modules/test_copy_callbacks/meson.build   | 34 ++
 .../sql/test_copy_callbacks.sql   |  4 ++
 .../test_copy_callbacks--1.0.sql  |  8 
 .../test_copy_callbacks/test_copy_callbacks.c | 46 +++
 .../test_copy_callbacks.control   |  4 ++
 src/tools/pgindent/typedefs.list  |  1 +
 14 files changed, 156 insertions(+), 5 deletions(-)
 create mode 100644 src/test/modules/test_copy_callbacks/.gitignore
 create mode 100644 src/test/modules/test_copy_callbacks/Makefile
 create mode 100644 src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
 create mode 100644 src/test/modules/test_copy_callbacks/meson.build
 create mode 100644 src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.c
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..db4c9dbc23 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 
 		cstate = BeginCopyTo(pstate, rel, query, relid,
 			 stmt->filename, stmt->is_program,
-			 stmt->attlist, stmt->options);
+			 NULL, stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
 	}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
 {
 	COPY_FILE,	/* to file (or a piped program) */
 	COPY_FRONTEND,/* to frontend */
+	COPY_CALLBACK,/* to callback function */
 } CopyDest;
 
 /*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
+	copy_data_dest_cb data_dest_cb; /* function for writing data */
 
 	CopyFormatOptions opts;
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
+		case COPY_CALLBACK:
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+			break;
 	}
 
 	/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
 	CopyToState cstate;
-	bool		pipe = (filename == NULL);
+	bool		pipe = (filename == NULL && data_dest_cb == NULL);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
 
 	cstate->copy_dest = COPY_FILE;	/* default */
 
-	if (pipe)
+	if (data_dest_cb)
+	{
+		progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
+	else if (pipe)
 	{
 		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
 
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
 uint64
 DoCopyTo(CopyToState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 3f6677b132..b77b935005 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
 typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typede

Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-08 Thread Nathan Bossart
On Sat, Oct 08, 2022 at 02:11:38PM +0900, Michael Paquier wrote:
> Using an ereport(NOTICE) to show the data reported in the callback is
> fine by me.  How about making the module a bit more modular, by
> passing as argument a regclass and building a list of arguments with
> it?  You may want to hold the AccessShareLock on the relation until
> the end of the transaction in this example.

Yeah, that makes more sense.  It actually simplifies things a bit, too.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 0b45607988e28b405c7795de0c7f82f51d4662e5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v4 1/1] Support COPY TO callback functions.

---
 src/backend/commands/copy.c   |  2 +-
 src/backend/commands/copyto.c | 18 +--
 src/include/commands/copy.h   |  3 +-
 src/test/modules/Makefile |  1 +
 src/test/modules/meson.build  |  1 +
 .../modules/test_copy_callbacks/.gitignore|  4 ++
 src/test/modules/test_copy_callbacks/Makefile | 23 +
 .../expected/test_copy_callbacks.out  | 12 +
 .../modules/test_copy_callbacks/meson.build   | 34 ++
 .../sql/test_copy_callbacks.sql   |  4 ++
 .../test_copy_callbacks--1.0.sql  |  8 
 .../test_copy_callbacks/test_copy_callbacks.c | 47 +++
 .../test_copy_callbacks.control   |  4 ++
 src/tools/pgindent/typedefs.list  |  1 +
 14 files changed, 157 insertions(+), 5 deletions(-)
 create mode 100644 src/test/modules/test_copy_callbacks/.gitignore
 create mode 100644 src/test/modules/test_copy_callbacks/Makefile
 create mode 100644 src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
 create mode 100644 src/test/modules/test_copy_callbacks/meson.build
 create mode 100644 src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.c
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..db4c9dbc23 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 
 		cstate = BeginCopyTo(pstate, rel, query, relid,
 			 stmt->filename, stmt->is_program,
-			 stmt->attlist, stmt->options);
+			 NULL, stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
 	}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
 {
 	COPY_FILE,	/* to file (or a piped program) */
 	COPY_FRONTEND,/* to frontend */
+	COPY_CALLBACK,/* to callback function */
 } CopyDest;
 
 /*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
+	copy_data_dest_cb data_dest_cb; /* function for writing data */
 
 	CopyFormatOptions opts;
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
+		case COPY_CALLBACK:
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+			break;
 	}
 
 	/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
 	CopyToState cstate;
-	bool		pipe = (filename == NULL);
+	bool		pipe = (filename == NULL && data_dest_cb == NULL);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
 
 	cstate->copy_dest = COPY_FILE;	/* default */
 
-	if (pipe)
+	if (data_dest_cb)
+	{
+		progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
+	else if (pipe)
 	{
 		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
 
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
 uint64
 DoCopyTo(CopyToState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 3f6677b132..b77b935005 100644
--- a/src/include/commands/

Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-07 Thread Michael Paquier
On Fri, Oct 07, 2022 at 02:48:24PM -0700, Nathan Bossart wrote:
> Here is an attempt at adding such a test module.

Using an ereport(NOTICE) to show the data reported in the callback is
fine by me.  How about making the module a bit more modular, by
passing as argument a regclass and building a list of arguments with
it?  You may want to hold the AccessShareLock on the relation until
the end of the transaction in this example.
--
Michael




Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-07 Thread Nathan Bossart
On Fri, Oct 07, 2022 at 03:49:31PM +0900, Michael Paquier wrote:
> Perhaps there should be a module in src/test/modules/ to provide a
> short, still useful, example of what this could achieve?

Here is an attempt at adding such a test module.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 4e60fce09b031258ace825bd540847ebffd7959d Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v3 1/1] Support COPY TO callback functions.

---
 src/backend/commands/copy.c   |  2 +-
 src/backend/commands/copyto.c | 18 +--
 src/include/commands/copy.h   |  3 +-
 src/test/modules/Makefile |  1 +
 src/test/modules/meson.build  |  1 +
 .../modules/test_copy_callbacks/.gitignore|  4 ++
 src/test/modules/test_copy_callbacks/Makefile | 23 +
 .../expected/test_copy_callbacks.out  | 12 +
 .../modules/test_copy_callbacks/meson.build   | 34 +
 .../sql/test_copy_callbacks.sql   |  4 ++
 .../test_copy_callbacks--1.0.sql  |  8 +++
 .../test_copy_callbacks/test_copy_callbacks.c | 50 +++
 .../test_copy_callbacks.control   |  4 ++
 src/tools/pgindent/typedefs.list  |  1 +
 14 files changed, 160 insertions(+), 5 deletions(-)
 create mode 100644 src/test/modules/test_copy_callbacks/.gitignore
 create mode 100644 src/test/modules/test_copy_callbacks/Makefile
 create mode 100644 src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
 create mode 100644 src/test/modules/test_copy_callbacks/meson.build
 create mode 100644 src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.c
 create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..db4c9dbc23 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 
 		cstate = BeginCopyTo(pstate, rel, query, relid,
 			 stmt->filename, stmt->is_program,
-			 stmt->attlist, stmt->options);
+			 NULL, stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
 	}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
 {
 	COPY_FILE,	/* to file (or a piped program) */
 	COPY_FRONTEND,/* to frontend */
+	COPY_CALLBACK,/* to callback function */
 } CopyDest;
 
 /*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
+	copy_data_dest_cb data_dest_cb; /* function for writing data */
 
 	CopyFormatOptions opts;
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
+		case COPY_CALLBACK:
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+			break;
 	}
 
 	/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
 	CopyToState cstate;
-	bool		pipe = (filename == NULL);
+	bool		pipe = (filename == NULL && data_dest_cb == NULL);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
 
 	cstate->copy_dest = COPY_FILE;	/* default */
 
-	if (pipe)
+	if (data_dest_cb)
+	{
+		progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
+	else if (pipe)
 	{
 		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
 
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
 uint64
 DoCopyTo(CopyToState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 3f6677b132..b77b935005 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
 typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, in

Re: Adding Support for Copy callback functionality on COPY TO api

2022-10-06 Thread Michael Paquier
On Tue, Aug 02, 2022 at 04:49:19PM -0700, Nathan Bossart wrote:
> I've rebased this patch and will register it in the next commitfest
> shortly.

Perhaps there should be a module in src/test/modules/ to provide a
short, still useful, example of what this could achieve?
--
Michael




Re: Adding Support for Copy callback functionality on COPY TO api

2022-08-02 Thread Nathan Bossart
On Wed, Sep 30, 2020 at 04:41:51PM +0900, Michael Paquier wrote:
> This feedback has not been answered after two weeks, so I have marked
> the patch as returned with feedback.

I've rebased this patch and will register it in the next commitfest
shortly.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From a1c0704a094f849a587c7332a547c89581f4f7e6 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v2 1/1] Support COPY TO callback functions.

---
 src/backend/commands/copy.c  |  2 +-
 src/backend/commands/copyto.c| 18 +++---
 src/include/commands/copy.h  |  3 ++-
 src/tools/pgindent/typedefs.list |  1 +
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3ac731803b..f714c5e22e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -304,7 +304,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 
 		cstate = BeginCopyTo(pstate, rel, query, relid,
 			 stmt->filename, stmt->is_program,
-			 stmt->attlist, stmt->options);
+			 NULL, stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
 	}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
 {
 	COPY_FILE,	/* to file (or a piped program) */
 	COPY_FRONTEND,/* to frontend */
+	COPY_CALLBACK,/* to callback function */
 } CopyDest;
 
 /*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
+	copy_data_dest_cb data_dest_cb; /* function for writing data */
 
 	CopyFormatOptions opts;
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
+		case COPY_CALLBACK:
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+			break;
 	}
 
 	/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
 	CopyToState cstate;
-	bool		pipe = (filename == NULL);
+	bool		pipe = (filename == NULL && data_dest_cb == NULL);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
 
 	cstate->copy_dest = COPY_FILE;	/* default */
 
-	if (pipe)
+	if (data_dest_cb)
+	{
+		progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
+	else if (pipe)
 	{
 		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
 
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
 uint64
 DoCopyTo(CopyToState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index cb0096aeb6..76b9d5c23f 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
 typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *data, int len);
 
 extern void DoCopy(ParseState *state, const CopyStmt *stmt,
    int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
  */
 extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 			   Oid queryRelId, const char *filename, bool is_program,
-			   List *attnamelist, List *options);
+			   copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
 extern void EndCopyTo(CopyToState cstate);
 extern uint64 DoCopyTo(CopyToState cstate);
 extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 35c9f1efce..83e5270034 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3189,6 +3189,7 @@ compare_context
 config_var_value
 contain_aggs_of_level_context
 convert_testexpr_context
+copy_data_dest_cb
 copy_data_source_cb
 core_YYSTYPE
 core_yy_extra_type
-- 
2.25.1



Re: Adding Support for Copy callback functionality on COPY TO api

2020-09-30 Thread Andrey V. Lepikhov

On 7/2/20 2:41 AM, Sanaba, Bilva wrote:

> Hi hackers,
>
> Currently, the COPY TO API does not support callback functions, while
> the COPY FROM API does. The COPY TO code does, however, include
> placeholders for supporting callbacks in the future.
>
> Rounding out the support of callback functions to both could be very
> beneficial for extension development. In particular, supporting
> callbacks for COPY TO will allow developers to utilize the preexisting
> command in order to create tools that give users more support for moving
> data for storage, backup, analytics, etc.
>
> We are aiming to get the support in core PostgreSQL and add COPY TO
> callback support in the next commitfest. The attached patch contains a
> change to the COPY TO API to support callbacks.


Your code is almost exactly the same as proposed in [1] as part of 'Fast
COPY FROM' command. But it seems there are differences.


[1] 
https://www.postgresql.org/message-id/flat/3d0909dc-3691-a576-208a-90986e55489f%40postgrespro.ru


--
regards,
Andrey Lepikhov
Postgres Professional




Re: Adding Support for Copy callback functionality on COPY TO api

2020-09-30 Thread Michael Paquier
On Mon, Sep 14, 2020 at 04:28:12PM -0700, Soumyadeep Chakraborty wrote:
> I think a similar change should also be applied to BeginCopyFrom() and
> CopyFrom(). Or even better, such code could be refactored to have a
> separate destination type COPY_PIPE. This of course, will be a separate
> patch. I think the above line is okay for this patch.

This feedback has not been answered after two weeks, so I have marked
the patch as returned with feedback.
--
Michael




Re: Adding Support for Copy callback functionality on COPY TO api

2020-09-14 Thread Soumyadeep Chakraborty
Hi Bilva,

Thank you for registering this patch!

I had a few suggestions:

1. Please run pgindent[1] on your code. Make sure you add
copy_data_destination_cb to src/tools/pgindent/typedefs.list. Please
run pgindent on only the files you changed (it will take files as
command line args).

2. For features such as this, it is often helpful to find a use case
within backend/utility/extension code that demonstrates this callback and
to include the code to exercise it with the patch. Refer to how
copy_read_data() is used as copy_data_source_cb, to copy the data from
the query results from the WAL receiver (Refer: copy_table()). Finding
a similar use case in the source tree will make a stronger case
for this patch.

3. Wouldn't we want to return the number of bytes written from
copy_data_destination_cb? (Similar to copy_data_source_cb) We should
also think about how to represent failure. Looking at CopySendEndOfRow(),
we should error out like we do for the other copy_dests after checking the
return value for the callback invocation.

4.
> bool pipe = (cstate->filename == NULL) && (cstate->data_destination_cb == 
> NULL);

I think a similar change should also be applied to BeginCopyFrom() and
CopyFrom(). Or even better, such code could be refactored to have a
separate destination type COPY_PIPE. This of course, will be a separate
patch. I think the above line is okay for this patch.
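
To illustrate points 3 and 4, here is a rough standalone sketch, not a
proposal for the actual patch.  It assumes a callback variant that returns
the number of bytes written, and an explicit pipe destination instead of a
"pipe" boolean.  All names (copy_data_dest_status_cb, MockCopyDest,
choose_dest, send_row_to_callback, accept_all_cb, failing_cb) are
hypothetical, and real backend code would raise an error through
ereport(ERROR) rather than return -1.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical callback type that reports how many bytes it wrote. */
typedef int (*copy_data_dest_status_cb) (void *data, int len);

/* MOCK_COPY_PIPE is the hypothetical separate destination from point 4. */
typedef enum MockCopyDest
{
	MOCK_COPY_FILE,
	MOCK_COPY_PIPE,
	MOCK_COPY_CALLBACK
} MockCopyDest;

/* Pick the destination explicitly: callback first, then pipe, then file. */
static MockCopyDest
choose_dest(const char *filename, copy_data_dest_status_cb cb)
{
	if (cb != NULL)
		return MOCK_COPY_CALLBACK;
	if (filename == NULL)
		return MOCK_COPY_PIPE;
	return MOCK_COPY_FILE;
}

/* Returns 0 on success, -1 if the callback failed or wrote a short row. */
static int
send_row_to_callback(copy_data_dest_status_cb cb, void *data, int len)
{
	int			written;

	assert(cb != NULL && len >= 0);
	written = cb(data, len);
	return (written == len) ? 0 : -1;
}

/* Hypothetical callbacks used only to exercise both outcomes. */
static int
accept_all_cb(void *data, int len)
{
	(void) data;
	return len;
}

static int
failing_cb(void *data, int len)
{
	(void) data;
	(void) len;
	return -1;
}
```

The destination choice matches the condition quoted in point 4: the pipe case
applies only when there is neither a filename nor a callback.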

Regards,
Soumyadeep