This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git


The following commit(s) were added to refs/heads/main by this push:
     new 120e7bb  Add support for customizing the max number of records per 
record batch (#27)
120e7bb is described below

commit 120e7bbd3fd580c892c988499d488c7e8b34efe2
Author: Sutou Kouhei <[email protected]>
AuthorDate: Thu Mar 9 10:36:32 2023 +0900

    Add support for customizing the max number of records per record batch (#27)
    
    Closes GH-26
---
 src/afs.cc | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 65 insertions(+), 13 deletions(-)

diff --git a/src/afs.cc b/src/afs.cc
index 79597a2..bb8b5fc 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -84,6 +84,9 @@ static char* URI;
 static const int SessionTimeoutDefault = 300;
 static int SessionTimeout;
 
+static const int MaxNRowsPerRecordBatchDefault = 1 * 1024 * 1024;
+static int MaxNRowsPerRecordBatch;
+
 static volatile sig_atomic_t GotSIGTERM = false;
 void afs_sigterm(SIGNAL_ARGS)
 {
@@ -93,12 +96,21 @@ void afs_sigterm(SIGNAL_ARGS)
        errno = errnoSaved;
 }
 
+static volatile sig_atomic_t GotSIGHUP = false;
+void afs_sighup(SIGNAL_ARGS)
+{
+       auto errnoSaved = errno;
+       GotSIGHUP = true;
+       SetLatch(MyLatch);
+       errno = errnoSaved;
+}
+
 static volatile sig_atomic_t GotSIGUSR1 = false;
 void afs_sigusr1(SIGNAL_ARGS)
 {
        procsignal_sigusr1_handler(postgres_signal_arg);
-       GotSIGUSR1 = true;
        auto errnoSaved = errno;
+       GotSIGUSR1 = true;
        SetLatch(MyLatch);
        errno = errnoSaved;
 }
@@ -689,13 +701,7 @@ class Executor : public WorkerProcessor {
                // Write schema only stream format data to return only schema.
                ARROW_ASSIGN_OR_RAISE(auto writer,
                                      arrow::ipc::MakeStreamWriter(&output, 
schema, option));
-               // Build 1 null row to write schema.
-               for (uint64_t iAttribute = 0; iAttribute < 
SPI_tuptable->tupdesc->natts;
-                    ++iAttribute)
-               {
-                       auto arrayBuilder = builder->GetField(iAttribute);
-                       ARROW_RETURN_NOT_OK(arrayBuilder->AppendNull());
-               }
+               // Build an empty record batch to write schema.
                ARROW_ASSIGN_OR_RAISE(auto recordBatch, builder->Flush());
                P("%s: %s: write: schema: WriteRecordBatch", Tag, tag_);
                ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
@@ -705,6 +711,7 @@ class Executor : public WorkerProcessor {
                // Write another stream format data with record batches.
                ARROW_ASSIGN_OR_RAISE(writer,
                                      arrow::ipc::MakeStreamWriter(&output, 
schema, option));
+               bool needLastFlush = false;
                for (uint64_t iTuple = 0; iTuple < SPI_processed; ++iTuple)
                {
                        P("%s: %s: write: data: record batch: %d/%d",
@@ -739,10 +746,21 @@ class Executor : public WorkerProcessor {
                                        
ARROW_RETURN_NOT_OK(arrayBuilder->Append(DatumGetInt32(datum)));
                                }
                        }
+
+                       if (((iTuple + 1) % MaxNRowsPerRecordBatch) == 0) {
+                               ARROW_ASSIGN_OR_RAISE(recordBatch, 
builder->Flush());
+                               P("%s: %s: write: data: WriteRecordBatch: 
%d/%d", Tag, tag_, iTuple, SPI_processed);
+                               
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
+                               needLastFlush = false;
+                       } else {
+                               needLastFlush = true;
+                       }
+               }
+               if (needLastFlush) {
+                       ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
+                       P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
+                       
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
                }
-               ARROW_ASSIGN_OR_RAISE(recordBatch, builder->Flush());
-               P("%s: %s: write: data: WriteRecordBatch", Tag, tag_);
-               ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*recordBatch));
                P("%s: %s: write: data: Close", Tag, tag_);
                ARROW_RETURN_NOT_OK(writer->Close());
                return output.Close();
@@ -1002,6 +1020,12 @@ afs_server_internal(Proxy* proxy)
                WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, 
PG_WAIT_EXTENSION);
                ResetLatch(MyLatch);
 
+               if (GotSIGHUP)
+               {
+                       GotSIGHUP = false;
+                       ProcessConfigFile(PGC_SIGHUP);
+               }
+
                if (GotSIGUSR1)
                {
                        GotSIGUSR1 = false;
@@ -1022,6 +1046,7 @@ extern "C" void
 afs_executor(Datum arg)
 {
        pqsignal(SIGTERM, afs_sigterm);
+       pqsignal(SIGHUP, afs_sighup);
        pqsignal(SIGUSR1, afs_sigusr1);
        BackgroundWorkerUnblockSignals();
 
@@ -1045,6 +1070,12 @@ afs_executor(Datum arg)
 
                        ResetLatch(MyLatch);
 
+                       if (GotSIGHUP)
+                       {
+                               GotSIGHUP = false;
+                               ProcessConfigFile(PGC_SIGHUP);
+                       }
+
                        if (GotSIGUSR1)
                        {
                                GotSIGUSR1 = false;
@@ -1064,6 +1095,7 @@ extern "C" void
 afs_server(Datum arg)
 {
        pqsignal(SIGTERM, afs_sigterm);
+       pqsignal(SIGHUP, afs_sighup);
        pqsignal(SIGUSR1, afs_sigusr1);
        BackgroundWorkerUnblockSignals();
 
@@ -1088,6 +1120,7 @@ extern "C" void
 afs_main(Datum arg)
 {
        pqsignal(SIGTERM, afs_sigterm);
+       pqsignal(SIGHUP, afs_sighup);
        pqsignal(SIGUSR1, afs_sigusr1);
        BackgroundWorkerUnblockSignals();
 
@@ -1099,6 +1132,12 @@ afs_main(Datum arg)
                        WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 
-1, PG_WAIT_EXTENSION);
                        ResetLatch(MyLatch);
 
+                       if (GotSIGHUP)
+                       {
+                               GotSIGHUP = false;
+                               ProcessConfigFile(PGC_SIGHUP);
+                       }
+
                        if (GotSIGUSR1)
                        {
                                GotSIGUSR1 = false;
@@ -1126,7 +1165,7 @@ _PG_init(void)
                                   (std::string("default: ") + 
URIDefault).c_str(),
                                   &URI,
                                   URIDefault,
-                                  PGC_USERSET,
+                                  PGC_POSTMASTER,
                                   0,
                                   NULL,
                                   NULL,
@@ -1140,12 +1179,25 @@ _PG_init(void)
                                SessionTimeoutDefault,
                                -1,
                                INT_MAX,
-                               PGC_SIGHUP,
+                               PGC_USERSET,
                                GUC_UNIT_S,
                                NULL,
                                NULL,
                                NULL);
 
+       DefineCustomIntVariable("arrow_flight_sql.max_n_rows_per_record_batch",
+                               "The maximum number of rows per record batch.",
+                               "The default is 1 * 1024 * 1024 rows.",
+                               &MaxNRowsPerRecordBatch,
+                               MaxNRowsPerRecordBatchDefault,
+                               1,
+                               INT_MAX,
+                               PGC_USERSET,
+                               0,
+                               NULL,
+                               NULL,
+                               NULL);
+
        PreviousShmemRequestHook = shmem_request_hook;
        shmem_request_hook = afs_shmem_request_hook;
 

Reply via email to