This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main in repository
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git
The following commit(s) were added to refs/heads/main by this push:
new 120e7bb Add support for customizing the max number of records per record batch (#27)
120e7bb is described below
commit 120e7bbd3fd580c892c988499d488c7e8b34efe2
Author: Sutou Kouhei <[email protected]>
AuthorDate: Thu Mar 9 10:36:32 2023 +0900
Add support for customizing the max number of records per record batch (#27)
Closes GH-26
---
src/afs.cc | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++-----------
1 file changed, 65 insertions(+), 13 deletions(-)
diff --git a/src/afs.cc b/src/afs.cc
index 79597a2..bb8b5fc 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -84,6 +84,9 @@ static char* URI;
static const int SessionTimeoutDefault = 300;
static int SessionTimeout;
+static const int MaxNRowsPerRecordBatchDefault = 1 * 1024 * 1024;
+static int MaxNRowsPerRecordBatch;
+
static volatile sig_atomic_t GotSIGTERM = false;
void afs_sigterm(SIGNAL_ARGS)
{
@@ -93,12 +96,21 @@ void afs_sigterm(SIGNAL_ARGS)
errno = errnoSaved;
}
+static volatile sig_atomic_t GotSIGHUP = false;
+void afs_sighup(SIGNAL_ARGS)
+{
+ auto errnoSaved = errno;
+ GotSIGHUP = true;
+ SetLatch(MyLatch);
+ errno = errnoSaved;
+}
+
static volatile sig_atomic_t GotSIGUSR1 = false;
void afs_sigusr1(SIGNAL_ARGS)
{
procsignal_sigusr1_handler(postgres_signal_arg);
- GotSIGUSR1 = true;
auto errnoSaved = errno;
+ GotSIGUSR1 = true;
SetLatch(MyLatch);
errno = errnoSaved;
}
@@ -689,13 +701,7 @@ class Executor : public WorkerProcessor {
// Write schema only stream format data to return only schema.
ARROW_ASSIGN_OR_RAISE(auto writer,
arrow::ipc::MakeStreamWriter(&output, schema, option));
- // Build 1 null row to write schema.
- for (uint64_t iAttribute = 0; iAttribute < SPI_tuptable->tupdesc->natts;
- ++iAttribute)
- {
- auto arrayBuilder = builder->GetField(iAttribute);
- ARROW_RETURN_NOT_OK(arrayBuilder->AppendNull());
- }
+ // Build an empty record batch to write schema.
ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
P("%s: %s: write: schema: WriteRecordBatch", Tag, tag_);
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
@@ -705,6 +711,7 @@ class Executor : public WorkerProcessor {
// Write another stream format data with record batches.
ARROW_ASSIGN_OR_RAISE(writer,
arrow::ipc::MakeStreamWriter(&output, schema, option));
+ bool needLastFlush = false;
for (uint64_t iTuple = 0; iTuple < SPI_processed; ++iTuple)
{
P("%s: %s: write: data: record batch: %d/%d",
@@ -739,10 +746,21 @@ class Executor : public WorkerProcessor {
ARROW_RETURN_NOT_OK(arrayBuilder->Append(DatumGetInt32(datum)));
}
}
+
+ if (((iTuple + 1) % MaxNRowsPerRecordBatch) == 0) {
+ ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+ P("%s: %s: write: data: WriteRecordBatch: %d/%d", Tag, tag_, iTuple, SPI_processed);
+ ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+ needLastFlush = false;
+ } else {
+ needLastFlush = true;
+ }
+ }
+ if (needLastFlush) {
+ ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+ P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
+ ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
}
- ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
- P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
- ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
P("%s: %s: write: data: Close", Tag, tag_);
ARROW_RETURN_NOT_OK(writer->Close());
return output.Close();
@@ -1002,6 +1020,12 @@ afs_server_internal(Proxy* proxy)
WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
+ if (GotSIGHUP)
+ {
+ GotSIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
if (GotSIGUSR1)
{
GotSIGUSR1 = false;
@@ -1022,6 +1046,7 @@ extern "C" void
afs_executor(Datum arg)
{
pqsignal(SIGTERM, afs_sigterm);
+ pqsignal(SIGHUP, afs_sighup);
pqsignal(SIGUSR1, afs_sigusr1);
BackgroundWorkerUnblockSignals();
@@ -1045,6 +1070,12 @@ afs_executor(Datum arg)
ResetLatch(MyLatch);
+ if (GotSIGHUP)
+ {
+ GotSIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
if (GotSIGUSR1)
{
GotSIGUSR1 = false;
@@ -1064,6 +1095,7 @@ extern "C" void
afs_server(Datum arg)
{
pqsignal(SIGTERM, afs_sigterm);
+ pqsignal(SIGHUP, afs_sighup);
pqsignal(SIGUSR1, afs_sigusr1);
BackgroundWorkerUnblockSignals();
@@ -1088,6 +1120,7 @@ extern "C" void
afs_main(Datum arg)
{
pqsignal(SIGTERM, afs_sigterm);
+ pqsignal(SIGHUP, afs_sighup);
pqsignal(SIGUSR1, afs_sigusr1);
BackgroundWorkerUnblockSignals();
@@ -1099,6 +1132,12 @@ afs_main(Datum arg)
WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
-1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
+ if (GotSIGHUP)
+ {
+ GotSIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
if (GotSIGUSR1)
{
GotSIGUSR1 = false;
@@ -1126,7 +1165,7 @@ _PG_init(void)
(std::string("default: ") + URIDefault).c_str(),
&URI,
URIDefault,
- PGC_USERSET,
+ PGC_POSTMASTER,
0,
NULL,
NULL,
@@ -1140,12 +1179,25 @@ _PG_init(void)
SessionTimeoutDefault,
-1,
INT_MAX,
- PGC_SIGHUP,
+ PGC_USERSET,
GUC_UNIT_S,
NULL,
NULL,
NULL);
+ DefineCustomIntVariable("arrow_flight_sql.max_n_rows_per_record_batch",
+ "The maximum number of rows per record batch.",
+ "The default is 1 * 1024 * 1024 rows.",
+ &MaxNRowsPerRecordBatch,
+ MaxNRowsPerRecordBatchDefault,
+ 1,
+ INT_MAX,
+ PGC_USERSET,
+ 0,
+ NULL,
+ NULL,
+ NULL);
+
PreviousShmemRequestHook = shmem_request_hook;
shmem_request_hook = afs_shmem_request_hook;