This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git


The following commit(s) were added to refs/heads/main by this push:
     new 14df9b5  Don't use PostgreSQL API in gRPC threads on server process 
(#167)
14df9b5 is described below

commit 14df9b5fe61eda2d71bdfbf67c61a227741f616c
Author: Sutou Kouhei <[email protected]>
AuthorDate: Mon Nov 13 18:15:02 2023 +0900

    Don't use PostgreSQL API in gRPC threads on server process (#167)
    
    Closes GH-160
    
    We must use the main thread to run PostgreSQL API because PostgreSQL API
    isn't thread safe.
---
 src/afs.cc             | 2229 ++++++++++++++++++++++++++++++++++++------------
 test/helper/sandbox.rb |    2 +-
 2 files changed, 1668 insertions(+), 563 deletions(-)

diff --git a/src/afs.cc b/src/afs.cc
index 1c792bc..23940b9 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -47,7 +47,7 @@ extern "C"
 #undef Abs
 
 #if PG_VERSION_NUM >= 150000
-#      define PGRN_HAVE_SHMEM_REQUEST_HOOK
+#      define AFS_HAVE_SHMEM_REQUEST_HOOK
 #endif
 
 #include <arrow/buffer.h>
@@ -59,6 +59,7 @@ extern "C"
 #include <arrow/ipc/writer.h>
 #include <arrow/table_builder.h>
 #include <arrow/util/base64.h>
+#include <arrow/util/string.h>
 
 #include <cinttypes>
 #include <condition_variable>
@@ -77,6 +78,12 @@ extern "C"
 #      define AFS_FUNC __func__
 #endif
 
+#ifdef _WIN32
+#      define PRIsize "Iu"
+#else
+#      define PRIsize "zu"
+#endif
+
 // #define AFS_VERBOSE
 #ifdef AFS_VERBOSE
 #      define P(...) ereport(DEBUG5, errmsg_internal(__VA_ARGS__))
@@ -147,14 +154,14 @@ afs_sigsegv(SIGNAL_ARGS)
 }
 #endif
 
-#ifdef PGRN_HAVE_SHMEM_REQUEST_HOOK
+#ifdef AFS_HAVE_SHMEM_REQUEST_HOOK
 static shmem_request_hook_type PreviousShmemRequestHook = nullptr;
 #endif
 static const char* LWLockTrancheName = "arrow-flight-sql: lwlock tranche";
 void
 afs_shmem_request_hook(void)
 {
-#ifdef PGRN_HAVE_SHMEM_REQUEST_HOOK
+#ifdef AFS_HAVE_SHMEM_REQUEST_HOOK
        if (PreviousShmemRequestHook)
                PreviousShmemRequestHook();
 #endif
@@ -204,6 +211,8 @@ struct SharedRingBufferData {
 };
 
 // Naive ring buffer implementation. We can improve this later.
+//
+// We can't call PostgreSQL API in this class.
 class SharedRingBuffer {
    public:
        static void initialize_data(SharedRingBufferData* data)
@@ -214,6 +223,14 @@ class SharedRingBuffer {
                data->tail = 0;
        }
 
+       static void allocate_data(SharedRingBufferData* data, dsa_area* area, 
size_t total)
+       {
+               data->pointer = dsa_allocate(area, total);
+               data->total = total;
+               data->head = 0;
+               data->tail = 0;
+       }
+
        static void free_data(SharedRingBufferData* data, dsa_area* area)
        {
                if (data->pointer != InvalidDsaPointer)
@@ -221,21 +238,11 @@ class SharedRingBuffer {
                initialize_data(data);
        }
 
-       SharedRingBuffer(SharedRingBufferData* data, dsa_area* area)
-               : data_(data), area_(area)
-       {
-       }
-
-       void allocate(size_t total)
+       SharedRingBuffer(SharedRingBufferData* data, void* address, const char* 
tag)
+               : data_(data), address_(static_cast<uint8_t*>(address)), 
tag_(tag)
        {
-               data_->pointer = dsa_allocate(area_, total);
-               data_->total = total;
-               data_->head = 0;
-               data_->tail = 0;
        }
 
-       void free() { free_data(data_, area_); }
-
        size_t size() const
        {
                if (data_->head <= data_->tail)
@@ -250,14 +257,25 @@ class SharedRingBuffer {
 
        size_t rest_size() const { return data_->total - size() - 1; }
 
+       // Need LWLockAcquire() by caller.
        size_t write(const void* data, size_t n)
        {
-               P("%s: %s: before: (%d:%d) %d", Tag, AFS_FUNC, data_->head, 
data_->tail, n);
+#ifdef AFS_VERBOSE
+               const char* tag = "write";
+#endif
+               P("%s: %s: %s: before: (%" PRIsize ":%" PRIsize ") %" PRIsize,
+                 Tag,
+                 tag_,
+                 tag,
+                 data_->head,
+                 data_->tail,
+                 n);
                if (rest_size() == 0)
                {
-                       P("%s: %s: after: no space: (%d:%d) %d:0",
+                       P("%s: %s: %s: after: no space: (%" PRIsize ":%" 
PRIsize ") %" PRIsize ":0",
                          Tag,
-                         AFS_FUNC,
+                         tag_,
+                         tag,
                          data_->head,
                          data_->tail,
                          n);
@@ -265,7 +283,7 @@ class SharedRingBuffer {
                }
 
                size_t writtenSize = 0;
-               auto output = address();
+               auto output = address_;
                if (data_->head <= data_->tail)
                {
                        auto restSize = data_->total - data_->tail;
@@ -274,9 +292,11 @@ class SharedRingBuffer {
                                restSize--;
                        }
                        const auto firstHalfWriteSize = std::min(n, restSize);
-                       P("%s: %s: first half: (%d:%d) %d:%d",
+                       P("%s: %s: %s: first half: (%" PRIsize ":%" PRIsize ") 
%" PRIsize
+                         ":%" PRIsize,
                          Tag,
-                         AFS_FUNC,
+                         tag_,
+                         tag,
                          data_->head,
                          data_->tail,
                          n,
@@ -289,9 +309,10 @@ class SharedRingBuffer {
                if (n > 0 && rest_size() > 0)
                {
                        const auto lastHalfWriteSize = std::min(n, data_->head 
- data_->tail - 1);
-                       P("%s: %s: last half: (%d:%d) %d:%d",
+                       P("%s: %s: %s: last half: (%" PRIsize ":%" PRIsize ") 
%" PRIsize ":%" PRIsize,
                          Tag,
-                         AFS_FUNC,
+                         tag_,
+                         tag,
                          data_->head,
                          data_->tail,
                          n,
@@ -303,27 +324,43 @@ class SharedRingBuffer {
                        n -= lastHalfWriteSize;
                        writtenSize += lastHalfWriteSize;
                }
-               P("%s: %s: after: (%d:%d) %d:%d",
+               P("%s: %s: %s: after: (%" PRIsize ":%" PRIsize ") %" PRIsize 
":%" PRIsize
+                 " [%s] [%s]",
                  Tag,
-                 AFS_FUNC,
+                 tag_,
+                 tag,
                  data_->head,
                  data_->tail,
                  n,
-                 writtenSize);
+                 writtenSize,
+                 arrow::HexEncode(reinterpret_cast<const uint8_t*>(data), 
writtenSize).c_str(),
+                 arrow::HexEncode(reinterpret_cast<uint8_t*>(output), 
writtenSize).c_str());
                return writtenSize;
        }
 
+       // Need LWLockAcquire() by caller.
        size_t read(size_t n, void* output)
        {
-               P("%s: %s: before: (%d:%d) %d", Tag, AFS_FUNC, data_->head, 
data_->tail, n);
+#ifdef AFS_VERBOSE
+               const char* tag = "read";
+#endif
+               P("%s: %s: %s: before: (%" PRIsize ":%" PRIsize ") %" PRIsize,
+                 Tag,
+                 tag_,
+                 tag,
+                 data_->head,
+                 data_->tail,
+                 n);
                size_t readSize = 0;
-               const auto input = address();
+               const auto input = address_;
                if (data_->head > data_->tail)
                {
                        const auto firstHalfReadSize = std::min(n, data_->total 
- data_->head);
-                       P("%s: %s: first half: (%d:%d) %d:%d",
+                       P("%s: %s: %s: first half: (%" PRIsize ":%" PRIsize ") 
%" PRIsize
+                         ":%" PRIsize,
                          Tag,
-                         AFS_FUNC,
+                         tag_,
+                         tag,
                          data_->head,
                          data_->tail,
                          n,
@@ -336,9 +373,10 @@ class SharedRingBuffer {
                if (n > 0 && data_->head != data_->tail)
                {
                        const auto lastHalfReadSize = std::min(n, data_->tail - 
data_->head);
-                       P("%s: %s: last half: (%d:%d) %d:%d",
+                       P("%s: %s: %s: last half: (%" PRIsize ":%" PRIsize ") 
%" PRIsize ":%" PRIsize,
                          Tag,
-                         AFS_FUNC,
+                         tag_,
+                         tag,
                          data_->head,
                          data_->tail,
                          n,
@@ -350,24 +388,24 @@ class SharedRingBuffer {
                        n -= lastHalfReadSize;
                        readSize += lastHalfReadSize;
                }
-               P("%s: %s: after: (%d:%d) %d:%d",
+               P("%s: %s: %s: after: (%" PRIsize ":%" PRIsize ") %" PRIsize 
":%" PRIsize
+                 " [%s] [%s]",
                  Tag,
-                 AFS_FUNC,
+                 tag_,
+                 tag,
                  data_->head,
                  data_->tail,
                  n,
-                 readSize);
+                 readSize,
+                 arrow::HexEncode(reinterpret_cast<const uint8_t*>(output), 
readSize).c_str(),
+                 arrow::HexEncode(reinterpret_cast<const uint8_t*>(input), 
readSize).c_str());
                return readSize;
        }
 
    private:
        SharedRingBufferData* data_;
-       dsa_area* area_;
-
-       uint8_t* address()
-       {
-               return static_cast<uint8_t*>(dsa_get_address(area_, 
data_->pointer));
-       }
+       uint8_t* address_;
+       const char* tag_;
 };
 
 enum class Action
@@ -424,12 +462,38 @@ dsa_pointer_set_string(dsa_pointer& pointer, dsa_area* 
area, const std::string&
        memcpy(dsa_get_address(area, pointer), input.c_str(), input.size() + 1);
 }
 
-// Put only data (don't add methods) to use with dshash.
-struct SessionData {
+// Session data used in only one process.
+//
+// They are based on data in SharedSessionData and process local data.
+struct LocalSessionData {
+       LocalSessionData()
+               : id(0),
+                 valid(false),
+                 peerPID(InvalidPid),
+                 bufferData(nullptr),
+                 bufferAddress(nullptr)
+       {
+       }
+
+       uint64_t id;
+       bool valid;
+       pid_t peerPID;
+       SharedRingBufferData* bufferData;
+       void* bufferAddress;
+};
+
+// Session data shared with multiple processes. LWLockAcquire() with
+// Processor's lock_ is needed when we change this data.
+//
+// This struct must have only data (don't add methods) to use with
+// dshash.
+struct SharedSessionData {
        uint64_t id;
        dsa_pointer errorMessage;
        pid_t executorPID;
+       bool started;
        bool initialized;
+       bool finished;
        dsa_pointer databaseName;
        dsa_pointer userName;
        dsa_pointer password;
@@ -440,20 +504,23 @@ struct SessionData {
        int64_t nUpdatedRecords;
        dsa_pointer prepareQuery;
        dsa_pointer preparedStatementHandle;
+       bool setParametersFinished;
        SharedRingBufferData bufferData;
 };
 
 void
-session_data_initialize(SessionData* session,
-                        dsa_area* area,
-                        const std::string& databaseName,
-                        const std::string& userName,
-                        const std::string& password,
-                        const std::string& clientAddress)
+shared_session_data_initialize(SharedSessionData* session,
+                               dsa_area* area,
+                               const std::string& databaseName,
+                               const std::string& userName,
+                               const std::string& password,
+                               const std::string& clientAddress)
 {
        session->errorMessage = InvalidDsaPointer;
        session->executorPID = InvalidPid;
+       session->started = false;
        session->initialized = false;
+       session->finished = false;
        dsa_pointer_set_string(session->databaseName, area, databaseName);
        dsa_pointer_set_string(session->userName, area, userName);
        dsa_pointer_set_string(session->password, area, password);
@@ -464,11 +531,12 @@ session_data_initialize(SessionData* session,
        session->nUpdatedRecords = -1;
        session->prepareQuery = InvalidDsaPointer;
        session->preparedStatementHandle = InvalidDsaPointer;
+       session->setParametersFinished = false;
        SharedRingBuffer::initialize_data(&(session->bufferData));
 }
 
 void
-session_data_finalize(SessionData* session, dsa_area* area)
+shared_session_data_finalize(SharedSessionData* session, dsa_area* area)
 {
        if (DsaPointerIsValid(session->errorMessage))
                dsa_free(area, session->errorMessage);
@@ -489,23 +557,23 @@ session_data_finalize(SessionData* session, dsa_area* 
area)
        SharedRingBuffer::free_data(&(session->bufferData), area);
 }
 
-class SessionReleaser {
+class SharedSessionReleaser {
    public:
-       explicit SessionReleaser(dshash_table* sessions, SessionData* data)
+       explicit SharedSessionReleaser(dshash_table* sessions, 
SharedSessionData* data)
                : sessions_(sessions), data_(data)
        {
        }
 
-       ~SessionReleaser() { dshash_release_lock(sessions_, data_); }
+       ~SharedSessionReleaser() { dshash_release_lock(sessions_, data_); }
 
    private:
        dshash_table* sessions_;
-       SessionData* data_;
+       SharedSessionData* data_;
 };
 
 static dshash_parameters SessionsParams = {
        sizeof(uint64_t),
-       sizeof(SessionData),
+       sizeof(SharedSessionData),
        dshash_memcmp,
        dshash_memhash,
        0,  // Set later because this is determined dynamically.
@@ -528,14 +596,7 @@ class Processor {
                Written,
        };
 
-       Processor(const char* tag, bool runInPGThread)
-               : tag_(tag),
-                 runInPGThread_(runInPGThread),
-                 sharedData_(nullptr),
-                 area_(nullptr),
-                 lock_(),
-                 mutex_(),
-                 conditionVariable_()
+       Processor(const char* tag) : tag_(tag), sharedData_(nullptr), 
area_(nullptr), lock_()
        {
        }
 
@@ -553,87 +614,39 @@ class Processor {
 
        void lock_release() { LWLockRelease(lock_); }
 
-       SharedRingBuffer create_shared_ring_buffer(SessionData* session)
+       // Executor and Server must implement this.
+       virtual arrow::Result<size_t> read(LocalSessionData* session,
+                                          SharedRingBuffer* buffer,
+                                          size_t n,
+                                          void* output)
+       {
+               return 0;
+       }
+
+       // Executor and Server must implement this.
+       virtual arrow::Result<size_t> write(LocalSessionData* session,
+                                           SharedRingBuffer* buffer,
+                                           const void* data,
+                                           size_t n)
        {
-               return SharedRingBuffer(&(session->bufferData), area_);
+               return 0;
        }
 
-       arrow::Status wait(SessionData* session, SharedRingBuffer* buffer, 
WaitMode mode)
+       arrow::Status wait(LocalSessionData* session, SharedRingBuffer* buffer, 
WaitMode mode)
        {
-               const bool read = (mode == WaitMode::Read);
-               const char* tag = read ? "wait read" : "wait written";
+               const char* tag = (mode == WaitMode::Read) ? "wait read" : 
"wait written";
                auto peerPID = peer_pid(session);
-               auto peerName = peer_name(session);
 
                if (ARROW_PREDICT_FALSE(peerPID == InvalidPid))
                {
                        return arrow::Status::IOError(
-                               Tag, ": ", tag_, ": ", tag, ": ", peerName, ": 
not alive");
+                               Tag, ": ", tag_, ": ", tag, ": ", peer_name(), 
": not alive");
                }
 
-               P("%s: %s: %s: %s: kill: %d", Tag, tag_, tag, peerName, 
peerPID);
+               P("%s: %s: %s: %s: kill: %d", Tag, tag_, tag, peer_name(), 
peerPID);
                kill(peerPID, SIGUSR1);
-               auto get_target_size =
-                       read ? [](SharedRingBuffer* buffer) { return 
buffer->rest_size(); }
-                                : [](SharedRingBuffer* buffer) { return 
buffer->size(); };
-               if (runInPGThread_)
-               {
-                       while (true)
-                       {
-                               int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
-                               WaitLatch(MyLatch, events, -1, 
PG_WAIT_EXTENSION);
-                               if (GotSIGTERM)
-                               {
-                                       break;
-                               }
-                               ResetLatch(MyLatch);
-
-                               if (GotSIGUSR1)
-                               {
-                                       GotSIGUSR1 = false;
-                                       P("%s: %s: %s: %s: wait: %d:%d",
-                                         Tag,
-                                         tag_,
-                                         tag,
-                                         peerName,
-                                         get_target_size(buffer),
-                                         targetSize);
-                                       if (get_target_size(buffer) > 0)
-                                       {
-                                               break;
-                                       }
-                               }
-
-                               // TODO: Convert PG error to arrow::Status.
-                               CHECK_FOR_INTERRUPTS();
-                       }
-               }
-               else
-               {
-                       std::unique_lock<std::mutex> lock(mutex_);
-                       conditionVariable_.wait(lock, [&] {
-                               P("%s: %s: %s: %s: wait: %d:%d",
-                                 Tag,
-                                 tag_,
-                                 tag,
-                                 peerName,
-                                 get_target_size(buffer),
-                                 targetSize);
-                               if (INTERRUPTS_PENDING_CONDITION())
-                               {
-                                       return true;
-                               }
-                               return get_target_size(buffer) > 0;
-                       });
-               }
-               return arrow::Status::OK();
-       }
 
-       virtual void signaled()
-       {
-               P("%s: %s: signaled: before", Tag, tag_);
-               conditionVariable_.notify_all();
-               P("%s: %s: signaled: after", Tag, tag_);
+               return wait_internal(session, buffer, mode, tag);
        }
 
    protected:
@@ -642,17 +655,34 @@ class Processor {
                dsa_pointer_set_string(pointer, area_, input);
        }
 
-       virtual pid_t peer_pid(SessionData* session) { return InvalidPid; }
+       virtual pid_t peer_pid(LocalSessionData* session) { return InvalidPid; }
+
+       virtual const char* peer_name() { return "unknown"; }
+
+       virtual arrow::Status wait_internal(LocalSessionData* session,
+                                           SharedRingBuffer* buffer,
+                                           WaitMode mode,
+                                           const char* tag)
+       {
+               return arrow::Status::NotImplemented("wait_internal");
+       }
 
-       virtual const char* peer_name(SessionData* session) { return "unknown"; 
}
+       size_t get_waiting_buffer_size(SharedRingBuffer* buffer, WaitMode mode)
+       {
+               if (mode == WaitMode::Read)
+               {
+                       return buffer->rest_size();
+               }
+               else
+               {
+                       return buffer->size();
+               }
+       }
 
        const char* tag_;
-       bool runInPGThread_;
        SharedData* sharedData_;
        dsa_area* area_;
        LWLock* lock_;
-       std::mutex mutex_;
-       std::condition_variable conditionVariable_;
 };
 
 struct ProcessorLockGuard {
@@ -666,13 +696,13 @@ struct ProcessorLockGuard {
        Processor* processor_;
 };
 
-class Proxy;
 class SharedRingBufferInputStream : public arrow::io::InputStream {
    public:
-       SharedRingBufferInputStream(Processor* processor, SessionData* session)
+       SharedRingBufferInputStream(Processor* processor,
+                                   std::shared_ptr<LocalSessionData> 
localSession)
                : arrow::io::InputStream(),
                  processor_(processor),
-                 session_(session),
+                 localSession_(std::move(localSession)),
                  position_(0),
                  is_open_(true)
        {
@@ -701,18 +731,18 @@ class SharedRingBufferInputStream : public 
arrow::io::InputStream {
 
    private:
        Processor* processor_;
-       SessionData* session_;
+       std::shared_ptr<LocalSessionData> localSession_;
        int64_t position_;
        bool is_open_;
 };
 
-class Executor;
 class SharedRingBufferOutputStream : public arrow::io::OutputStream {
    public:
-       SharedRingBufferOutputStream(Processor* processor, SessionData* session)
+       SharedRingBufferOutputStream(Processor* processor,
+                                    std::shared_ptr<LocalSessionData> 
localSession)
                : arrow::io::OutputStream(),
                  processor_(processor),
-                 session_(session),
+                 localSession_(std::move(localSession)),
                  position_(0),
                  is_open_(true)
        {
@@ -734,15 +764,14 @@ class SharedRingBufferOutputStream : public 
arrow::io::OutputStream {
 
    private:
        Processor* processor_;
-       SessionData* session_;
+       std::shared_ptr<LocalSessionData> localSession_;
        int64_t position_;
        bool is_open_;
 };
 
 class WorkerProcessor : public Processor {
    public:
-       explicit WorkerProcessor(const char* tag, bool runInPGThread)
-               : Processor(tag, runInPGThread), sessions_(nullptr)
+       explicit WorkerProcessor(const char* tag) : Processor(tag), 
sessions_(nullptr)
        {
                LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
                bool found;
@@ -765,13 +794,6 @@ class WorkerProcessor : public Processor {
 
        ~WorkerProcessor() override { dshash_detach(sessions_); }
 
-   protected:
-       void delete_session(SessionData* session)
-       {
-               session_data_finalize(session, area_);
-               dshash_delete_entry(sessions_, session);
-       }
-
    protected:
        dshash_table* sessions_;
 };
@@ -1426,21 +1448,23 @@ class PreparedStatement {
 class Executor : public WorkerProcessor {
    public:
        explicit Executor(uint64_t sessionID)
-               : WorkerProcessor("executor", true),
+               : WorkerProcessor("executor"),
                  sessionID_(sessionID),
-                 session_(nullptr),
-                 connected_(false),
+                 localSession_(std::make_shared<LocalSessionData>()),
+                 needFinish_(false),
                  closed_(false),
                  nextPreparedStatementID_(1),
                  preparedStatements_()
        {
+               localSession_->id = sessionID_;
+               localSession_->peerPID = sharedData_->serverPID;
        }
 
        ~Executor()
        {
                if (!closed_)
                {
-                       close_internal(false);
+                       close();
                }
        }
 
@@ -1450,45 +1474,101 @@ class Executor : public WorkerProcessor {
                // pg_usleep(5000000);
                // pg_usleep(5000000);
                pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
opening").c_str());
-               session_ = static_cast<SessionData*>(dshash_find(sessions_, 
&sessionID_, false));
-               auto databaseName =
-                       static_cast<const char*>(dsa_get_address(area_, 
session_->databaseName));
-               auto userName =
-                       static_cast<const char*>(dsa_get_address(area_, 
session_->userName));
-               auto password =
-                       static_cast<const char*>(dsa_get_address(area_, 
session_->password));
-               auto clientAddress =
-                       static_cast<const char*>(dsa_get_address(area_, 
session_->clientAddress));
-               BackgroundWorkerInitializeConnection(databaseName, userName, 0);
-               CurrentResourceOwner = ResourceOwnerCreate(nullptr, 
"arrow-flight-sql: Executor");
-               if (!check_password(databaseName, userName, password, 
clientAddress))
-               {
-                       session_->initialized = true;
+               auto session = find_session();
+               PG_TRY();
+               {
+                       auto databaseName =
+                               static_cast<const char*>(dsa_get_address(area_, 
session->databaseName));
+                       auto userName =
+                               static_cast<const char*>(dsa_get_address(area_, 
session->userName));
+                       auto password =
+                               static_cast<const char*>(dsa_get_address(area_, 
session->password));
+                       auto clientAddress =
+                               static_cast<const char*>(dsa_get_address(area_, 
session->clientAddress));
+                       BackgroundWorkerInitializeConnection(databaseName, 
userName, 0);
+                       CurrentResourceOwner =
+                               ResourceOwnerCreate(nullptr, "arrow-flight-sql: 
Executor");
+                       if (check_password(session, databaseName, userName, 
password, clientAddress))
+                       {
+                               // TODO: Customizable.
+                               SharedRingBuffer::allocate_data(
+                                       &(session->bufferData), area_, 1L * 
1024L * 1024L);
+
+                               SetCurrentStatementStartTimestamp();
+                               SPI_connect();
+                               needFinish_ = true;
+                       }
+                       session->initialized = true;
+                       localSession_->valid = true;
+                       localSession_->bufferData = &(session->bufferData);
+                       localSession_->bufferAddress =
+                               dsa_get_address(area_, 
session->bufferData.pointer);
+                       P("%s: %s: open: %" PRIu64, Tag, tag_, 
session->bufferData.pointer);
+                       dshash_release_lock(sessions_, session);
                        signal_server(tag);
-                       return;
                }
+               PG_CATCH();
+               {
+                       set_error_message(session, "failed to connect", tag);
+                       session->initialized = true;
+                       dshash_release_lock(sessions_, session);
+                       signal_server(tag);
+                       PG_RE_THROW();
+               }
+               PG_END_TRY();
+       }
+
+       void close()
+       {
+               closed_ = true;
+               pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
closing").c_str());
+               preparedStatements_.clear();
+               if (needFinish_)
                {
-                       SharedRingBuffer buffer(&(session_->bufferData), area_);
-                       // TODO: Customizable.
-                       buffer.allocate(1L * 1024L * 1024L);
+                       SPI_finish();
+                       needFinish_ = false;
                }
-               SetCurrentStatementStartTimestamp();
-               SPI_connect();
-               pgstat_report_activity(STATE_IDLE, NULL);
-               session_->initialized = true;
-               connected_ = true;
-               signal_server(tag);
+               if (CurrentResourceOwner)
+               {
+                       auto resourceOwner = CurrentResourceOwner;
+                       CurrentResourceOwner = nullptr;
+                       ResourceOwnerRelease(
+                               resourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, 
false, true);
+                       ResourceOwnerRelease(resourceOwner, 
RESOURCE_RELEASE_LOCKS, false, true);
+                       ResourceOwnerRelease(
+                               resourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, 
false, true);
+                       ResourceOwnerDelete(resourceOwner);
+               }
+               pgstat_report_activity(STATE_IDLE, nullptr);
+       }
+
+       arrow::Result<size_t> read(LocalSessionData* session,
+                                  SharedRingBuffer* buffer,
+                                  size_t n,
+                                  void* output) override
+       {
+               ProcessorLockGuard lock(this);
+               return buffer->read(n, output);
        }
 
-       void close() { close_internal(true); }
+       arrow::Result<size_t> write(LocalSessionData* session,
+                                   SharedRingBuffer* buffer,
+                                   const void* data,
+                                   size_t n) override
+       {
+               ProcessorLockGuard lock(this);
+               return buffer->write(data, n);
+       }
 
-       void signaled() override
+       void signaled()
        {
                Action action;
                {
+                       auto session = find_session();
+                       SharedSessionReleaser sessionReleaser(sessions_, 
session);
                        ProcessorLockGuard lock(this);
-                       action = session_->action;
-                       session_->action = Action::None;
+                       action = session->action;
+                       session->action = Action::None;
                }
                P("%s: %s: signaled: before: %s", Tag, tag_, 
action_name(action));
                PG_TRY();
@@ -1517,34 +1597,86 @@ class Executor : public WorkerProcessor {
                                        update_prepared_statement();
                                        break;
                                default:
-                                       Processor::signaled();
+                                       // TODO: Report an error
                                        break;
                        }
                }
                PG_CATCH();
                {
-                       if (session_ && 
!DsaPointerIsValid(session_->errorMessage))
+                       auto session = find_session();
+                       SharedSessionReleaser sessionReleaser(sessions_, 
session);
+                       if (!DsaPointerIsValid(session->errorMessage))
                        {
                                auto error = CopyErrorData();
-                               set_error_message(std::string("failed to run: 
") + action_name(action) +
+                               set_error_message(session,
+                                                 std::string("failed to run: 
") + action_name(action) +
                                                      ": " + error->message,
                                                  "unexpected error");
                                FreeErrorData(error);
                        }
-                       pgstat_report_activity(STATE_IDLE, NULL);
+                       pgstat_report_activity(STATE_IDLE, nullptr);
                        PG_RE_THROW();
                }
                PG_END_TRY();
-               pgstat_report_activity(STATE_IDLE, NULL);
+               pgstat_report_activity(STATE_IDLE, nullptr);
                P("%s: %s: signaled: after: %s", Tag, tag_, 
action_name(action));
        }
 
    protected:
-       pid_t peer_pid(SessionData* session) override { return 
sharedData_->serverPID; }
+       pid_t peer_pid(LocalSessionData* session) override { return 
sharedData_->serverPID; }
+
+       const char* peer_name() override { return "server"; }
+
+       arrow::Status wait_internal(LocalSessionData* sessio51n,
+                                   SharedRingBuffer* buffer,
+                                   WaitMode mode,
+                                   const char* tag) override
+       {
+               while (true)
+               {
+                       int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+                       WaitLatch(MyLatch, events, -1, PG_WAIT_EXTENSION);
+                       if (GotSIGTERM)
+                       {
+                               break;
+                       }
+                       ResetLatch(MyLatch);
+
+                       if (GotSIGUSR1)
+                       {
+                               GotSIGUSR1 = false;
+                               P("%s: %s: %s: %s: wait: %" PRIsize,
+                                 Tag,
+                                 tag_,
+                                 tag,
+                                 peer_name(),
+                                 get_waiting_buffer_size(buffer, mode));
+                               if (get_waiting_buffer_size(buffer, mode) > 0)
+                               {
+                                       break;
+                               }
+                       }
 
-       const char* peer_name(SessionData* session) override { return "server"; 
}
+                       // TODO: Convert PG error to arrow::Status.
+                       CHECK_FOR_INTERRUPTS();
+               }
+               return arrow::Status::OK();
+       }
 
    private:
+       SharedSessionData* find_session()
+       {
+               auto session =
+                       static_cast<SharedSessionData*>(dshash_find(sessions_, 
&sessionID_, false));
+               if (!session)
+               {
+                       ereport(ERROR,
+                               errcode(ERRCODE_INTERNAL_ERROR),
+                               errmsg("%s: %s: unknown session: %" PRIu64, 
Tag, tag_, sessionID_));
+               }
+               return session;
+       }
+
        void signal_server(const char* tag)
        {
                if (sharedData_->serverPID == InvalidPid)
@@ -1555,58 +1687,23 @@ class Executor : public WorkerProcessor {
                kill(sharedData_->serverPID, SIGUSR1);
        }
 
-       void set_error_message(const std::string& message, const char* tag)
+       void set_error_message(SharedSessionData* session,
+                              const std::string& message,
+                              const char* tag)
        {
-               if (DsaPointerIsValid(session_->errorMessage))
+               if (DsaPointerIsValid(session->errorMessage))
                {
                        return;
                }
                {
                        ProcessorLockGuard lock(this);
-                       set_shared_string(session_->errorMessage, message);
+                       set_shared_string(session->errorMessage, message);
                }
                signal_server(tag);
        }
 
-       void close_internal(bool unlockSession)
-       {
-               const char* tag = "close";
-               closed_ = true;
-               pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
closing").c_str());
-               preparedStatements_.clear();
-               if (connected_)
-               {
-                       SPI_finish();
-                       {
-                               SharedRingBuffer 
buffer(&(session_->bufferData), area_);
-                               buffer.free();
-                       }
-                       delete_session(session_);
-               }
-               else
-               {
-                       set_error_message("failed to connect", tag);
-                       session_->initialized = true;
-                       if (unlockSession)
-                       {
-                               dshash_release_lock(sessions_, session_);
-                       }
-                       signal_server(tag);
-               }
-               if (CurrentResourceOwner)
-               {
-                       auto resourceOwner = CurrentResourceOwner;
-                       CurrentResourceOwner = nullptr;
-                       ResourceOwnerRelease(
-                               resourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, 
false, true);
-                       ResourceOwnerRelease(resourceOwner, 
RESOURCE_RELEASE_LOCKS, false, true);
-                       ResourceOwnerRelease(
-                               resourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, 
false, true);
-                       ResourceOwnerDelete(resourceOwner);
-               }
-       }
-
-       bool check_password(const char* databaseName,
+       bool check_password(SharedSessionData* session,
+                           const char* databaseName,
                            const char* userName,
                            const char* password,
                            const char* clientAddress)
@@ -1620,7 +1717,7 @@ class Executor : public WorkerProcessor {
                Port port = {};
                port.database_name = pstrdup(databaseName);
                port.user_name = pstrdup(userName);
-               if (!fill_client_address(&port, clientAddress))
+               if (!fill_client_address(session, &port, clientAddress))
                {
                        return false;
                }
@@ -1628,18 +1725,18 @@ class Executor : public WorkerProcessor {
                hba_getauthmethod(&port);
                if (!port.hba)
                {
-                       set_error_message("failed to get auth method", tag);
+                       set_error_message(session, "failed to get auth method", 
tag);
                        return false;
                }
                switch (port.hba->auth_method)
                {
                        case uaMD5:
                                // TODO
-                               set_error_message("MD5 auth method isn't 
supported yet", tag);
+                               set_error_message(session, "MD5 auth method 
isn't supported yet", tag);
                                return false;
                        case uaSCRAM:
                                // TODO
-                               set_error_message("SCRAM auth method isn't 
supported yet", tag);
+                               set_error_message(session, "SCRAM auth method 
isn't supported yet", tag);
                                return false;
                        case uaPassword:
                        {
@@ -1647,7 +1744,8 @@ class Executor : public WorkerProcessor {
                                auto shadowPassword = 
get_role_password(port.user_name, &logDetail);
                                if (!shadowPassword)
                                {
-                                       set_error_message(std::string("failed 
to get password: ") + logDetail,
+                                       set_error_message(session,
+                                                         std::string("failed 
to get password: ") + logDetail,
                                                          tag);
                                        return false;
                                }
@@ -1656,7 +1754,9 @@ class Executor : public WorkerProcessor {
                                if (result != STATUS_OK)
                                {
                                        set_error_message(
-                                               std::string("failed to verify 
password: ") + logDetail, tag);
+                                               session,
+                                               std::string("failed to verify 
password: ") + logDetail,
+                                               tag);
                                        return false;
                                }
                                return true;
@@ -1664,14 +1764,17 @@ class Executor : public WorkerProcessor {
                        case uaTrust:
                                return true;
                        default:
-                               set_error_message(std::string("unsupported auth 
method: ") +
+                               set_error_message(session,
+                                                 std::string("unsupported auth 
method: ") +
                                                      
hba_authname(port.hba->auth_method),
                                                  tag);
                                return false;
                }
        }
 
-       bool fill_client_address(Port* port, const char* clientAddress)
+       bool fill_client_address(SharedSessionData* session,
+                                Port* port,
+                                const char* clientAddress)
        {
                const char* tag = "fill client address";
                // clientAddress: "ipv4:127.0.0.1:40468"
@@ -1688,7 +1791,9 @@ class Executor : public WorkerProcessor {
                if (!(clientFamily == "ipv4" || clientFamily == "ipv6"))
                {
                        set_error_message(
-                               std::string("client family must be ipv4 or 
ipv6: ") + clientFamily, tag);
+                               session,
+                               std::string("client family must be ipv4 or 
ipv6: ") + clientFamily,
+                               tag);
                        return false;
                }
                auto clientPortStart = clientPort.c_str();
@@ -1696,17 +1801,19 @@ class Executor : public WorkerProcessor {
                auto clientPortNumber = std::strtoul(clientPortStart, 
&clientPortEnd, 10);
                if (clientPortEnd[0] != '\0')
                {
-                       set_error_message(std::string("client port is invalid: 
") + clientPort, tag);
+                       set_error_message(
+                               session, std::string("client port is invalid: 
") + clientPort, tag);
                        return false;
                }
                if (clientPortNumber == 0)
                {
-                       set_error_message(std::string("client port must not 
0"), tag);
+                       set_error_message(session, std::string("client port 
must not 0"), tag);
                        return false;
                }
                if (clientPortNumber > 65535)
                {
-                       set_error_message(std::string("client port is too 
large: ") +
+                       set_error_message(session,
+                                         std::string("client port is too 
large: ") +
                                              std::to_string(clientPortNumber),
                                          tag);
                        return false;
@@ -1720,7 +1827,9 @@ class Executor : public WorkerProcessor {
                        if (inet_pton(AF_INET, clientHost.c_str(), 
&(raddr->sin_addr)) == 0)
                        {
                                set_error_message(
-                                       std::string("client IPv4 address is 
invalid: ") + clientHost, tag);
+                                       session,
+                                       std::string("client IPv4 address is 
invalid: ") + clientHost,
+                                       tag);
                                return false;
                        }
                }
@@ -1734,7 +1843,9 @@ class Executor : public WorkerProcessor {
                        if (inet_pton(AF_INET6, clientHost.c_str(), 
&(raddr->sin6_addr)) == 0)
                        {
                                set_error_message(
-                                       std::string("client IPv6 address is 
invalid: ") + clientHost, tag);
+                                       session,
+                                       std::string("client IPv6 address is 
invalid: ") + clientHost,
+                                       tag);
                                return false;
                        }
                        raddr->sin6_scope_id = 0;
@@ -1745,22 +1856,26 @@ class Executor : public WorkerProcessor {
        void select()
        {
                const char* tag = "select";
-               if (!DsaPointerIsValid(session_->selectQuery))
+               pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
selecting").c_str());
+
+               auto session = find_session();
+               SharedSessionReleaser sessionReleaser(sessions_, session);
+               if (!DsaPointerIsValid(session->selectQuery))
                {
                        set_error_message(
-                               std::string(Tag) + ": " + tag_ + ": " + tag + 
": query is missing", tag);
+                               session,
+                               std::string(Tag) + ": " + tag_ + ": " + tag + 
": query is missing",
+                               tag);
                        return;
                }
 
-               pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
selecting").c_str());
-
                std::string query;
                {
                        ProcessorLockGuard lock(this);
                        query =
-                               static_cast<const char*>(dsa_get_address(area_, 
session_->selectQuery));
-                       dsa_free(area_, session_->selectQuery);
-                       session_->selectQuery = InvalidDsaPointer;
+                               static_cast<const char*>(dsa_get_address(area_, 
session->selectQuery));
+                       dsa_free(area_, session->selectQuery);
+                       session->selectQuery = InvalidDsaPointer;
                }
                P("%s: %s: %s: %s", Tag, tag_, tag, query.c_str());
 
@@ -1770,36 +1885,37 @@ class Executor : public WorkerProcessor {
 
                        SetCurrentStatementStartTimestamp();
                        auto result = SPI_execute(query.c_str(), true, 0);
-
+                       needFinish_ = true;
                        if (result > 0)
                        {
                                pgstat_report_activity(
                                        STATE_RUNNING, (std::string(Tag) + ": " 
+ tag + ": writing").c_str());
-                               auto status = write(tag);
-                               if (status.ok())
-                               {
-                                       signal_server(tag);
-                               }
-                               else
+                               auto status = write_record_batches(tag);
+                               if (!status.ok())
                                {
-                                       set_error_message(std::string(Tag) + ": 
" + tag_ + ": " + tag +
+                                       set_error_message(session,
+                                                         std::string(Tag) + ": 
" + tag_ + ": " + tag +
                                                              ": failed to 
write: " + status.ToString(),
                                                          tag);
                                }
                        }
                        else
                        {
-                               set_error_message(std::string(Tag) + ": " + 
tag_ + ": " + tag +
+                               set_error_message(session,
+                                                 std::string(Tag) + ": " + 
tag_ + ": " + tag +
                                                      ": failed to run a query: 
<" + query +
                                                      ">: " + 
SPI_result_code_string(result),
                                                  tag);
                        }
+                       SPI_finish();
+                       needFinish_ = false;
+                       signal_server(tag);
                }
        }
 
-       arrow::Status write(const char* tag)
+       arrow::Status write_record_batches(const char* tag)
        {
-               SharedRingBufferOutputStream output(this, session_);
+               SharedRingBufferOutputStream output(this, localSession_);
                std::vector<std::shared_ptr<arrow::Field>> fields;
                for (int i = 0; i < SPI_tuptable->tupdesc->natts; ++i)
                {
@@ -1915,22 +2031,26 @@ class Executor : public WorkerProcessor {
        void update()
        {
                const char* tag = "update";
-               if (!DsaPointerIsValid(session_->updateQuery))
+               pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
updating").c_str());
+
+               auto session = find_session();
+               SharedSessionReleaser sessionReleaser(sessions_, session);
+               if (!DsaPointerIsValid(session->updateQuery))
                {
                        set_error_message(
-                               std::string(Tag) + ": " + tag_ + ": " + tag + 
": query is missing", tag);
+                               session,
+                               std::string(Tag) + ": " + tag_ + ": " + tag + 
": query is missing",
+                               tag);
                        return;
                }
 
-               pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
updating").c_str());
-
                std::string query;
                {
                        ProcessorLockGuard lock(this);
                        query =
-                               static_cast<const char*>(dsa_get_address(area_, 
session_->updateQuery));
-                       dsa_free(area_, session_->updateQuery);
-                       session_->updateQuery = InvalidDsaPointer;
+                               static_cast<const char*>(dsa_get_address(area_, 
session->updateQuery));
+                       dsa_free(area_, session->updateQuery);
+                       session->updateQuery = InvalidDsaPointer;
                }
                P("%s: %s: %s: %s", Tag, tag_, tag, query.c_str());
 
@@ -1940,40 +2060,48 @@ class Executor : public WorkerProcessor {
 
                        SetCurrentStatementStartTimestamp();
                        auto result = SPI_execute(query.c_str(), false, 0);
+                       needFinish_ = true;
                        if (result > 0)
                        {
-                               session_->nUpdatedRecords = SPI_processed;
-                               signal_server(tag);
+                               session->nUpdatedRecords = SPI_processed;
                        }
                        else
                        {
-                               set_error_message(std::string(Tag) + ": " + 
tag_ + ": " + tag +
+                               set_error_message(session,
+                                                 std::string(Tag) + ": " + 
tag_ + ": " + tag +
                                                      ": failed to run a query: 
<" + query +
                                                      ">: " + 
SPI_result_code_string(result),
                                                  tag);
                        }
+                       SPI_finish();
+                       needFinish_ = false;
+                       signal_server(tag);
                }
        }
 
        void prepare()
        {
                const char* tag = "prepare";
-               if (!DsaPointerIsValid(session_->prepareQuery))
+               pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
preparing").c_str());
+
+               auto session = find_session();
+               SharedSessionReleaser sessionReleaser(sessions_, session);
+               if (!DsaPointerIsValid(session->prepareQuery))
                {
                        set_error_message(
-                               std::string(Tag) + ": " + tag_ + ": " + tag + 
": query is missing", tag);
+                               session,
+                               std::string(Tag) + ": " + tag_ + ": " + tag + 
": query is missing",
+                               tag);
                        return;
                }
 
-               pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": 
preparing").c_str());
-
                std::string query;
                {
                        ProcessorLockGuard lock(this);
                        query =
-                               static_cast<const char*>(dsa_get_address(area_, 
session_->prepareQuery));
-                       dsa_free(area_, session_->prepareQuery);
-                       session_->prepareQuery = InvalidDsaPointer;
+                               static_cast<const char*>(dsa_get_address(area_, 
session->prepareQuery));
+                       dsa_free(area_, session->prepareQuery);
+                       session->prepareQuery = InvalidDsaPointer;
                }
                P("%s: %s: %s: %s", Tag, tag_, tag, query.c_str());
                std::string handle(std::to_string(nextPreparedStatementID_++));
@@ -1981,26 +2109,28 @@ class Executor : public WorkerProcessor {
                        std::make_pair(handle, 
PreparedStatement(std::move(query))));
                {
                        ProcessorLockGuard lock(this);
-                       set_shared_string(session_->preparedStatementHandle, 
handle);
+                       set_shared_string(session->preparedStatementHandle, 
handle);
                }
                signal_server(tag);
        }
 
-       bool extract_handle(std::string& handle, const char* tag)
+       bool extract_handle(SharedSessionData* session, std::string& handle, 
const char* tag)
        {
-               if (!DsaPointerIsValid(session_->preparedStatementHandle))
+               if (!DsaPointerIsValid(session->preparedStatementHandle))
                {
                        set_error_message(
-                               std::string(Tag) + ": " + tag_ + ": " + tag + 
": handle is missing", tag);
+                               session,
+                               std::string(Tag) + ": " + tag_ + ": " + tag + 
": handle is missing",
+                               tag);
                        return false;
                }
 
                {
                        ProcessorLockGuard lock(this);
                        handle = static_cast<const char*>(
-                               dsa_get_address(area_, 
session_->preparedStatementHandle));
-                       dsa_free(area_, session_->preparedStatementHandle);
-                       session_->preparedStatementHandle = InvalidDsaPointer;
+                               dsa_get_address(area_, 
session->preparedStatementHandle));
+                       dsa_free(area_, session->preparedStatementHandle);
+                       session->preparedStatementHandle = InvalidDsaPointer;
                }
 
                return true;
@@ -2012,27 +2142,29 @@ class Executor : public WorkerProcessor {
 
                pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": " 
+ tag).c_str());
 
+               auto session = find_session();
+               SharedSessionReleaser sessionReleaser(sessions_, session);
                std::string handle;
-               if (!extract_handle(handle, tag))
+               if (!extract_handle(session, handle, tag))
                {
                        return;
                }
                P("%s: %s: %s: %s", Tag, tag_, tag, handle.c_str());
-               if (preparedStatements_.erase(handle) > 0)
-               {
-                       signal_server(tag);
-               }
-               else
+               if (preparedStatements_.erase(handle) == 0)
                {
-                       set_error_message(std::string(Tag) + ": " + tag_ + ": " 
+ tag +
+                       set_error_message(session,
+                                         std::string(Tag) + ": " + tag_ + ": " 
+ tag +
                                              ": nonexistent handle: <" + 
handle + ">",
                                          tag);
                }
+               signal_server(tag);
        }
 
-       PreparedStatement* find_prepared_statement(std::string& handle, const 
char* tag)
+       PreparedStatement* find_prepared_statement(SharedSessionData* session,
+                                                  std::string& handle,
+                                                  const char* tag)
        {
-               if (!extract_handle(handle, tag))
+               if (!extract_handle(session, handle, tag))
                {
                        return nullptr;
                }
@@ -2041,7 +2173,8 @@ class Executor : public WorkerProcessor {
                auto it = preparedStatements_.find(handle);
                if (it == preparedStatements_.end())
                {
-                       set_error_message(std::string(Tag) + ": " + tag_ + ": " 
+ tag +
+                       set_error_message(session,
+                                         std::string(Tag) + ": " + tag_ + ": " 
+ tag +
                                              ": nonexistent handle: <" + 
handle + ">",
                                          tag);
                        return nullptr;
@@ -2058,8 +2191,10 @@ class Executor : public WorkerProcessor {
 
                pgstat_report_activity(STATE_RUNNING,
                                       (std::string(Tag) + ": setting 
parameters").c_str());
+               auto session = find_session();
+               SharedSessionReleaser sessionReleaser(sessions_, session);
                std::string handle;
-               auto preparedStatement = find_prepared_statement(handle, tag);
+               auto preparedStatement = find_prepared_statement(session, 
handle, tag);
                P("%s: %s: %s: %s", Tag, tag_, tag, handle.c_str());
 
                if (!preparedStatement)
@@ -2067,19 +2202,18 @@ class Executor : public WorkerProcessor {
                        return;
                }
 
-               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, session_);
+               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, localSession_);
                auto status = preparedStatement->set_parameters(input);
-               if (status.ok())
-               {
-                       signal_server(tag);
-               }
-               else
+               if (!status.ok())
                {
-                       set_error_message(std::string(Tag) + ": " + tag_ + ": " 
+ tag +
+                       set_error_message(session,
+                                         std::string(Tag) + ": " + tag_ + ": " 
+ tag +
                                              ": failed to set parameters: <" + 
handle +
                                              ">: " + status.ToString(),
                                          tag);
                }
+               session->setParametersFinished = true;
+               signal_server(tag);
        }
 
        void select_prepared_statement()
@@ -2089,8 +2223,10 @@ class Executor : public WorkerProcessor {
                pgstat_report_activity(
                        STATE_RUNNING, (std::string(Tag) + ": selecting 
prepared statement").c_str());
 
+               auto session = find_session();
+               SharedSessionReleaser sessionReleaser(sessions_, session);
                std::string handle;
-               auto preparedStatement = find_prepared_statement(handle, tag);
+               auto preparedStatement = find_prepared_statement(session, 
handle, tag);
                P("%s: %s: %s: %s", Tag, tag_, tag, handle.c_str());
 
                if (!preparedStatement)
@@ -2108,20 +2244,21 @@ class Executor : public WorkerProcessor {
                auto status = preparedStatement->select(
                        [](void* data) {
                                auto d = static_cast<Data*>(data);
-                               return d->executor->write(d->tag);
+                               return 
d->executor->write_record_batches(d->tag);
                        },
                        &data);
-               if (status.ok())
-               {
-                       signal_server(tag);
-               }
-               else
+               needFinish_ = true;
+               if (!status.ok())
                {
-                       set_error_message(std::string(Tag) + ": " + tag_ + ": " 
+ tag +
+                       set_error_message(session,
+                                         std::string(Tag) + ": " + tag_ + ": " 
+ tag +
                                              ": failed to select a prepared 
statement: <" + handle +
                                              ">: " + status.ToString(),
                                          tag);
                }
+               SPI_finish();
+               needFinish_ = false;
+               signal_server(tag);
        }
 
        void update_prepared_statement()
@@ -2131,8 +2268,10 @@ class Executor : public WorkerProcessor {
                pgstat_report_activity(
                        STATE_RUNNING, (std::string(Tag) + ": updating prepared 
statement").c_str());
 
+               auto session = find_session();
+               SharedSessionReleaser sessionReleaser(sessions_, session);
                std::string handle;
-               auto preparedStatement = find_prepared_statement(handle, tag);
+               auto preparedStatement = find_prepared_statement(session, 
handle, tag);
                P("%s: %s: %s: %s", Tag, tag_, tag, handle.c_str());
 
                if (!preparedStatement)
@@ -2143,25 +2282,29 @@ class Executor : public WorkerProcessor {
                ScopedTransaction scopedTransaction;
                ScopedSnapshot scopedSnapshot;
 
-               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, session_);
+               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, localSession_);
                auto n_updated_records_result = 
preparedStatement->update(input);
+               needFinish_ = true;
                if (n_updated_records_result.ok())
                {
-                       session_->nUpdatedRecords = *n_updated_records_result;
-                       signal_server(tag);
+                       session->nUpdatedRecords = *n_updated_records_result;
                }
                else
                {
-                       set_error_message(std::string(Tag) + ": " + tag_ + ": " 
+ tag +
+                       set_error_message(session,
+                                         std::string(Tag) + ": " + tag_ + ": " 
+ tag +
                                              ": failed to update a prepared 
statement: <" + handle +
                                              ">: " + 
n_updated_records_result.status().ToString(),
                                          tag);
                }
+               SPI_finish();
+               needFinish_ = false;
+               signal_server(tag);
        }
 
        uint64_t sessionID_;
-       SessionData* session_;
-       bool connected_;
+       std::shared_ptr<LocalSessionData> localSession_;
+       bool needFinish_;
        bool closed_;
        uint64_t nextPreparedStatementID_;
        std::map<std::string, PreparedStatement> preparedStatements_;
@@ -2177,46 +2320,274 @@ SharedRingBufferOutputStream::Write(const void* data, 
int64_t nBytes)
        }
        if (ARROW_PREDICT_TRUE(nBytes > 0))
        {
-               auto buffer = processor_->create_shared_ring_buffer(session_);
+               SharedRingBuffer buffer(
+                       localSession_->bufferData, 
localSession_->bufferAddress, processor_->tag());
                size_t rest = static_cast<size_t>(nBytes);
                while (true)
                {
-                       processor_->lock_acquire();
-                       auto writtenSize = buffer.write(data, rest);
-                       processor_->lock_release();
+                       ARROW_ASSIGN_OR_RAISE(
+                               auto writtenBytes,
+                               processor_->write(localSession_.get(), &buffer, 
data, rest));
+                       if (INTERRUPTS_PENDING_CONDITION())
+                       {
+                               return arrow::Status::IOError(std::string(Tag) 
+ ": " +
+                                                             processor_->tag() 
+ ": interrupted");
+                       }
+
+                       if (writtenBytes == 0)
+                       {
+                               ARROW_RETURN_NOT_OK(processor_->wait(
+                                       localSession_.get(), &buffer, 
Processor::WaitMode::Read));
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return 
arrow::Status::IOError(std::string(Tag) + ": " +
+                                                                     
processor_->tag() + ": interrupted");
+                               }
+                               continue;
+                       }
 
-                       position_ += writtenSize;
-                       rest -= writtenSize;
-                       data = static_cast<const uint8_t*>(data) + writtenSize;
+                       position_ += writtenBytes;
+                       rest -= writtenBytes;
+                       data = static_cast<const uint8_t*>(data) + writtenBytes;
 
                        if (ARROW_PREDICT_TRUE(rest == 0))
                        {
                                break;
                        }
-
-                       ARROW_RETURN_NOT_OK(
-                               processor_->wait(session_, &buffer, 
Processor::WaitMode::Read));
                }
        }
        return arrow::Status::OK();
 }
 
+// There is only one Proxy object in a PostgreSQL instance. The Proxy
+// object communicates multiple "executor" processes.
+//
+// This class must be thread safe. We call PostgreSQL API only in
+// signaled(). We can't call PostgreSQL API in other methods because
+// they are called from gRPC threads not the main thread. PostgreSQL
+// API isn't thread safe and must be called only in the main thread.
+//
+// If we need to call PostgreSQL API from gRPC threads, we need to
+// create a request and process it in signaled().
 class Proxy : public WorkerProcessor {
    public:
        explicit Proxy()
-               : WorkerProcessor("proxy", false), randomSeed_(), 
randomEngine_(randomSeed_())
+               : WorkerProcessor("proxy"),
+                 mutex_(),
+                 conditionVariable_(),
+                 randomSeed_(),
+                 randomEngine_(randomSeed_()),
+                 localSessions_(),
+                 connectRequests_(),
+                 selectRequests_(),
+                 updateRequests_(),
+                 prepareRequests_(),
+                 closePreparedStatementRequests_(),
+                 setParametersRequests_(),
+                 selectPreparedStatementRequests_(),
+                 updatePreparedStatementRequests_(),
+                 readRequests_(),
+                 writeRequests_()
+       {
+       }
+
+   private:
+       std::mutex mutex_;
+       std::condition_variable conditionVariable_;
+       std::random_device randomSeed_;
+       std::mt19937_64 randomEngine_;
+       std::map<uint64_t, std::shared_ptr<LocalSessionData>> localSessions_;
+
+   public:
+       arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> open_reader(
+               uint64_t sessionID)
+       {
+               std::shared_ptr<LocalSessionData> localSession;
+               {
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       auto it = localSessions_.find(sessionID);
+                       if (it == localSessions_.end())
+                       {
+                               return arrow::Status::Invalid("unknown session: 
", sessionID);
+                       }
+                       localSession = it->second;
+               }
+               auto input =
+                       std::make_shared<SharedRingBufferInputStream>(this, 
std::move(localSession));
+               // Read another stream format data with record batches.
+               return arrow::ipc::RecordBatchStreamReader::Open(input);
+       }
+
+       // This is only called from PostgreSQL (main) thread. We can use
+       // PostgreSQL API only in this method and methods called from this
+       // method.
+       void signaled()
+       {
+               process_connect_requests();
+               process_select_requests();
+               process_update_requests();
+               process_prepare_requests();
+               process_close_prepared_statement_requests();
+               process_set_parameters_requests();
+               process_select_prepared_statement_requests();
+               process_update_prepared_statement_requests();
+               process_read_requests();
+               process_write_requests();
+               delete_finished_sessions();
+               conditionVariable_.notify_all();
+       }
+
+   private:
+       struct ConnectRequest {
+               ConnectRequest(uint64_t sessionID,
+                              std::string databaseName,
+                              std::string userName,
+                              std::string password,
+                              std::string clientAddress)
+                       : sessionID(sessionID),
+                         databaseName(std::move(databaseName)),
+                         userName(std::move(userName)),
+                         password(std::move(password)),
+                         clientAddress(std::move(clientAddress)),
+                         processing(false),
+                         finished(false),
+                         errorMessage(std::nullopt)
+               {
+               }
+               uint64_t sessionID;
+               std::string databaseName;
+               std::string userName;
+               std::string password;
+               std::string clientAddress;
+               bool processing;
+               bool finished;
+               std::optional<std::string> errorMessage;
+       };
+
+       std::list<std::shared_ptr<ConnectRequest>> connectRequests_;
+
+       // Can use PostgreSQL API.
+       void process_connect_requests()
+       {
+               std::lock_guard<std::mutex> lock(mutex_);
+               bool haveRequest = false;
+               for (auto it = connectRequests_.begin(); it != 
connectRequests_.end();)
+               {
+                       auto& request = *it;
+                       ProcessorLockGuard lock(this);
+                       if (request->processing)
+                       {
+                               auto session = static_cast<SharedSessionData*>(
+                                       dshash_find(sessions_, 
&(request->sessionID), false));
+                               if (!session)
+                               {
+                                       ++it;
+                                       continue;
+                               }
+                               const auto initialized = session->initialized;
+                               if (initialized)
+                               {
+                                       if 
(DsaPointerIsValid(session->errorMessage))
+                                       {
+                                               request->errorMessage = 
static_cast<const char*>(
+                                                       dsa_get_address(area_, 
session->errorMessage));
+                                       }
+                                       else
+                                       {
+                                               auto& localSession =
+                                                       
localSessions_.find(request->sessionID)->second;
+                                               localSession->valid = true;
+                                               localSession->peerPID = 
session->executorPID;
+                                               localSession->bufferData = 
&(session->bufferData);
+                                               localSession->bufferAddress =
+                                                       dsa_get_address(area_, 
session->bufferData.pointer);
+                                               P("%s: %s: connect: %" PRIu64,
+                                                 Tag,
+                                                 tag_,
+                                                 session->bufferData.pointer);
+                                       }
+                               }
+                               dshash_release_lock(sessions_, session);
+                               if (!initialized)
+                               {
+                                       ++it;
+                                       continue;
+                               }
+                               request->finished = true;
+                               it = connectRequests_.erase(it);
+                       }
+                       else
+                       {
+                               bool found;
+                               auto session = static_cast<SharedSessionData*>(
+                                       dshash_find_or_insert(sessions_, 
&(request->sessionID), &found));
+                               if (found)
+                               {
+                                       request->finished = true;
+                                       request->errorMessage = 
std::string("duplicated session ID: ") +
+                                                               
std::to_string(request->sessionID);
+                                       it = connectRequests_.erase(it);
+                               }
+                               else
+                               {
+                                       request->processing = true;
+                                       shared_session_data_initialize(session,
+                                                                      area_,
+                                                                      
request->databaseName,
+                                                                      
request->userName,
+                                                                      
request->password,
+                                                                      
request->clientAddress);
+                                       haveRequest = true;
+                                       ++it;
+                               }
+                               dshash_release_lock(sessions_, session);
+                       }
+               }
+               if (haveRequest)
+               {
+                       kill(sharedData_->mainPID, SIGUSR1);
+               }
+       }
+
+       // Don't use PostgreSQL API.
+       uint64_t assign_session_id()
        {
+               std::lock_guard<std::mutex> lock(mutex_);
+               while (true)
+               {
+                       auto id = randomEngine_();
+                       if (id == 0)
+                       {
+                               continue;
+                       }
+                       auto beforeSize = localSessions_.size();
+                       auto localSession = 
std::make_shared<LocalSessionData>();
+                       localSessions_.insert(std::make_pair(id, localSession));
+                       if (localSessions_.size() == beforeSize)
+                       {
+                               continue;
+                       }
+                       localSession->id = id;
+                       localSession->valid = false;
+                       return id;
+               }
        }
 
+   public:
+       // Don't use PostgreSQL API.
        arrow::Result<uint64_t> connect(const std::string& databaseName,
                                        const std::string& userName,
                                        const std::string& password,
                                        const std::string& clientAddress)
        {
-               auto session = create_session(databaseName, userName, password, 
clientAddress);
-               auto id = session->id;
-               dshash_release_lock(sessions_, session);
-               kill(sharedData_->mainPID, SIGUSR1);
+               auto id = assign_session_id();
+               auto request = std::make_shared<ConnectRequest>(
+                       id, databaseName, userName, password, clientAddress);
+               {
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       connectRequests_.push_back(request);
+               }
+               kill(MyProcPid, SIGUSR1);
                {
                        std::unique_lock<std::mutex> lock(mutex_);
                        conditionVariable_.wait(lock, [&] {
@@ -2224,25 +2595,12 @@ class Proxy : public WorkerProcessor {
                                {
                                        return true;
                                }
-                               session = 
static_cast<SessionData*>(dshash_find(sessions_, &id, false));
-                               if (!session)
-                               {
-                                       return true;
-                               }
-                               const auto initialized = session->initialized;
-                               dshash_release_lock(sessions_, session);
-                               return initialized;
+                               return request->finished;
                        });
                }
-               session = static_cast<SessionData*>(dshash_find(sessions_, &id, 
false));
-               if (!session)
-               {
-                       return arrow::Status::Invalid("session is stale: ", id);
-               }
-               SessionReleaser sessionReleaser(sessions_, session);
-               if (DsaPointerIsValid(session->errorMessage))
+               if (request->errorMessage.has_value())
                {
-                       return report_session_error(session);
+                       return 
arrow::Status::Invalid(request->errorMessage.value());
                }
                if (INTERRUPTS_PENDING_CONDITION())
                {
@@ -2251,166 +2609,562 @@ class Proxy : public WorkerProcessor {
                return id;
        }
 
+       // Don't use PostgreSQL API.
        bool is_valid_session(uint64_t sessionID)
        {
-               auto session = find_session(sessionID);
-               if (session)
+               std::lock_guard<std::mutex> lock(mutex_);
+               auto it = localSessions_.find(sessionID);
+               if (it == localSessions_.end())
                {
-                       dshash_release_lock(sessions_, session);
-                       return true;
+                       return false;
                }
-               else
+               return it->second->valid;
+       }
+
+   private:
+       struct SelectRequest {
+               SelectRequest(uint64_t sessionID, std::string query)
+                       : sessionID(sessionID),
+                         query(std::move(query)),
+                         finished(false),
+                         errorMessage(std::nullopt)
                {
-                       return false;
                }
+               uint64_t sessionID;
+               std::string query;
+               bool finished;
+               std::optional<std::string> errorMessage;
+       };
+
+       std::list<std::shared_ptr<SelectRequest>> selectRequests_;
+
+       // Can use PostgreSQL API.
+       void process_select_requests()
+       {
+#ifdef AFS_VERBOSE
+               const char* tag = "select";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto& request : selectRequests_)
+               {
+                       request->finished = true;
+                       auto session = static_cast<SharedSessionData*>(
+                               dshash_find(sessions_, &(request->sessionID), 
false));
+                       if (!session)
+                       {
+                               request->errorMessage =
+                                       std::string("stolen session: ") + 
std::to_string(request->sessionID);
+                               continue;
+                       }
+                       auto executorPID = session->executorPID;
+                       {
+                               SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                               ProcessorLockGuard lock(this);
+                               set_shared_string(session->selectQuery, 
request->query);
+                               session->action = Action::Select;
+                       }
+                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
executorPID);
+                       kill(executorPID, SIGUSR1);
+               }
+               selectRequests_.clear();
+       }
+
+       arrow::Result<std::shared_ptr<arrow::Schema>> read_schema(uint64_t 
sessionID,
+                                                                 const char* 
tag)
+       {
+               auto it = localSessions_.find(sessionID);
+               if (it == localSessions_.end())
+               {
+                       return arrow::Status::Invalid("Unknown session: ", 
sessionID);
+               }
+               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, it->second);
+
+               // Read schema only stream format data.
+               ARROW_ASSIGN_OR_RAISE(auto reader,
+                                     
arrow::ipc::RecordBatchStreamReader::Open(input));
+               while (true)
+               {
+                       std::shared_ptr<arrow::RecordBatch> recordBatch;
+                       P("%s: %s: %s: read next", Tag, tag_, tag);
+                       ARROW_RETURN_NOT_OK(reader->ReadNext(&recordBatch));
+                       if (!recordBatch)
+                       {
+                               break;
+                       }
+               }
+               return reader->schema();
        }
 
+   public:
        arrow::Result<std::shared_ptr<arrow::Schema>> select(uint64_t sessionID,
                                                             const std::string& 
query)
        {
                const char* tag = "select";
-               auto session = find_session(sessionID);
-               SessionReleaser sessionReleaser(sessions_, session);
-               set_shared_string(session->selectQuery, query);
-               session->action = Action::Select;
-               if (session->executorPID != InvalidPid)
+               auto request = std::make_shared<SelectRequest>(sessionID, 
query);
                {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       selectRequests_.push_back(request);
                }
+               kill(MyProcPid, SIGUSR1);
                {
-                       auto buffer = create_shared_ring_buffer(session);
                        std::unique_lock<std::mutex> lock(mutex_);
                        conditionVariable_.wait(lock, [&] {
-                               P("%s: %s: %s: wait", Tag, tag_, tag);
-                               return DsaPointerIsValid(session->errorMessage) 
|| buffer.size() > 0;
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
                        });
                }
-               if (DsaPointerIsValid(session->errorMessage))
+               if (request->errorMessage.has_value())
+               {
+                       return 
arrow::Status::Invalid(request->errorMessage.value());
+               }
+               if (INTERRUPTS_PENDING_CONDITION())
                {
-                       return report_session_error(session);
+                       return arrow::Status::Invalid("interrupted");
                }
                P("%s: %s: %s: open", Tag, tag_, tag);
-               auto schema = read_schema(session, tag);
+               auto schema = read_schema(sessionID, tag);
                P("%s: %s: %s: schema", Tag, tag_, tag);
                return schema;
        }
 
+   private:
+       struct UpdateRequest {
+               UpdateRequest(uint64_t sessionID, std::string query)
+                       : sessionID(sessionID),
+                         query(std::move(query)),
+                         processing(false),
+                         finished(false),
+                         nUpdatedRecords(0),
+                         errorMessage(std::nullopt)
+               {
+               }
+               uint64_t sessionID;
+               std::string query;
+               bool processing;
+               bool finished;
+               int64_t nUpdatedRecords;
+               std::optional<std::string> errorMessage;
+       };
+
+       std::list<std::shared_ptr<UpdateRequest>> updateRequests_;
+
+       // Can use PostgreSQL API.
+       void process_update_requests()
+       {
+#ifdef AFS_VERBOSE
+               const char* tag = "update";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto it = updateRequests_.begin(); it != 
updateRequests_.end();)
+               {
+                       auto& request = *it;
+                       auto session = static_cast<SharedSessionData*>(
+                               dshash_find(sessions_, &(request->sessionID), 
false));
+                       if (!session)
+                       {
+                               request->finished = true;
+                               request->errorMessage =
+                                       std::string("stolen session: ") + 
std::to_string(request->sessionID);
+                               it = updateRequests_.erase(it);
+                               continue;
+                       }
+                       if (request->processing)
+                       {
+                               SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                               if (DsaPointerIsValid(session->errorMessage))
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       request->errorMessage = 
static_cast<const char*>(
+                                               dsa_get_address(area_, 
session->errorMessage));
+                                       it = updateRequests_.erase(it);
+                               }
+                               else if (session->nUpdatedRecords >= 0)
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       request->nUpdatedRecords = 
session->nUpdatedRecords;
+                                       it = updateRequests_.erase(it);
+                               }
+                               else
+                               {
+                                       ++it;
+                               }
+                       }
+                       else
+                       {
+                               auto executorPID = session->executorPID;
+                               {
+                                       SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                                       ProcessorLockGuard lock(this);
+                                       set_shared_string(session->updateQuery, 
request->query);
+                                       session->action = Action::Update;
+                                       session->nUpdatedRecords = -1;
+                               }
+                               P("%s: %s: %s: kill executor: %d", Tag, tag_, 
tag, executorPID);
+                               kill(executorPID, SIGUSR1);
+                               request->processing = true;
+                               ++it;
+                       }
+               }
+       }
+
+   public:
        arrow::Result<int64_t> update(uint64_t sessionID, const std::string& 
query)
        {
 #ifdef AFS_VERBOSE
                const char* tag = "update";
 #endif
-               auto session = find_session(sessionID);
-               SessionReleaser sessionReleaser(sessions_, session);
-               lock_acquire();
-               set_shared_string(session->updateQuery, query);
-               session->action = Action::Update;
-               session->nUpdatedRecords = -1;
-               lock_release();
-               if (session->executorPID != InvalidPid)
+               auto request = std::make_shared<UpdateRequest>(sessionID, 
query);
                {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       updateRequests_.push_back(request);
                }
+               kill(MyProcPid, SIGUSR1);
                {
                        std::unique_lock<std::mutex> lock(mutex_);
                        conditionVariable_.wait(lock, [&] {
-                               P("%s: %s: %s: wait", Tag, tag_, tag);
-                               return DsaPointerIsValid(session->errorMessage) 
||
-                                      session->nUpdatedRecords >= 0;
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
                        });
                }
-               if (DsaPointerIsValid(session->errorMessage))
+               if (request->errorMessage.has_value())
                {
-                       return report_session_error(session);
+                       return 
arrow::Status::Invalid(request->errorMessage.value());
                }
-               P("%s: %s: %s: done: %ld", Tag, tag_, tag, 
session->nUpdatedRecords);
-               return session->nUpdatedRecords;
+               if (INTERRUPTS_PENDING_CONDITION())
+               {
+                       return arrow::Status::Invalid("interrupted");
+               }
+               P("%s: %s: %s: done: %" PRIu64, Tag, tag_, tag, 
request->nUpdatedRecords);
+               return request->nUpdatedRecords;
        }
 
-       arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> read(uint64_t 
sessionID)
+   private:
+       struct PrepareRequest {
+               PrepareRequest(uint64_t sessionID, std::string query)
+                       : sessionID(sessionID),
+                         query(std::move(query)),
+                         processing(false),
+                         finished(false),
+                         handle(),
+                         errorMessage(std::nullopt)
+               {
+               }
+               uint64_t sessionID;
+               std::string query;
+               bool processing;
+               bool finished;
+               std::string handle;
+               std::optional<std::string> errorMessage;
+       };
+
+       std::list<std::shared_ptr<PrepareRequest>> prepareRequests_;
+
+       // Can use PostgreSQL API.
+       void process_prepare_requests()
        {
-               auto session = find_session(sessionID);
-               SessionReleaser sessionReleaser(sessions_, session);
-               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, session);
-               // Read another stream format data with record batches.
-               return arrow::ipc::RecordBatchStreamReader::Open(input);
+#ifdef AFS_VERBOSE
+               const char* tag = "prepare";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto it = prepareRequests_.begin(); it != 
prepareRequests_.end();)
+               {
+                       auto& request = *it;
+                       auto session = static_cast<SharedSessionData*>(
+                               dshash_find(sessions_, &(request->sessionID), 
false));
+                       if (!session)
+                       {
+                               request->finished = true;
+                               request->errorMessage =
+                                       std::string("stolen session: ") + 
std::to_string(request->sessionID);
+                               it = prepareRequests_.erase(it);
+                               continue;
+                       }
+                       if (request->processing)
+                       {
+                               SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                               if (DsaPointerIsValid(session->errorMessage))
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       request->errorMessage = 
static_cast<const char*>(
+                                               dsa_get_address(area_, 
session->errorMessage));
+                                       it = prepareRequests_.erase(it);
+                               }
+                               else if 
(DsaPointerIsValid(session->preparedStatementHandle))
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       request->handle = static_cast<const 
char*>(
+                                               dsa_get_address(area_, 
session->preparedStatementHandle));
+                                       it = prepareRequests_.erase(it);
+                               }
+                               else
+                               {
+                                       ++it;
+                               }
+                       }
+                       else
+                       {
+                               auto executorPID = session->executorPID;
+                               {
+                                       SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                                       ProcessorLockGuard lock(this);
+                                       
set_shared_string(session->prepareQuery, request->query);
+                                       session->action = Action::Prepare;
+                                       session->nUpdatedRecords = -1;
+                                       
set_shared_string(session->preparedStatementHandle, std::string(""));
+                               }
+                               P("%s: %s: %s: kill executor: %d", Tag, tag_, 
tag, executorPID);
+                               kill(executorPID, SIGUSR1);
+                               request->processing = true;
+                               ++it;
+                       }
+               }
        }
 
+   public:
        arrow::Result<arrow::flight::sql::ActionCreatePreparedStatementResult> 
prepare(
                uint64_t sessionID, const std::string& query)
        {
 #ifdef AFS_VERBOSE
                const char* tag = "prepare";
 #endif
-               auto session = find_session(sessionID);
-               SessionReleaser sessionReleaser(sessions_, session);
-               lock_acquire();
-               set_shared_string(session->prepareQuery, query);
-               session->action = Action::Prepare;
-               set_shared_string(session->preparedStatementHandle, 
std::string(""));
-               lock_release();
-               if (session->executorPID != InvalidPid)
+               auto request = std::make_shared<PrepareRequest>(sessionID, 
query);
                {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       prepareRequests_.push_back(request);
                }
+               kill(MyProcPid, SIGUSR1);
                {
                        std::unique_lock<std::mutex> lock(mutex_);
                        conditionVariable_.wait(lock, [&] {
-                               P("%s: %s: %s: wait", Tag, tag_, tag);
-                               return DsaPointerIsValid(session->errorMessage) 
||
-                                      
DsaPointerIsValid(session->preparedStatementHandle);
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
                        });
                }
-               if (DsaPointerIsValid(session->errorMessage))
+               if (request->errorMessage.has_value())
+               {
+                       return 
arrow::Status::Invalid(request->errorMessage.value());
+               }
+               if (INTERRUPTS_PENDING_CONDITION())
                {
-                       return report_session_error(session);
+                       return arrow::Status::Invalid("interrupted");
                }
-               std::string handle(static_cast<const char*>(
-                       dsa_get_address(area_, 
session->preparedStatementHandle)));
                arrow::flight::sql::ActionCreatePreparedStatementResult result 
= {
                        nullptr,
                        nullptr,
-                       std::move(handle),
+                       std::move(request->handle),
                };
                P("%s: %s: %s: done", Tag, tag_, tag);
                return result;
        }
 
+   private:
+       struct ClosePreparedStatementRequest {
+               ClosePreparedStatementRequest(uint64_t sessionID, std::string 
handle)
+                       : sessionID(sessionID),
+                         handle(std::move(handle)),
+                         processing(false),
+                         finished(false),
+                         errorMessage(std::nullopt)
+               {
+               }
+               uint64_t sessionID;
+               std::string handle;
+               bool processing;
+               bool finished;
+               std::optional<std::string> errorMessage;
+       };
+
+       std::list<std::shared_ptr<ClosePreparedStatementRequest>>
+               closePreparedStatementRequests_;
+
+       // Can use PostgreSQL API.
+       void process_close_prepared_statement_requests()
+       {
+#ifdef AFS_VERBOSE
+               const char* tag = "close prepared statement";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto it = closePreparedStatementRequests_.begin();
+                    it != closePreparedStatementRequests_.end();)
+               {
+                       auto& request = *it;
+                       auto session = static_cast<SharedSessionData*>(
+                               dshash_find(sessions_, &(request->sessionID), 
false));
+                       if (!session)
+                       {
+                               request->finished = true;
+                               request->errorMessage =
+                                       std::string("stolen session: ") + 
std::to_string(request->sessionID);
+                               it = closePreparedStatementRequests_.erase(it);
+                               continue;
+                       }
+                       if (request->processing)
+                       {
+                               SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                               if (DsaPointerIsValid(session->errorMessage))
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       request->errorMessage = 
static_cast<const char*>(
+                                               dsa_get_address(area_, 
session->errorMessage));
+                                       it = 
closePreparedStatementRequests_.erase(it);
+                               }
+                               else if 
(DsaPointerIsValid(session->preparedStatementHandle))
+                               {
+                                       ++it;
+                               }
+                               else
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       it = 
closePreparedStatementRequests_.erase(it);
+                               }
+                       }
+                       else
+                       {
+                               auto executorPID = session->executorPID;
+                               {
+                                       SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                                       ProcessorLockGuard lock(this);
+                                       
set_shared_string(session->preparedStatementHandle, request->handle);
+                                       session->action = 
Action::ClosePreparedStatement;
+                               }
+                               P("%s: %s: %s: kill executor: %d", Tag, tag_, 
tag, executorPID);
+                               kill(executorPID, SIGUSR1);
+                               request->processing = true;
+                               ++it;
+                       }
+               }
+       }
+
+   public:
        arrow::Status close_prepared_statement(uint64_t sessionID, const 
std::string& handle)
        {
 #ifdef AFS_VERBOSE
                const char* tag = "close prepared statement";
 #endif
-               auto session = find_session(sessionID);
-               SessionReleaser sessionReleaser(sessions_, session);
-               lock_acquire();
-               set_shared_string(session->preparedStatementHandle, handle);
-               session->action = Action::ClosePreparedStatement;
-               lock_release();
-               if (session->executorPID != InvalidPid)
+               auto request = 
std::make_shared<ClosePreparedStatementRequest>(sessionID, handle);
                {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       closePreparedStatementRequests_.push_back(request);
                }
+               kill(MyProcPid, SIGUSR1);
                {
                        std::unique_lock<std::mutex> lock(mutex_);
                        conditionVariable_.wait(lock, [&] {
-                               P("%s: %s: %s: wait", Tag, tag_, tag);
-                               return DsaPointerIsValid(session->errorMessage) 
||
-                                      
!DsaPointerIsValid(session->preparedStatementHandle);
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
                        });
                }
-               if (DsaPointerIsValid(session->errorMessage))
+               if (request->errorMessage.has_value())
+               {
+                       return 
arrow::Status::Invalid(request->errorMessage.value());
+               }
+               if (INTERRUPTS_PENDING_CONDITION())
                {
-                       return report_session_error(session);
+                       return arrow::Status::Invalid("interrupted");
                }
                P("%s: %s: %s: done", Tag, tag_, tag);
                return arrow::Status::OK();
        }
 
+   private:
+       struct SetParametersRequest {
+               SetParametersRequest(uint64_t sessionID, std::string handle)
+                       : sessionID(sessionID),
+                         handle(std::move(handle)),
+                         processing(false),
+                         finished(false),
+                         errorMessage(std::nullopt)
+               {
+               }
+               uint64_t sessionID;
+               std::string handle;
+               bool processing;
+               bool finished;
+               std::optional<std::string> errorMessage;
+       };
+
+       std::list<std::shared_ptr<SetParametersRequest>> setParametersRequests_;
+
+       // Can use PostgreSQL API.
+       void process_set_parameters_requests()
+       {
+#ifdef AFS_VERBOSE
+               const char* tag = "set parameters";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto it = setParametersRequests_.begin();
+                    it != setParametersRequests_.end();)
+               {
+                       auto& request = *it;
+                       auto session = static_cast<SharedSessionData*>(
+                               dshash_find(sessions_, &(request->sessionID), 
false));
+                       if (!session)
+                       {
+                               request->finished = true;
+                               request->errorMessage =
+                                       std::string("stolen session: ") + 
std::to_string(request->sessionID);
+                               it = setParametersRequests_.erase(it);
+                               continue;
+                       }
+                       if (request->processing)
+                       {
+                               SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                               if (DsaPointerIsValid(session->errorMessage))
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       request->errorMessage = 
static_cast<const char*>(
+                                               dsa_get_address(area_, 
session->errorMessage));
+                                       it = setParametersRequests_.erase(it);
+                               }
+                               else if (session->setParametersFinished)
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       it = setParametersRequests_.erase(it);
+                               }
+                               else
+                               {
+                                       ++it;
+                               }
+                       }
+                       else
+                       {
+                               auto executorPID = session->executorPID;
+                               {
+                                       SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                                       ProcessorLockGuard lock(this);
+                                       
set_shared_string(session->preparedStatementHandle, request->handle);
+                                       session->action = Action::SetParameters;
+                                       session->setParametersFinished = false;
+                               }
+                               P("%s: %s: %s: kill executor: %d", Tag, tag_, 
tag, executorPID);
+                               kill(executorPID, SIGUSR1);
+                               request->processing = true;
+                               ++it;
+                       }
+               }
+       }
+
+   public:
        arrow::Status set_parameters(uint64_t sessionID,
                                     const std::string& handle,
                                     arrow::flight::FlightMessageReader* reader,
@@ -2419,20 +3173,17 @@ class Proxy : public WorkerProcessor {
 #ifdef AFS_VERBOSE
                const char* tag = "set parameters";
 #endif
-               auto session = find_session(sessionID);
-               SessionReleaser sessionReleaser(sessions_, session);
-               lock_acquire();
-               set_shared_string(session->preparedStatementHandle, handle);
-               session->action = Action::SetParameters;
-               lock_release();
-               if (session->executorPID != InvalidPid)
+               auto request = 
std::make_shared<SetParametersRequest>(sessionID, handle);
                {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       setParametersRequests_.push_back(request);
                }
+               kill(MyProcPid, SIGUSR1);
+               auto session = localSessions_.find(sessionID)->second;
+               auto executorPID = session->peerPID;
                {
                        ARROW_ASSIGN_OR_RAISE(const auto& schema, 
reader->GetSchema());
-                       SharedRingBufferOutputStream output(this, session);
+                       SharedRingBufferOutputStream output(this, 
std::move(session));
                        auto options = arrow::ipc::IpcWriteOptions::Defaults();
                        options.emit_dictionary_deltas = true;
                        ARROW_ASSIGN_OR_RAISE(auto writer,
@@ -2447,61 +3198,201 @@ class Proxy : public WorkerProcessor {
                                
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*(chunk.data)));
                        }
                        ARROW_RETURN_NOT_OK(writer->Close());
-               }
-               if (session->executorPID != InvalidPid)
-               {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
executorPID);
+                       kill(executorPID, SIGUSR1);
                }
                {
-                       auto buffer = create_shared_ring_buffer(session);
                        std::unique_lock<std::mutex> lock(mutex_);
                        conditionVariable_.wait(lock, [&] {
-                               P("%s: %s: %s: wait", Tag, tag_, tag);
-                               return DsaPointerIsValid(session->errorMessage) 
|| buffer.size() == 0;
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
                        });
                }
-               if (DsaPointerIsValid(session->errorMessage))
+               if (request->errorMessage.has_value())
+               {
+                       return 
arrow::Status::Invalid(request->errorMessage.value());
+               }
+               if (INTERRUPTS_PENDING_CONDITION())
                {
-                       return report_session_error(session);
+                       return arrow::Status::Invalid("interrupted");
                }
                P("%s: %s: %s: done", Tag, tag_, tag);
                return arrow::Status::OK();
        }
 
+   private:
+       struct SelectPreparedStatementRequest {
+               SelectPreparedStatementRequest(uint64_t sessionID, std::string 
handle)
+                       : sessionID(sessionID),
+                         handle(std::move(handle)),
+                         finished(false),
+                         errorMessage(std::nullopt)
+               {
+               }
+               uint64_t sessionID;
+               std::string handle;
+               bool finished;
+               std::optional<std::string> errorMessage;
+       };
+
+       std::list<std::shared_ptr<SelectPreparedStatementRequest>>
+               selectPreparedStatementRequests_;
+
+       // Can use PostgreSQL API.
+       void process_select_prepared_statement_requests()
+       {
+#ifdef AFS_VERBOSE
+               const char* tag = "select prepared statement";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto& request : selectPreparedStatementRequests_)
+               {
+                       request->finished = true;
+                       auto session = static_cast<SharedSessionData*>(
+                               dshash_find(sessions_, &(request->sessionID), 
false));
+                       if (!session)
+                       {
+                               request->errorMessage =
+                                       std::string("stolen session: ") + 
std::to_string(request->sessionID);
+                               continue;
+                       }
+                       auto executorPID = session->executorPID;
+                       {
+                               SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                               ProcessorLockGuard lock(this);
+                               
set_shared_string(session->preparedStatementHandle, request->handle);
+                               session->action = 
Action::SelectPreparedStatement;
+                       }
+                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
executorPID);
+                       kill(executorPID, SIGUSR1);
+               }
+               selectPreparedStatementRequests_.clear();
+       }
+
+   public:
        arrow::Result<std::shared_ptr<arrow::Schema>> select_prepared_statement(
                uint64_t sessionID, const std::string& handle)
        {
                const char* tag = "select prepared statement";
-               auto session = find_session(sessionID);
-               SessionReleaser sessionReleaser(sessions_, session);
-               lock_acquire();
-               set_shared_string(session->preparedStatementHandle, handle);
-               session->action = Action::SelectPreparedStatement;
-               lock_release();
-               if (session->executorPID != InvalidPid)
+               auto request =
+                       
std::make_shared<SelectPreparedStatementRequest>(sessionID, handle);
                {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       selectPreparedStatementRequests_.push_back(request);
                }
+               kill(MyProcPid, SIGUSR1);
                {
-                       auto buffer = create_shared_ring_buffer(session);
                        std::unique_lock<std::mutex> lock(mutex_);
                        conditionVariable_.wait(lock, [&] {
-                               P("%s: %s: %s: wait", Tag, tag_, tag);
-                               return DsaPointerIsValid(session->errorMessage) 
|| buffer.size() > 0;
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
                        });
                }
-               if (DsaPointerIsValid(session->errorMessage))
+               if (request->errorMessage.has_value())
                {
-                       return report_session_error(session);
+                       return 
arrow::Status::Invalid(request->errorMessage.value());
+               }
+               if (INTERRUPTS_PENDING_CONDITION())
+               {
+                       return arrow::Status::Invalid("interrupted");
                }
                P("%s: %s: %s: open", Tag, tag_, tag);
-               auto schema = read_schema(session, tag);
+               auto schema = read_schema(sessionID, tag);
                P("%s: %s: %s: schema", Tag, tag_, tag);
                return schema;
        }
 
+   private:
+       struct UpdatePreparedStatementRequest {
+               UpdatePreparedStatementRequest(uint64_t sessionID, std::string 
handle)
+                       : sessionID(sessionID),
+                         handle(std::move(handle)),
+                         processing(false),
+                         finished(false),
+                         nUpdatedRecords(0),
+                         errorMessage(std::nullopt)
+               {
+               }
+               uint64_t sessionID;
+               std::string handle;
+               bool processing;
+               bool finished;
+               int64_t nUpdatedRecords;
+               std::optional<std::string> errorMessage;
+       };
+
+       std::list<std::shared_ptr<UpdatePreparedStatementRequest>>
+               updatePreparedStatementRequests_;
+
+       // Can use PostgreSQL API.
+       void process_update_prepared_statement_requests()
+       {
+#ifdef AFS_VERBOSE
+               const char* tag = "update prepared statement";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto it = updatePreparedStatementRequests_.begin();
+                    it != updatePreparedStatementRequests_.end();)
+               {
+                       auto& request = *it;
+                       auto session = static_cast<SharedSessionData*>(
+                               dshash_find(sessions_, &(request->sessionID), 
false));
+                       if (!session)
+                       {
+                               request->finished = true;
+                               request->errorMessage =
+                                       std::string("stolen session: ") + 
std::to_string(request->sessionID);
+                               it = updatePreparedStatementRequests_.erase(it);
+                               continue;
+                       }
+                       if (request->processing)
+                       {
+                               SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                               if (DsaPointerIsValid(session->errorMessage))
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       request->errorMessage = 
static_cast<const char*>(
+                                               dsa_get_address(area_, 
session->errorMessage));
+                                       it = 
updatePreparedStatementRequests_.erase(it);
+                               }
+                               else if (session->nUpdatedRecords >= 0)
+                               {
+                                       request->processing = false;
+                                       request->finished = true;
+                                       request->nUpdatedRecords = 
session->nUpdatedRecords;
+                                       it = 
updatePreparedStatementRequests_.erase(it);
+                               }
+                               else
+                               {
+                                       ++it;
+                               }
+                       }
+                       else
+                       {
+                               auto executorPID = session->executorPID;
+                               {
+                                       SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                                       ProcessorLockGuard lock(this);
+                                       
set_shared_string(session->preparedStatementHandle, request->handle);
+                                       session->action = 
Action::UpdatePreparedStatement;
+                                       session->nUpdatedRecords = -1;
+                               }
+                               P("%s: %s: %s: kill executor: %d", Tag, tag_, 
tag, executorPID);
+                               kill(executorPID, SIGUSR1);
+                               request->processing = true;
+                               ++it;
+                       }
+               }
+       }
+
+   public:
        arrow::Result<int64_t> update_prepared_statement(
                uint64_t sessionID,
                const std::string& handle,
@@ -2510,21 +3401,18 @@ class Proxy : public WorkerProcessor {
 #ifdef AFS_VERBOSE
                const char* tag = "update prepared statement";
 #endif
-               auto session = find_session(sessionID);
-               SessionReleaser sessionReleaser(sessions_, session);
-               lock_acquire();
-               set_shared_string(session->preparedStatementHandle, handle);
-               session->action = Action::UpdatePreparedStatement;
-               session->nUpdatedRecords = -1;
-               lock_release();
-               if (session->executorPID != InvalidPid)
+               auto request =
+                       
std::make_shared<UpdatePreparedStatementRequest>(sessionID, handle);
                {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       updatePreparedStatementRequests_.push_back(request);
                }
+               kill(MyProcPid, SIGUSR1);
+               auto session = localSessions_.find(sessionID)->second;
+               auto executorPID = session->peerPID;
                {
                        ARROW_ASSIGN_OR_RAISE(const auto& schema, 
reader->GetSchema());
-                       SharedRingBufferOutputStream output(this, session);
+                       SharedRingBufferOutputStream output(this, 
std::move(session));
                        auto options = arrow::ipc::IpcWriteOptions::Defaults();
                        options.emit_dictionary_deltas = true;
                        ARROW_ASSIGN_OR_RAISE(auto writer,
@@ -2539,103 +3427,258 @@ class Proxy : public WorkerProcessor {
                                
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*(chunk.data)));
                        }
                        ARROW_RETURN_NOT_OK(writer->Close());
+                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
executorPID);
+                       kill(executorPID, SIGUSR1);
                }
-               if (session->executorPID != InvalidPid)
                {
-                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
session->executorPID);
-                       kill(session->executorPID, SIGUSR1);
+                       std::unique_lock<std::mutex> lock(mutex_);
+                       conditionVariable_.wait(lock, [&] {
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
+                       });
+               }
+               if (request->errorMessage.has_value())
+               {
+                       return 
arrow::Status::Invalid(request->errorMessage.value());
                }
+               if (INTERRUPTS_PENDING_CONDITION())
+               {
+                       return arrow::Status::Invalid("interrupted");
+               }
+               P("%s: %s: %s: done: %" PRIu64, Tag, tag_, tag, 
request->nUpdatedRecords);
+               return request->nUpdatedRecords;
+       }
+
+   private:
+       struct ReadRequest {
+               ReadRequest(LocalSessionData* session,
+                           SharedRingBuffer* buffer,
+                           size_t n,
+                           void* output)
+                       : session(session),
+                         buffer(buffer),
+                         n(n),
+                         output(output),
+                         finished(false),
+                         readBytes(0)
+               {
+               }
+               LocalSessionData* session;
+               SharedRingBuffer* buffer;
+               size_t n;
+               void* output;
+               bool finished;
+               size_t readBytes;
+       };
+
+       std::list<std::shared_ptr<ReadRequest>> readRequests_;
+
+       // Can use PostgreSQL API.
+       void process_read_requests()
+       {
+#ifdef AFS_VERBOSE
+               const char* tag = "read";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto& request : readRequests_)
+               {
+                       request->finished = true;
+                       {
+                               ProcessorLockGuard lock(this);
+                               request->readBytes = 
request->buffer->read(request->n, request->output);
+                       }
+                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
request->session->peerPID);
+                       kill(request->session->peerPID, SIGUSR1);
+               }
+               readRequests_.clear();
+       }
+
+   public:
+       // Don't use PostgreSQL API.
+       arrow::Result<size_t> read(LocalSessionData* session,
+                                  SharedRingBuffer* buffer,
+                                  size_t n,
+                                  void* output) override
+       {
+               auto request = std::make_shared<ReadRequest>(session, buffer, 
n, output);
+               {
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       readRequests_.push_back(request);
+               }
+               kill(MyProcPid, SIGUSR1);
                {
                        std::unique_lock<std::mutex> lock(mutex_);
                        conditionVariable_.wait(lock, [&] {
-                               P("%s: %s: %s: wait", Tag, tag_, tag);
-                               return DsaPointerIsValid(session->errorMessage) 
||
-                                      session->nUpdatedRecords >= 0;
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
                        });
                }
-               if (DsaPointerIsValid(session->errorMessage))
+               if (INTERRUPTS_PENDING_CONDITION())
                {
-                       return report_session_error(session);
+                       return arrow::Status::IOError("interrupted");
                }
-               P("%s: %s: %s: done: %ld", Tag, tag_, tag, 
session->nUpdatedRecords);
-               return session->nUpdatedRecords;
+               return request->readBytes;
        }
 
-   protected:
-       pid_t peer_pid(SessionData* session) override { return 
session->executorPID; }
+   private:
+       struct WriteRequest {
+               WriteRequest(LocalSessionData* session,
+                            SharedRingBuffer* buffer,
+                            const void* data,
+                            size_t n)
+                       : session(session),
+                         buffer(buffer),
+                         data(data),
+                         n(n),
+                         finished(false),
+                         writtenBytes(0),
+                         errorMessage(std::nullopt)
+               {
+               }
+               LocalSessionData* session;
+               SharedRingBuffer* buffer;
+               const void* data;
+               size_t n;
+               bool finished;
+               size_t writtenBytes;
+               std::optional<std::string> errorMessage;
+       };
 
-       const char* peer_name(SessionData* session) override { return 
"executor"; }
+       std::list<std::shared_ptr<WriteRequest>> writeRequests_;
 
-   private:
-       SessionData* create_session(const std::string& databaseName,
-                                   const std::string& userName,
-                                   const std::string& password,
-                                   const std::string& clientAddress)
+       // Can use PostgreSQL API.
+       void process_write_requests()
        {
-               lock_acquire();
-               uint64_t id = 0;
-               SessionData* session = nullptr;
-               do
+#ifdef AFS_VERBOSE
+               const char* tag = "write";
+#endif
+               std::lock_guard<std::mutex> lock(mutex_);
+               for (auto& request : writeRequests_)
                {
-                       id = randomEngine_();
-                       if (id == 0)
+                       auto session = static_cast<SharedSessionData*>(
+                               dshash_find(sessions_, &(request->session->id), 
false));
+                       if (!session)
                        {
+                               request->finished = true;
+                               request->errorMessage = std::string("stolen 
session: ") +
+                                                       
std::to_string(request->session->id);
                                continue;
                        }
-                       bool found = false;
-                       session =
-                               
static_cast<SessionData*>(dshash_find_or_insert(sessions_, &id, &found));
-                       if (!found)
                        {
-                               break;
+                               SharedSessionReleaser 
sessionReleaser(sessions_, session);
+                               if (DsaPointerIsValid(session->errorMessage))
+                               {
+                                       request->errorMessage = 
static_cast<const char*>(
+                                               dsa_get_address(area_, 
session->errorMessage));
+                               }
+                               else
+                               {
+                                       ProcessorLockGuard lock(this);
+                                       request->writtenBytes =
+                                               
request->buffer->write(request->data, request->n);
+                               }
+                               request->finished = true;
                        }
-               } while (true);
-               session_data_initialize(
-                       session, area_, databaseName, userName, password, 
clientAddress);
-               lock_release();
-               return session;
+                       P("%s: %s: %s: kill executor: %d", Tag, tag_, tag, 
request->session->peerPID);
+                       kill(request->session->peerPID, SIGUSR1);
+               }
+               writeRequests_.clear();
        }
 
-       SessionData* find_session(uint64_t sessionID)
+   public:
+       // Don't use PostgreSQL API.
+       arrow::Result<size_t> write(LocalSessionData* session,
+                                   SharedRingBuffer* buffer,
+                                   const void* data,
+                                   size_t n) override
        {
-               return static_cast<SessionData*>(dshash_find(sessions_, 
&sessionID, false));
+               auto request = std::make_shared<WriteRequest>(session, buffer, 
data, n);
+               {
+                       std::lock_guard<std::mutex> lock(mutex_);
+                       writeRequests_.push_back(request);
+               }
+               kill(MyProcPid, SIGUSR1);
+               {
+                       std::unique_lock<std::mutex> lock(mutex_);
+                       conditionVariable_.wait(lock, [&] {
+                               if (INTERRUPTS_PENDING_CONDITION())
+                               {
+                                       return true;
+                               }
+                               return request->finished;
+                       });
+               }
+               if (request->errorMessage.has_value())
+               {
+                       return 
arrow::Status::IOError(request->errorMessage.value());
+               }
+               if (INTERRUPTS_PENDING_CONDITION())
+               {
+                       return arrow::Status::IOError("interrupted");
+               }
+               return request->writtenBytes;
        }
 
-       arrow::Status report_session_error(SessionData* session)
+   private:
+       // Can use PostgreSQL API.
+       void delete_finished_sessions()
        {
-               auto status = arrow::Status::Invalid(
-                       static_cast<const char*>(dsa_get_address(area_, 
session->errorMessage)));
-               P("%s: %s: %s: kill SIGTERM executor: %d",
-                 Tag,
-                 tag_,
-                 AFS_FUNC,
-                 session->executorPID);
-               kill(session->executorPID, SIGTERM);
-               return status;
+               dshash_seq_status sessionsStatus;
+               dshash_seq_init(&sessionsStatus, sessions_, false);
+               SharedSessionData* session;
+               while (
+                       (session = 
static_cast<SharedSessionData*>(dshash_seq_next(&sessionsStatus))))
+               {
+                       if (!session->finished)
+                       {
+                               continue;
+                       }
+                       {
+                               std::lock_guard<std::mutex> lock(mutex_);
+                               
localSessions_.erase(localSessions_.find(session->id));
+                       }
+                       shared_session_data_finalize(session, area_);
+                       dshash_delete_current(&sessionsStatus);
+               }
+               dshash_seq_term(&sessionsStatus);
        }
 
-       arrow::Result<std::shared_ptr<arrow::Schema>> read_schema(SessionData* 
session,
-                                                                 const char* 
tag)
+   protected:
+       pid_t peer_pid(LocalSessionData* session) override { return 
session->peerPID; }
+
+       const char* peer_name() override { return "executor"; }
+
+       arrow::Status wait_internal(LocalSessionData* session,
+                                   SharedRingBuffer* buffer,
+                                   WaitMode mode,
+                                   const char* tag) override
        {
-               auto input = 
std::make_shared<SharedRingBufferInputStream>(this, session);
-               // Read schema only stream format data.
-               ARROW_ASSIGN_OR_RAISE(auto reader,
-                                     
arrow::ipc::RecordBatchStreamReader::Open(input));
-               while (true)
-               {
-                       std::shared_ptr<arrow::RecordBatch> recordBatch;
-                       P("%s: %s: %s: read next", Tag, tag_, tag);
-                       ARROW_RETURN_NOT_OK(reader->ReadNext(&recordBatch));
-                       if (!recordBatch)
+               std::unique_lock<std::mutex> lock(mutex_);
+               conditionVariable_.wait(lock, [&] {
+                       P("%s: %s: %s: %s: wait: %" PRIsize,
+                         Tag,
+                         tag_,
+                         tag,
+                         peer_name(),
+                         get_waiting_buffer_size(buffer, mode));
+                       if (INTERRUPTS_PENDING_CONDITION())
                        {
-                               break;
+                               return true;
                        }
+                       return get_waiting_buffer_size(buffer, mode) > 0;
+               });
+               if (INTERRUPTS_PENDING_CONDITION())
+               {
+                       return arrow::Status::Invalid(tag_, ": ", tag, ": ", 
"interrupted");
                }
-               return reader->schema();
+               return arrow::Status::OK();
        }
-
-       std::random_device randomSeed_;
-       std::mt19937_64 randomEngine_;
 };
 
 arrow::Result<int64_t>
@@ -2646,13 +3689,30 @@ SharedRingBufferInputStream::Read(int64_t nBytes, void* 
out)
                return arrow::Status::IOError(std::string(Tag) + ": " + 
processor_->tag() +
                                              ": SharedRingBufferInputStream is 
closed");
        }
-       auto buffer = processor_->create_shared_ring_buffer(session_);
        size_t rest = static_cast<size_t>(nBytes);
+       SharedRingBuffer buffer(
+               localSession_->bufferData, localSession_->bufferAddress, 
processor_->tag());
        while (true)
        {
-               processor_->lock_acquire();
-               auto readBytes = buffer.read(rest, out);
-               processor_->lock_release();
+               ARROW_ASSIGN_OR_RAISE(auto readBytes,
+                                     processor_->read(localSession_.get(), 
&buffer, rest, out));
+               if (INTERRUPTS_PENDING_CONDITION())
+               {
+                       return arrow::Status::IOError(std::string(Tag) + ": " + 
processor_->tag() +
+                                                     ": interrupted");
+               }
+
+               if (readBytes == 0)
+               {
+                       ARROW_RETURN_NOT_OK(processor_->wait(
+                               localSession_.get(), &buffer, 
Processor::WaitMode::Written));
+                       if (INTERRUPTS_PENDING_CONDITION())
+                       {
+                               return arrow::Status::IOError(std::string(Tag) 
+ ": " +
+                                                             processor_->tag() 
+ ": interrupted");
+                       }
+                       continue;
+               }
 
                position_ += readBytes;
                rest -= readBytes;
@@ -2661,21 +3721,13 @@ SharedRingBufferInputStream::Read(int64_t nBytes, void* 
out)
                {
                        break;
                }
-
-               ARROW_RETURN_NOT_OK(
-                       processor_->wait(session_, &buffer, 
Processor::WaitMode::Written));
-               if (INTERRUPTS_PENDING_CONDITION())
-               {
-                       return arrow::Status::IOError(std::string(Tag) + ": " + 
processor_->tag() +
-                                                     ": interrupted");
-               }
        }
        return nBytes;
 }
 
 class MainProcessor : public Processor {
    public:
-       MainProcessor() : Processor("main", true), sessions_(nullptr)
+       MainProcessor() : Processor("main"), sessions_(nullptr), 
executorHandles_()
        {
                LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
                bool found;
@@ -2726,18 +3778,47 @@ class MainProcessor : public Processor {
                return handle;
        }
 
-       void process_connect_requests()
+       void signaled() { update_sessions(); }
+
+   private:
+       void update_sessions()
        {
+               const char* tag = "update sessions";
+               P("%s: %s: %s: start", Tag, tag_, tag);
                dshash_seq_status sessionsStatus;
                dshash_seq_init(&sessionsStatus, sessions_, false);
-               SessionData* session;
-               while ((session = 
static_cast<SessionData*>(dshash_seq_next(&sessionsStatus))))
+               while (true)
                {
-                       if (session->initialized)
+                       auto session =
+                               
static_cast<SharedSessionData*>(dshash_seq_next(&sessionsStatus));
+                       if (!session)
+                       {
+                               break;
+                       }
+
+                       if (session->started)
                        {
+                               if (!session->finished)
+                               {
+                                       try
+                                       {
+                                               auto handle = 
executorHandles_.at(session->id);
+                                               pid_t pid;
+                                               if 
(GetBackgroundWorkerPid(handle, &pid) == BGWH_STOPPED)
+                                               {
+                                                       // A finished session
+                                                       session->finished = 
true;
+                                                       
executorHandles_.erase(session->id);
+                                               }
+                                       } catch (const std::exception& 
exception)
+                                       {
+                                               session->finished = true;
+                                       }
+                               }
                                continue;
                        }
 
+                       // A new session request
                        BackgroundWorker worker = {};
                        snprintf(
                                worker.bgw_name, BGW_MAXLEN, "%s: executor: %" 
PRIu64, Tag, session->id);
@@ -2752,24 +3833,34 @@ class MainProcessor : public Processor {
                        worker.bgw_main_arg = Int64GetDatum(session->id);
                        worker.bgw_notify_pid = MyProcPid;
                        BackgroundWorkerHandle* handle;
+                       P("%s: %s: %s: register executor: %" PRIu64, Tag, tag_, 
tag, session->id);
+                       session->started = true;
                        if (RegisterDynamicBackgroundWorker(&worker, &handle))
                        {
+                               executorHandles_[session->id] = handle;
                                WaitForBackgroundWorkerStartup(handle, 
&(session->executorPID));
+                               P("%s: %s: %s: started executor: %" PRIu64, 
Tag, tag_, tag, session->id);
                        }
                        else
                        {
+                               P("%s: %s: %s: failed to start executor: %" 
PRIu64,
+                                 Tag,
+                                 tag_,
+                                 tag,
+                                 session->id);
                                set_shared_string(
                                        session->errorMessage,
-                                       std::string(Tag) + ": " + tag_ +
+                                       std::string(Tag) + ": " + tag_ + ": " + 
tag +
                                                ": failed to start executor: " 
+ std::to_string(session->id));
                        }
                }
                dshash_seq_term(&sessionsStatus);
+               P("%s: %s: %s: end", Tag, tag_, tag);
                kill(sharedData_->serverPID, SIGUSR1);
        }
 
-   private:
        dshash_table* sessions_;
+       std::map<uint64_t, BackgroundWorkerHandle*> executorHandles_;
 };
 
 class HeaderAuthServerMiddleware : public arrow::flight::ServerMiddleware {
@@ -2896,6 +3987,8 @@ class HeaderAuthServerMiddlewareFactory : public 
arrow::flight::ServerMiddleware
        Proxy* proxy_;
 };
 
+// Flight SQL server doesn't communicate an "executor" process
+// directly. The Proxy object communicates an "executor" process.
 class FlightSQLServer : public arrow::flight::sql::FlightSqlServerBase {
    public:
        explicit FlightSQLServer(Proxy* proxy)
@@ -2928,7 +4021,7 @@ class FlightSQLServer : public 
arrow::flight::sql::FlightSqlServerBase {
                const arrow::flight::sql::StatementQueryTicket& command) 
override
        {
                ARROW_ASSIGN_OR_RAISE(auto sessionID, session_id(context));
-               ARROW_ASSIGN_OR_RAISE(auto reader, proxy_->read(sessionID));
+               ARROW_ASSIGN_OR_RAISE(auto reader, 
proxy_->open_reader(sessionID));
                return 
std::make_unique<arrow::flight::RecordBatchStream>(reader);
        }
 
@@ -3067,8 +4160,14 @@ afs_server_internal(Proxy* proxy)
                        ProcessConfigFile(PGC_SIGHUP);
                }
 
-               if (GotSIGUSR1)
+               while (GotSIGUSR1)
                {
+                       // SIGUSR1 is emitted by:
+                       // * The "main" process notifies a new "executor" 
process is spawned
+                       // * The "main" process notifies a "executor" process 
is finished
+                       //   (may be crashed)
+                       // * The "executor" process requests a read/write
+                       // * The "executor" process notifies a request is 
finished
                        GotSIGUSR1 = false;
                        proxy->signaled();
                }
@@ -3131,8 +4230,10 @@ afs_executor(Datum arg)
                        ProcessConfigFile(PGC_SIGHUP);
                }
 
-               if (GotSIGUSR1)
+               while (GotSIGUSR1)
                {
+                       // SIGUSR1 is emitted by:
+                       // * The "server" process requests a new request
                        GotSIGUSR1 = false;
                        executor->signaled();
                }
@@ -3194,10 +4295,14 @@ afs_main(Datum arg)
                                ProcessConfigFile(PGC_SIGHUP);
                        }
 
-                       if (GotSIGUSR1)
+                       while (GotSIGUSR1)
                        {
+                               // SIGUSR1 is emitted by:
+                               // * The "server" process is exited (may be 
crashed)
+                               // * The "executor" process is exited (may be 
crashed)
+                               // * The "server" process requests a new session
                                GotSIGUSR1 = false;
-                               processor.process_connect_requests();
+                               processor.signaled();
                        }
 
                        CHECK_FOR_INTERRUPTS();
@@ -3254,7 +4359,7 @@ _PG_init(void)
                                NULL,
                                NULL);
 
-#ifdef PGRN_HAVE_SHMEM_REQUEST_HOOK
+#ifdef AFS_HAVE_SHMEM_REQUEST_HOOK
        PreviousShmemRequestHook = shmem_request_hook;
        shmem_request_hook = afs_shmem_request_hook;
 #else
diff --git a/test/helper/sandbox.rb b/test/helper/sandbox.rb
index 3459d5f..bbbf9b9 100644
--- a/test/helper/sandbox.rb
+++ b/test/helper/sandbox.rb
@@ -161,7 +161,7 @@ module Helper
         end
         conf.puts("logging_collector = on")
         conf.puts("log_filename = '#{@log_base_name}'")
-        conf.puts("log_min_messages = debug5") if ENV["AFS_DEBUG"] == "yes"
+        conf.puts("log_min_messages = debug5") if ENV["AFS_VERBOSE"] == "yes"
         conf.puts("shared_preload_libraries = " +
                   "'#{shared_preload_libraries.join(",")}'")
         conf.puts("arrow_flight_sql.uri = '#{@flight_sql_uri}'")

Reply via email to