This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/arrow-flight-sql-postgresql.git
The following commit(s) were added to refs/heads/main by this push:
new 14df9b5 Don't use PostgreSQL API in gRPC threads on server process
(#167)
14df9b5 is described below
commit 14df9b5fe61eda2d71bdfbf67c61a227741f616c
Author: Sutou Kouhei <[email protected]>
AuthorDate: Mon Nov 13 18:15:02 2023 +0900
Don't use PostgreSQL API in gRPC threads on server process (#167)
Closes GH-160
We must use the main thread to run PostgreSQL API because PostgreSQL API
isn't thread safe.
---
src/afs.cc | 2229 ++++++++++++++++++++++++++++++++++++------------
test/helper/sandbox.rb | 2 +-
2 files changed, 1668 insertions(+), 563 deletions(-)
diff --git a/src/afs.cc b/src/afs.cc
index 1c792bc..23940b9 100644
--- a/src/afs.cc
+++ b/src/afs.cc
@@ -47,7 +47,7 @@ extern "C"
#undef Abs
#if PG_VERSION_NUM >= 150000
-# define PGRN_HAVE_SHMEM_REQUEST_HOOK
+# define AFS_HAVE_SHMEM_REQUEST_HOOK
#endif
#include <arrow/buffer.h>
@@ -59,6 +59,7 @@ extern "C"
#include <arrow/ipc/writer.h>
#include <arrow/table_builder.h>
#include <arrow/util/base64.h>
+#include <arrow/util/string.h>
#include <cinttypes>
#include <condition_variable>
@@ -77,6 +78,12 @@ extern "C"
# define AFS_FUNC __func__
#endif
+#ifdef _WIN32
+# define PRIsize "Iu"
+#else
+# define PRIsize "zu"
+#endif
+
// #define AFS_VERBOSE
#ifdef AFS_VERBOSE
# define P(...) ereport(DEBUG5, errmsg_internal(__VA_ARGS__))
@@ -147,14 +154,14 @@ afs_sigsegv(SIGNAL_ARGS)
}
#endif
-#ifdef PGRN_HAVE_SHMEM_REQUEST_HOOK
+#ifdef AFS_HAVE_SHMEM_REQUEST_HOOK
static shmem_request_hook_type PreviousShmemRequestHook = nullptr;
#endif
static const char* LWLockTrancheName = "arrow-flight-sql: lwlock tranche";
void
afs_shmem_request_hook(void)
{
-#ifdef PGRN_HAVE_SHMEM_REQUEST_HOOK
+#ifdef AFS_HAVE_SHMEM_REQUEST_HOOK
if (PreviousShmemRequestHook)
PreviousShmemRequestHook();
#endif
@@ -204,6 +211,8 @@ struct SharedRingBufferData {
};
// Naive ring buffer implementation. We can improve this later.
+//
+// We can't call PostgreSQL API in this class.
class SharedRingBuffer {
public:
static void initialize_data(SharedRingBufferData* data)
@@ -214,6 +223,14 @@ class SharedRingBuffer {
data->tail = 0;
}
+ static void allocate_data(SharedRingBufferData* data, dsa_area* area,
size_t total)
+ {
+ data->pointer = dsa_allocate(area, total);
+ data->total = total;
+ data->head = 0;
+ data->tail = 0;
+ }
+
static void free_data(SharedRingBufferData* data, dsa_area* area)
{
if (data->pointer != InvalidDsaPointer)
@@ -221,21 +238,11 @@ class SharedRingBuffer {
initialize_data(data);
}
- SharedRingBuffer(SharedRingBufferData* data, dsa_area* area)
- : data_(data), area_(area)
- {
- }
-
- void allocate(size_t total)
+ SharedRingBuffer(SharedRingBufferData* data, void* address, const char*
tag)
+ : data_(data), address_(static_cast<uint8_t*>(address)),
tag_(tag)
{
- data_->pointer = dsa_allocate(area_, total);
- data_->total = total;
- data_->head = 0;
- data_->tail = 0;
}
- void free() { free_data(data_, area_); }
-
size_t size() const
{
if (data_->head <= data_->tail)
@@ -250,14 +257,25 @@ class SharedRingBuffer {
size_t rest_size() const { return data_->total - size() - 1; }
+ // Need LWLockAcquire() by caller.
size_t write(const void* data, size_t n)
{
- P("%s: %s: before: (%d:%d) %d", Tag, AFS_FUNC, data_->head,
data_->tail, n);
+#ifdef AFS_VERBOSE
+ const char* tag = "write";
+#endif
+ P("%s: %s: %s: before: (%" PRIsize ":%" PRIsize ") %" PRIsize,
+ Tag,
+ tag_,
+ tag,
+ data_->head,
+ data_->tail,
+ n);
if (rest_size() == 0)
{
- P("%s: %s: after: no space: (%d:%d) %d:0",
+ P("%s: %s: %s: after: no space: (%" PRIsize ":%"
PRIsize ") %" PRIsize ":0",
Tag,
- AFS_FUNC,
+ tag_,
+ tag,
data_->head,
data_->tail,
n);
@@ -265,7 +283,7 @@ class SharedRingBuffer {
}
size_t writtenSize = 0;
- auto output = address();
+ auto output = address_;
if (data_->head <= data_->tail)
{
auto restSize = data_->total - data_->tail;
@@ -274,9 +292,11 @@ class SharedRingBuffer {
restSize--;
}
const auto firstHalfWriteSize = std::min(n, restSize);
- P("%s: %s: first half: (%d:%d) %d:%d",
+ P("%s: %s: %s: first half: (%" PRIsize ":%" PRIsize ")
%" PRIsize
+ ":%" PRIsize,
Tag,
- AFS_FUNC,
+ tag_,
+ tag,
data_->head,
data_->tail,
n,
@@ -289,9 +309,10 @@ class SharedRingBuffer {
if (n > 0 && rest_size() > 0)
{
const auto lastHalfWriteSize = std::min(n, data_->head
- data_->tail - 1);
- P("%s: %s: last half: (%d:%d) %d:%d",
+ P("%s: %s: %s: last half: (%" PRIsize ":%" PRIsize ")
%" PRIsize ":%" PRIsize,
Tag,
- AFS_FUNC,
+ tag_,
+ tag,
data_->head,
data_->tail,
n,
@@ -303,27 +324,43 @@ class SharedRingBuffer {
n -= lastHalfWriteSize;
writtenSize += lastHalfWriteSize;
}
- P("%s: %s: after: (%d:%d) %d:%d",
+ P("%s: %s: %s: after: (%" PRIsize ":%" PRIsize ") %" PRIsize
":%" PRIsize
+ " [%s] [%s]",
Tag,
- AFS_FUNC,
+ tag_,
+ tag,
data_->head,
data_->tail,
n,
- writtenSize);
+ writtenSize,
+ arrow::HexEncode(reinterpret_cast<const uint8_t*>(data),
writtenSize).c_str(),
+ arrow::HexEncode(reinterpret_cast<uint8_t*>(output),
writtenSize).c_str());
return writtenSize;
}
+ // Need LWLockAcquire() by caller.
size_t read(size_t n, void* output)
{
- P("%s: %s: before: (%d:%d) %d", Tag, AFS_FUNC, data_->head,
data_->tail, n);
+#ifdef AFS_VERBOSE
+ const char* tag = "read";
+#endif
+ P("%s: %s: %s: before: (%" PRIsize ":%" PRIsize ") %" PRIsize,
+ Tag,
+ tag_,
+ tag,
+ data_->head,
+ data_->tail,
+ n);
size_t readSize = 0;
- const auto input = address();
+ const auto input = address_;
if (data_->head > data_->tail)
{
const auto firstHalfReadSize = std::min(n, data_->total
- data_->head);
- P("%s: %s: first half: (%d:%d) %d:%d",
+ P("%s: %s: %s: first half: (%" PRIsize ":%" PRIsize ")
%" PRIsize
+ ":%" PRIsize,
Tag,
- AFS_FUNC,
+ tag_,
+ tag,
data_->head,
data_->tail,
n,
@@ -336,9 +373,10 @@ class SharedRingBuffer {
if (n > 0 && data_->head != data_->tail)
{
const auto lastHalfReadSize = std::min(n, data_->tail -
data_->head);
- P("%s: %s: last half: (%d:%d) %d:%d",
+ P("%s: %s: %s: last half: (%" PRIsize ":%" PRIsize ")
%" PRIsize ":%" PRIsize,
Tag,
- AFS_FUNC,
+ tag_,
+ tag,
data_->head,
data_->tail,
n,
@@ -350,24 +388,24 @@ class SharedRingBuffer {
n -= lastHalfReadSize;
readSize += lastHalfReadSize;
}
- P("%s: %s: after: (%d:%d) %d:%d",
+ P("%s: %s: %s: after: (%" PRIsize ":%" PRIsize ") %" PRIsize
":%" PRIsize
+ " [%s] [%s]",
Tag,
- AFS_FUNC,
+ tag_,
+ tag,
data_->head,
data_->tail,
n,
- readSize);
+ readSize,
+ arrow::HexEncode(reinterpret_cast<const uint8_t*>(output),
readSize).c_str(),
+ arrow::HexEncode(reinterpret_cast<const uint8_t*>(input),
readSize).c_str());
return readSize;
}
private:
SharedRingBufferData* data_;
- dsa_area* area_;
-
- uint8_t* address()
- {
- return static_cast<uint8_t*>(dsa_get_address(area_,
data_->pointer));
- }
+ uint8_t* address_;
+ const char* tag_;
};
enum class Action
@@ -424,12 +462,38 @@ dsa_pointer_set_string(dsa_pointer& pointer, dsa_area*
area, const std::string&
memcpy(dsa_get_address(area, pointer), input.c_str(), input.size() + 1);
}
-// Put only data (don't add methods) to use with dshash.
-struct SessionData {
+// Session data used in only one process.
+//
+// They are based on data in SharedSessionData and process local data.
+struct LocalSessionData {
+ LocalSessionData()
+ : id(0),
+ valid(false),
+ peerPID(InvalidPid),
+ bufferData(nullptr),
+ bufferAddress(nullptr)
+ {
+ }
+
+ uint64_t id;
+ bool valid;
+ pid_t peerPID;
+ SharedRingBufferData* bufferData;
+ void* bufferAddress;
+};
+
+// Session data shared with multiple processes. LWLockAcquire() with
+// Processor's lock_ is needed when we change this data.
+//
+// This struct must have only data (don't add methods) to use with
+// dshash.
+struct SharedSessionData {
uint64_t id;
dsa_pointer errorMessage;
pid_t executorPID;
+ bool started;
bool initialized;
+ bool finished;
dsa_pointer databaseName;
dsa_pointer userName;
dsa_pointer password;
@@ -440,20 +504,23 @@ struct SessionData {
int64_t nUpdatedRecords;
dsa_pointer prepareQuery;
dsa_pointer preparedStatementHandle;
+ bool setParametersFinished;
SharedRingBufferData bufferData;
};
void
-session_data_initialize(SessionData* session,
- dsa_area* area,
- const std::string& databaseName,
- const std::string& userName,
- const std::string& password,
- const std::string& clientAddress)
+shared_session_data_initialize(SharedSessionData* session,
+ dsa_area* area,
+ const std::string& databaseName,
+ const std::string& userName,
+ const std::string& password,
+ const std::string& clientAddress)
{
session->errorMessage = InvalidDsaPointer;
session->executorPID = InvalidPid;
+ session->started = false;
session->initialized = false;
+ session->finished = false;
dsa_pointer_set_string(session->databaseName, area, databaseName);
dsa_pointer_set_string(session->userName, area, userName);
dsa_pointer_set_string(session->password, area, password);
@@ -464,11 +531,12 @@ session_data_initialize(SessionData* session,
session->nUpdatedRecords = -1;
session->prepareQuery = InvalidDsaPointer;
session->preparedStatementHandle = InvalidDsaPointer;
+ session->setParametersFinished = false;
SharedRingBuffer::initialize_data(&(session->bufferData));
}
void
-session_data_finalize(SessionData* session, dsa_area* area)
+shared_session_data_finalize(SharedSessionData* session, dsa_area* area)
{
if (DsaPointerIsValid(session->errorMessage))
dsa_free(area, session->errorMessage);
@@ -489,23 +557,23 @@ session_data_finalize(SessionData* session, dsa_area*
area)
SharedRingBuffer::free_data(&(session->bufferData), area);
}
-class SessionReleaser {
+class SharedSessionReleaser {
public:
- explicit SessionReleaser(dshash_table* sessions, SessionData* data)
+ explicit SharedSessionReleaser(dshash_table* sessions,
SharedSessionData* data)
: sessions_(sessions), data_(data)
{
}
- ~SessionReleaser() { dshash_release_lock(sessions_, data_); }
+ ~SharedSessionReleaser() { dshash_release_lock(sessions_, data_); }
private:
dshash_table* sessions_;
- SessionData* data_;
+ SharedSessionData* data_;
};
static dshash_parameters SessionsParams = {
sizeof(uint64_t),
- sizeof(SessionData),
+ sizeof(SharedSessionData),
dshash_memcmp,
dshash_memhash,
0, // Set later because this is determined dynamically.
@@ -528,14 +596,7 @@ class Processor {
Written,
};
- Processor(const char* tag, bool runInPGThread)
- : tag_(tag),
- runInPGThread_(runInPGThread),
- sharedData_(nullptr),
- area_(nullptr),
- lock_(),
- mutex_(),
- conditionVariable_()
+ Processor(const char* tag) : tag_(tag), sharedData_(nullptr),
area_(nullptr), lock_()
{
}
@@ -553,87 +614,39 @@ class Processor {
void lock_release() { LWLockRelease(lock_); }
- SharedRingBuffer create_shared_ring_buffer(SessionData* session)
+ // Executor and Server must implement this.
+ virtual arrow::Result<size_t> read(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ size_t n,
+ void* output)
+ {
+ return 0;
+ }
+
+ // Executor and Server must implement this.
+ virtual arrow::Result<size_t> write(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ const void* data,
+ size_t n)
{
- return SharedRingBuffer(&(session->bufferData), area_);
+ return 0;
}
- arrow::Status wait(SessionData* session, SharedRingBuffer* buffer,
WaitMode mode)
+ arrow::Status wait(LocalSessionData* session, SharedRingBuffer* buffer,
WaitMode mode)
{
- const bool read = (mode == WaitMode::Read);
- const char* tag = read ? "wait read" : "wait written";
+ const char* tag = (mode == WaitMode::Read) ? "wait read" :
"wait written";
auto peerPID = peer_pid(session);
- auto peerName = peer_name(session);
if (ARROW_PREDICT_FALSE(peerPID == InvalidPid))
{
return arrow::Status::IOError(
- Tag, ": ", tag_, ": ", tag, ": ", peerName, ":
not alive");
+ Tag, ": ", tag_, ": ", tag, ": ", peer_name(),
": not alive");
}
- P("%s: %s: %s: %s: kill: %d", Tag, tag_, tag, peerName,
peerPID);
+ P("%s: %s: %s: %s: kill: %d", Tag, tag_, tag, peer_name(),
peerPID);
kill(peerPID, SIGUSR1);
- auto get_target_size =
- read ? [](SharedRingBuffer* buffer) { return
buffer->rest_size(); }
- : [](SharedRingBuffer* buffer) { return
buffer->size(); };
- if (runInPGThread_)
- {
- while (true)
- {
- int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
- WaitLatch(MyLatch, events, -1,
PG_WAIT_EXTENSION);
- if (GotSIGTERM)
- {
- break;
- }
- ResetLatch(MyLatch);
-
- if (GotSIGUSR1)
- {
- GotSIGUSR1 = false;
- P("%s: %s: %s: %s: wait: %d:%d",
- Tag,
- tag_,
- tag,
- peerName,
- get_target_size(buffer),
- targetSize);
- if (get_target_size(buffer) > 0)
- {
- break;
- }
- }
-
- // TODO: Convert PG error to arrow::Status.
- CHECK_FOR_INTERRUPTS();
- }
- }
- else
- {
- std::unique_lock<std::mutex> lock(mutex_);
- conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: %s: wait: %d:%d",
- Tag,
- tag_,
- tag,
- peerName,
- get_target_size(buffer),
- targetSize);
- if (INTERRUPTS_PENDING_CONDITION())
- {
- return true;
- }
- return get_target_size(buffer) > 0;
- });
- }
- return arrow::Status::OK();
- }
- virtual void signaled()
- {
- P("%s: %s: signaled: before", Tag, tag_);
- conditionVariable_.notify_all();
- P("%s: %s: signaled: after", Tag, tag_);
+ return wait_internal(session, buffer, mode, tag);
}
protected:
@@ -642,17 +655,34 @@ class Processor {
dsa_pointer_set_string(pointer, area_, input);
}
- virtual pid_t peer_pid(SessionData* session) { return InvalidPid; }
+ virtual pid_t peer_pid(LocalSessionData* session) { return InvalidPid; }
+
+ virtual const char* peer_name() { return "unknown"; }
+
+ virtual arrow::Status wait_internal(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ WaitMode mode,
+ const char* tag)
+ {
+ return arrow::Status::NotImplemented("wait_internal");
+ }
- virtual const char* peer_name(SessionData* session) { return "unknown";
}
+ size_t get_waiting_buffer_size(SharedRingBuffer* buffer, WaitMode mode)
+ {
+ if (mode == WaitMode::Read)
+ {
+ return buffer->rest_size();
+ }
+ else
+ {
+ return buffer->size();
+ }
+ }
const char* tag_;
- bool runInPGThread_;
SharedData* sharedData_;
dsa_area* area_;
LWLock* lock_;
- std::mutex mutex_;
- std::condition_variable conditionVariable_;
};
struct ProcessorLockGuard {
@@ -666,13 +696,13 @@ struct ProcessorLockGuard {
Processor* processor_;
};
-class Proxy;
class SharedRingBufferInputStream : public arrow::io::InputStream {
public:
- SharedRingBufferInputStream(Processor* processor, SessionData* session)
+ SharedRingBufferInputStream(Processor* processor,
+ std::shared_ptr<LocalSessionData>
localSession)
: arrow::io::InputStream(),
processor_(processor),
- session_(session),
+ localSession_(std::move(localSession)),
position_(0),
is_open_(true)
{
@@ -701,18 +731,18 @@ class SharedRingBufferInputStream : public
arrow::io::InputStream {
private:
Processor* processor_;
- SessionData* session_;
+ std::shared_ptr<LocalSessionData> localSession_;
int64_t position_;
bool is_open_;
};
-class Executor;
class SharedRingBufferOutputStream : public arrow::io::OutputStream {
public:
- SharedRingBufferOutputStream(Processor* processor, SessionData* session)
+ SharedRingBufferOutputStream(Processor* processor,
+ std::shared_ptr<LocalSessionData>
localSession)
: arrow::io::OutputStream(),
processor_(processor),
- session_(session),
+ localSession_(std::move(localSession)),
position_(0),
is_open_(true)
{
@@ -734,15 +764,14 @@ class SharedRingBufferOutputStream : public
arrow::io::OutputStream {
private:
Processor* processor_;
- SessionData* session_;
+ std::shared_ptr<LocalSessionData> localSession_;
int64_t position_;
bool is_open_;
};
class WorkerProcessor : public Processor {
public:
- explicit WorkerProcessor(const char* tag, bool runInPGThread)
- : Processor(tag, runInPGThread), sessions_(nullptr)
+ explicit WorkerProcessor(const char* tag) : Processor(tag),
sessions_(nullptr)
{
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
bool found;
@@ -765,13 +794,6 @@ class WorkerProcessor : public Processor {
~WorkerProcessor() override { dshash_detach(sessions_); }
- protected:
- void delete_session(SessionData* session)
- {
- session_data_finalize(session, area_);
- dshash_delete_entry(sessions_, session);
- }
-
protected:
dshash_table* sessions_;
};
@@ -1426,21 +1448,23 @@ class PreparedStatement {
class Executor : public WorkerProcessor {
public:
explicit Executor(uint64_t sessionID)
- : WorkerProcessor("executor", true),
+ : WorkerProcessor("executor"),
sessionID_(sessionID),
- session_(nullptr),
- connected_(false),
+ localSession_(std::make_shared<LocalSessionData>()),
+ needFinish_(false),
closed_(false),
nextPreparedStatementID_(1),
preparedStatements_()
{
+ localSession_->id = sessionID_;
+ localSession_->peerPID = sharedData_->serverPID;
}
~Executor()
{
if (!closed_)
{
- close_internal(false);
+ close();
}
}
@@ -1450,45 +1474,101 @@ class Executor : public WorkerProcessor {
// pg_usleep(5000000);
// pg_usleep(5000000);
pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
opening").c_str());
- session_ = static_cast<SessionData*>(dshash_find(sessions_,
&sessionID_, false));
- auto databaseName =
- static_cast<const char*>(dsa_get_address(area_,
session_->databaseName));
- auto userName =
- static_cast<const char*>(dsa_get_address(area_,
session_->userName));
- auto password =
- static_cast<const char*>(dsa_get_address(area_,
session_->password));
- auto clientAddress =
- static_cast<const char*>(dsa_get_address(area_,
session_->clientAddress));
- BackgroundWorkerInitializeConnection(databaseName, userName, 0);
- CurrentResourceOwner = ResourceOwnerCreate(nullptr,
"arrow-flight-sql: Executor");
- if (!check_password(databaseName, userName, password,
clientAddress))
- {
- session_->initialized = true;
+ auto session = find_session();
+ PG_TRY();
+ {
+ auto databaseName =
+ static_cast<const char*>(dsa_get_address(area_,
session->databaseName));
+ auto userName =
+ static_cast<const char*>(dsa_get_address(area_,
session->userName));
+ auto password =
+ static_cast<const char*>(dsa_get_address(area_,
session->password));
+ auto clientAddress =
+ static_cast<const char*>(dsa_get_address(area_,
session->clientAddress));
+ BackgroundWorkerInitializeConnection(databaseName,
userName, 0);
+ CurrentResourceOwner =
+ ResourceOwnerCreate(nullptr, "arrow-flight-sql:
Executor");
+ if (check_password(session, databaseName, userName,
password, clientAddress))
+ {
+ // TODO: Customizable.
+ SharedRingBuffer::allocate_data(
+ &(session->bufferData), area_, 1L *
1024L * 1024L);
+
+ SetCurrentStatementStartTimestamp();
+ SPI_connect();
+ needFinish_ = true;
+ }
+ session->initialized = true;
+ localSession_->valid = true;
+ localSession_->bufferData = &(session->bufferData);
+ localSession_->bufferAddress =
+ dsa_get_address(area_,
session->bufferData.pointer);
+ P("%s: %s: open: %" PRIu64, Tag, tag_,
session->bufferData.pointer);
+ dshash_release_lock(sessions_, session);
signal_server(tag);
- return;
}
+ PG_CATCH();
+ {
+ set_error_message(session, "failed to connect", tag);
+ session->initialized = true;
+ dshash_release_lock(sessions_, session);
+ signal_server(tag);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+
+ void close()
+ {
+ closed_ = true;
+ pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
closing").c_str());
+ preparedStatements_.clear();
+ if (needFinish_)
{
- SharedRingBuffer buffer(&(session_->bufferData), area_);
- // TODO: Customizable.
- buffer.allocate(1L * 1024L * 1024L);
+ SPI_finish();
+ needFinish_ = false;
}
- SetCurrentStatementStartTimestamp();
- SPI_connect();
- pgstat_report_activity(STATE_IDLE, NULL);
- session_->initialized = true;
- connected_ = true;
- signal_server(tag);
+ if (CurrentResourceOwner)
+ {
+ auto resourceOwner = CurrentResourceOwner;
+ CurrentResourceOwner = nullptr;
+ ResourceOwnerRelease(
+ resourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS,
false, true);
+ ResourceOwnerRelease(resourceOwner,
RESOURCE_RELEASE_LOCKS, false, true);
+ ResourceOwnerRelease(
+ resourceOwner, RESOURCE_RELEASE_AFTER_LOCKS,
false, true);
+ ResourceOwnerDelete(resourceOwner);
+ }
+ pgstat_report_activity(STATE_IDLE, nullptr);
+ }
+
+ arrow::Result<size_t> read(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ size_t n,
+ void* output) override
+ {
+ ProcessorLockGuard lock(this);
+ return buffer->read(n, output);
}
- void close() { close_internal(true); }
+ arrow::Result<size_t> write(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ const void* data,
+ size_t n) override
+ {
+ ProcessorLockGuard lock(this);
+ return buffer->write(data, n);
+ }
- void signaled() override
+ void signaled()
{
Action action;
{
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_,
session);
ProcessorLockGuard lock(this);
- action = session_->action;
- session_->action = Action::None;
+ action = session->action;
+ session->action = Action::None;
}
P("%s: %s: signaled: before: %s", Tag, tag_,
action_name(action));
PG_TRY();
@@ -1517,34 +1597,86 @@ class Executor : public WorkerProcessor {
update_prepared_statement();
break;
default:
- Processor::signaled();
+ // TODO: Report an error
break;
}
}
PG_CATCH();
{
- if (session_ &&
!DsaPointerIsValid(session_->errorMessage))
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_,
session);
+ if (!DsaPointerIsValid(session->errorMessage))
{
auto error = CopyErrorData();
- set_error_message(std::string("failed to run:
") + action_name(action) +
+ set_error_message(session,
+ std::string("failed to run:
") + action_name(action) +
": " + error->message,
"unexpected error");
FreeErrorData(error);
}
- pgstat_report_activity(STATE_IDLE, NULL);
+ pgstat_report_activity(STATE_IDLE, nullptr);
PG_RE_THROW();
}
PG_END_TRY();
- pgstat_report_activity(STATE_IDLE, NULL);
+ pgstat_report_activity(STATE_IDLE, nullptr);
P("%s: %s: signaled: after: %s", Tag, tag_,
action_name(action));
}
protected:
- pid_t peer_pid(SessionData* session) override { return
sharedData_->serverPID; }
+ pid_t peer_pid(LocalSessionData* session) override { return
sharedData_->serverPID; }
+
+ const char* peer_name() override { return "server"; }
+
+ arrow::Status wait_internal(LocalSessionData* sessio51n,
+ SharedRingBuffer* buffer,
+ WaitMode mode,
+ const char* tag) override
+ {
+ while (true)
+ {
+ int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+ WaitLatch(MyLatch, events, -1, PG_WAIT_EXTENSION);
+ if (GotSIGTERM)
+ {
+ break;
+ }
+ ResetLatch(MyLatch);
+
+ if (GotSIGUSR1)
+ {
+ GotSIGUSR1 = false;
+ P("%s: %s: %s: %s: wait: %" PRIsize,
+ Tag,
+ tag_,
+ tag,
+ peer_name(),
+ get_waiting_buffer_size(buffer, mode));
+ if (get_waiting_buffer_size(buffer, mode) > 0)
+ {
+ break;
+ }
+ }
- const char* peer_name(SessionData* session) override { return "server";
}
+ // TODO: Convert PG error to arrow::Status.
+ CHECK_FOR_INTERRUPTS();
+ }
+ return arrow::Status::OK();
+ }
private:
+ SharedSessionData* find_session()
+ {
+ auto session =
+ static_cast<SharedSessionData*>(dshash_find(sessions_,
&sessionID_, false));
+ if (!session)
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("%s: %s: unknown session: %" PRIu64,
Tag, tag_, sessionID_));
+ }
+ return session;
+ }
+
void signal_server(const char* tag)
{
if (sharedData_->serverPID == InvalidPid)
@@ -1555,58 +1687,23 @@ class Executor : public WorkerProcessor {
kill(sharedData_->serverPID, SIGUSR1);
}
- void set_error_message(const std::string& message, const char* tag)
+ void set_error_message(SharedSessionData* session,
+ const std::string& message,
+ const char* tag)
{
- if (DsaPointerIsValid(session_->errorMessage))
+ if (DsaPointerIsValid(session->errorMessage))
{
return;
}
{
ProcessorLockGuard lock(this);
- set_shared_string(session_->errorMessage, message);
+ set_shared_string(session->errorMessage, message);
}
signal_server(tag);
}
- void close_internal(bool unlockSession)
- {
- const char* tag = "close";
- closed_ = true;
- pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
closing").c_str());
- preparedStatements_.clear();
- if (connected_)
- {
- SPI_finish();
- {
- SharedRingBuffer
buffer(&(session_->bufferData), area_);
- buffer.free();
- }
- delete_session(session_);
- }
- else
- {
- set_error_message("failed to connect", tag);
- session_->initialized = true;
- if (unlockSession)
- {
- dshash_release_lock(sessions_, session_);
- }
- signal_server(tag);
- }
- if (CurrentResourceOwner)
- {
- auto resourceOwner = CurrentResourceOwner;
- CurrentResourceOwner = nullptr;
- ResourceOwnerRelease(
- resourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS,
false, true);
- ResourceOwnerRelease(resourceOwner,
RESOURCE_RELEASE_LOCKS, false, true);
- ResourceOwnerRelease(
- resourceOwner, RESOURCE_RELEASE_AFTER_LOCKS,
false, true);
- ResourceOwnerDelete(resourceOwner);
- }
- }
-
- bool check_password(const char* databaseName,
+ bool check_password(SharedSessionData* session,
+ const char* databaseName,
const char* userName,
const char* password,
const char* clientAddress)
@@ -1620,7 +1717,7 @@ class Executor : public WorkerProcessor {
Port port = {};
port.database_name = pstrdup(databaseName);
port.user_name = pstrdup(userName);
- if (!fill_client_address(&port, clientAddress))
+ if (!fill_client_address(session, &port, clientAddress))
{
return false;
}
@@ -1628,18 +1725,18 @@ class Executor : public WorkerProcessor {
hba_getauthmethod(&port);
if (!port.hba)
{
- set_error_message("failed to get auth method", tag);
+ set_error_message(session, "failed to get auth method",
tag);
return false;
}
switch (port.hba->auth_method)
{
case uaMD5:
// TODO
- set_error_message("MD5 auth method isn't
supported yet", tag);
+ set_error_message(session, "MD5 auth method
isn't supported yet", tag);
return false;
case uaSCRAM:
// TODO
- set_error_message("SCRAM auth method isn't
supported yet", tag);
+ set_error_message(session, "SCRAM auth method
isn't supported yet", tag);
return false;
case uaPassword:
{
@@ -1647,7 +1744,8 @@ class Executor : public WorkerProcessor {
auto shadowPassword =
get_role_password(port.user_name, &logDetail);
if (!shadowPassword)
{
- set_error_message(std::string("failed
to get password: ") + logDetail,
+ set_error_message(session,
+ std::string("failed
to get password: ") + logDetail,
tag);
return false;
}
@@ -1656,7 +1754,9 @@ class Executor : public WorkerProcessor {
if (result != STATUS_OK)
{
set_error_message(
- std::string("failed to verify
password: ") + logDetail, tag);
+ session,
+ std::string("failed to verify
password: ") + logDetail,
+ tag);
return false;
}
return true;
@@ -1664,14 +1764,17 @@ class Executor : public WorkerProcessor {
case uaTrust:
return true;
default:
- set_error_message(std::string("unsupported auth
method: ") +
+ set_error_message(session,
+ std::string("unsupported auth
method: ") +
hba_authname(port.hba->auth_method),
tag);
return false;
}
}
- bool fill_client_address(Port* port, const char* clientAddress)
+ bool fill_client_address(SharedSessionData* session,
+ Port* port,
+ const char* clientAddress)
{
const char* tag = "fill client address";
// clientAddress: "ipv4:127.0.0.1:40468"
@@ -1688,7 +1791,9 @@ class Executor : public WorkerProcessor {
if (!(clientFamily == "ipv4" || clientFamily == "ipv6"))
{
set_error_message(
- std::string("client family must be ipv4 or
ipv6: ") + clientFamily, tag);
+ session,
+ std::string("client family must be ipv4 or
ipv6: ") + clientFamily,
+ tag);
return false;
}
auto clientPortStart = clientPort.c_str();
@@ -1696,17 +1801,19 @@ class Executor : public WorkerProcessor {
auto clientPortNumber = std::strtoul(clientPortStart,
&clientPortEnd, 10);
if (clientPortEnd[0] != '\0')
{
- set_error_message(std::string("client port is invalid:
") + clientPort, tag);
+ set_error_message(
+ session, std::string("client port is invalid:
") + clientPort, tag);
return false;
}
if (clientPortNumber == 0)
{
- set_error_message(std::string("client port must not
0"), tag);
+ set_error_message(session, std::string("client port
must not 0"), tag);
return false;
}
if (clientPortNumber > 65535)
{
- set_error_message(std::string("client port is too
large: ") +
+ set_error_message(session,
+ std::string("client port is too
large: ") +
std::to_string(clientPortNumber),
tag);
return false;
@@ -1720,7 +1827,9 @@ class Executor : public WorkerProcessor {
if (inet_pton(AF_INET, clientHost.c_str(),
&(raddr->sin_addr)) == 0)
{
set_error_message(
- std::string("client IPv4 address is
invalid: ") + clientHost, tag);
+ session,
+ std::string("client IPv4 address is
invalid: ") + clientHost,
+ tag);
return false;
}
}
@@ -1734,7 +1843,9 @@ class Executor : public WorkerProcessor {
if (inet_pton(AF_INET6, clientHost.c_str(),
&(raddr->sin6_addr)) == 0)
{
set_error_message(
- std::string("client IPv6 address is
invalid: ") + clientHost, tag);
+ session,
+ std::string("client IPv6 address is
invalid: ") + clientHost,
+ tag);
return false;
}
raddr->sin6_scope_id = 0;
@@ -1745,22 +1856,26 @@ class Executor : public WorkerProcessor {
void select()
{
const char* tag = "select";
- if (!DsaPointerIsValid(session_->selectQuery))
+ pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
selecting").c_str());
+
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_, session);
+ if (!DsaPointerIsValid(session->selectQuery))
{
set_error_message(
- std::string(Tag) + ": " + tag_ + ": " + tag +
": query is missing", tag);
+ session,
+ std::string(Tag) + ": " + tag_ + ": " + tag +
": query is missing",
+ tag);
return;
}
- pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
selecting").c_str());
-
std::string query;
{
ProcessorLockGuard lock(this);
query =
- static_cast<const char*>(dsa_get_address(area_,
session_->selectQuery));
- dsa_free(area_, session_->selectQuery);
- session_->selectQuery = InvalidDsaPointer;
+ static_cast<const char*>(dsa_get_address(area_,
session->selectQuery));
+ dsa_free(area_, session->selectQuery);
+ session->selectQuery = InvalidDsaPointer;
}
P("%s: %s: %s: %s", Tag, tag_, tag, query.c_str());
@@ -1770,36 +1885,37 @@ class Executor : public WorkerProcessor {
SetCurrentStatementStartTimestamp();
auto result = SPI_execute(query.c_str(), true, 0);
-
+ needFinish_ = true;
if (result > 0)
{
pgstat_report_activity(
STATE_RUNNING, (std::string(Tag) + ": "
+ tag + ": writing").c_str());
- auto status = write(tag);
- if (status.ok())
- {
- signal_server(tag);
- }
- else
+ auto status = write_record_batches(tag);
+ if (!status.ok())
{
- set_error_message(std::string(Tag) + ":
" + tag_ + ": " + tag +
+ set_error_message(session,
+ std::string(Tag) + ":
" + tag_ + ": " + tag +
": failed to
write: " + status.ToString(),
tag);
}
}
else
{
- set_error_message(std::string(Tag) + ": " +
tag_ + ": " + tag +
+ set_error_message(session,
+ std::string(Tag) + ": " +
tag_ + ": " + tag +
": failed to run a query:
<" + query +
">: " +
SPI_result_code_string(result),
tag);
}
+ SPI_finish();
+ needFinish_ = false;
+ signal_server(tag);
}
}
- arrow::Status write(const char* tag)
+ arrow::Status write_record_batches(const char* tag)
{
- SharedRingBufferOutputStream output(this, session_);
+ SharedRingBufferOutputStream output(this, localSession_);
std::vector<std::shared_ptr<arrow::Field>> fields;
for (int i = 0; i < SPI_tuptable->tupdesc->natts; ++i)
{
@@ -1915,22 +2031,26 @@ class Executor : public WorkerProcessor {
void update()
{
const char* tag = "update";
- if (!DsaPointerIsValid(session_->updateQuery))
+ pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
updating").c_str());
+
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_, session);
+ if (!DsaPointerIsValid(session->updateQuery))
{
set_error_message(
- std::string(Tag) + ": " + tag_ + ": " + tag +
": query is missing", tag);
+ session,
+ std::string(Tag) + ": " + tag_ + ": " + tag +
": query is missing",
+ tag);
return;
}
- pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
updating").c_str());
-
std::string query;
{
ProcessorLockGuard lock(this);
query =
- static_cast<const char*>(dsa_get_address(area_,
session_->updateQuery));
- dsa_free(area_, session_->updateQuery);
- session_->updateQuery = InvalidDsaPointer;
+ static_cast<const char*>(dsa_get_address(area_,
session->updateQuery));
+ dsa_free(area_, session->updateQuery);
+ session->updateQuery = InvalidDsaPointer;
}
P("%s: %s: %s: %s", Tag, tag_, tag, query.c_str());
@@ -1940,40 +2060,48 @@ class Executor : public WorkerProcessor {
SetCurrentStatementStartTimestamp();
auto result = SPI_execute(query.c_str(), false, 0);
+ needFinish_ = true;
if (result > 0)
{
- session_->nUpdatedRecords = SPI_processed;
- signal_server(tag);
+ session->nUpdatedRecords = SPI_processed;
}
else
{
- set_error_message(std::string(Tag) + ": " +
tag_ + ": " + tag +
+ set_error_message(session,
+ std::string(Tag) + ": " +
tag_ + ": " + tag +
": failed to run a query:
<" + query +
">: " +
SPI_result_code_string(result),
tag);
}
+ SPI_finish();
+ needFinish_ = false;
+ signal_server(tag);
}
}
void prepare()
{
const char* tag = "prepare";
- if (!DsaPointerIsValid(session_->prepareQuery))
+ pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
preparing").c_str());
+
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_, session);
+ if (!DsaPointerIsValid(session->prepareQuery))
{
set_error_message(
- std::string(Tag) + ": " + tag_ + ": " + tag +
": query is missing", tag);
+ session,
+ std::string(Tag) + ": " + tag_ + ": " + tag +
": query is missing",
+ tag);
return;
}
- pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ":
preparing").c_str());
-
std::string query;
{
ProcessorLockGuard lock(this);
query =
- static_cast<const char*>(dsa_get_address(area_,
session_->prepareQuery));
- dsa_free(area_, session_->prepareQuery);
- session_->prepareQuery = InvalidDsaPointer;
+ static_cast<const char*>(dsa_get_address(area_,
session->prepareQuery));
+ dsa_free(area_, session->prepareQuery);
+ session->prepareQuery = InvalidDsaPointer;
}
P("%s: %s: %s: %s", Tag, tag_, tag, query.c_str());
std::string handle(std::to_string(nextPreparedStatementID_++));
@@ -1981,26 +2109,28 @@ class Executor : public WorkerProcessor {
std::make_pair(handle,
PreparedStatement(std::move(query))));
{
ProcessorLockGuard lock(this);
- set_shared_string(session_->preparedStatementHandle,
handle);
+ set_shared_string(session->preparedStatementHandle,
handle);
}
signal_server(tag);
}
- bool extract_handle(std::string& handle, const char* tag)
+ bool extract_handle(SharedSessionData* session, std::string& handle,
const char* tag)
{
- if (!DsaPointerIsValid(session_->preparedStatementHandle))
+ if (!DsaPointerIsValid(session->preparedStatementHandle))
{
set_error_message(
- std::string(Tag) + ": " + tag_ + ": " + tag +
": handle is missing", tag);
+ session,
+ std::string(Tag) + ": " + tag_ + ": " + tag +
": handle is missing",
+ tag);
return false;
}
{
ProcessorLockGuard lock(this);
handle = static_cast<const char*>(
- dsa_get_address(area_,
session_->preparedStatementHandle));
- dsa_free(area_, session_->preparedStatementHandle);
- session_->preparedStatementHandle = InvalidDsaPointer;
+ dsa_get_address(area_,
session->preparedStatementHandle));
+ dsa_free(area_, session->preparedStatementHandle);
+ session->preparedStatementHandle = InvalidDsaPointer;
}
return true;
@@ -2012,27 +2142,29 @@ class Executor : public WorkerProcessor {
pgstat_report_activity(STATE_RUNNING, (std::string(Tag) + ": "
+ tag).c_str());
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_, session);
std::string handle;
- if (!extract_handle(handle, tag))
+ if (!extract_handle(session, handle, tag))
{
return;
}
P("%s: %s: %s: %s", Tag, tag_, tag, handle.c_str());
- if (preparedStatements_.erase(handle) > 0)
- {
- signal_server(tag);
- }
- else
+ if (preparedStatements_.erase(handle) == 0)
{
- set_error_message(std::string(Tag) + ": " + tag_ + ": "
+ tag +
+ set_error_message(session,
+ std::string(Tag) + ": " + tag_ + ": "
+ tag +
": nonexistent handle: <" +
handle + ">",
tag);
}
+ signal_server(tag);
}
- PreparedStatement* find_prepared_statement(std::string& handle, const
char* tag)
+ PreparedStatement* find_prepared_statement(SharedSessionData* session,
+ std::string& handle,
+ const char* tag)
{
- if (!extract_handle(handle, tag))
+ if (!extract_handle(session, handle, tag))
{
return nullptr;
}
@@ -2041,7 +2173,8 @@ class Executor : public WorkerProcessor {
auto it = preparedStatements_.find(handle);
if (it == preparedStatements_.end())
{
- set_error_message(std::string(Tag) + ": " + tag_ + ": "
+ tag +
+ set_error_message(session,
+ std::string(Tag) + ": " + tag_ + ": "
+ tag +
": nonexistent handle: <" +
handle + ">",
tag);
return nullptr;
@@ -2058,8 +2191,10 @@ class Executor : public WorkerProcessor {
pgstat_report_activity(STATE_RUNNING,
(std::string(Tag) + ": setting
parameters").c_str());
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_, session);
std::string handle;
- auto preparedStatement = find_prepared_statement(handle, tag);
+ auto preparedStatement = find_prepared_statement(session,
handle, tag);
P("%s: %s: %s: %s", Tag, tag_, tag, handle.c_str());
if (!preparedStatement)
@@ -2067,19 +2202,18 @@ class Executor : public WorkerProcessor {
return;
}
- auto input =
std::make_shared<SharedRingBufferInputStream>(this, session_);
+ auto input =
std::make_shared<SharedRingBufferInputStream>(this, localSession_);
auto status = preparedStatement->set_parameters(input);
- if (status.ok())
- {
- signal_server(tag);
- }
- else
+ if (!status.ok())
{
- set_error_message(std::string(Tag) + ": " + tag_ + ": "
+ tag +
+ set_error_message(session,
+ std::string(Tag) + ": " + tag_ + ": "
+ tag +
": failed to set parameters: <" +
handle +
">: " + status.ToString(),
tag);
}
+ session->setParametersFinished = true;
+ signal_server(tag);
}
void select_prepared_statement()
@@ -2089,8 +2223,10 @@ class Executor : public WorkerProcessor {
pgstat_report_activity(
STATE_RUNNING, (std::string(Tag) + ": selecting
prepared statement").c_str());
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_, session);
std::string handle;
- auto preparedStatement = find_prepared_statement(handle, tag);
+ auto preparedStatement = find_prepared_statement(session,
handle, tag);
P("%s: %s: %s: %s", Tag, tag_, tag, handle.c_str());
if (!preparedStatement)
@@ -2108,20 +2244,21 @@ class Executor : public WorkerProcessor {
auto status = preparedStatement->select(
[](void* data) {
auto d = static_cast<Data*>(data);
- return d->executor->write(d->tag);
+ return
d->executor->write_record_batches(d->tag);
},
&data);
- if (status.ok())
- {
- signal_server(tag);
- }
- else
+ needFinish_ = true;
+ if (!status.ok())
{
- set_error_message(std::string(Tag) + ": " + tag_ + ": "
+ tag +
+ set_error_message(session,
+ std::string(Tag) + ": " + tag_ + ": "
+ tag +
": failed to select a prepared
statement: <" + handle +
">: " + status.ToString(),
tag);
}
+ SPI_finish();
+ needFinish_ = false;
+ signal_server(tag);
}
void update_prepared_statement()
@@ -2131,8 +2268,10 @@ class Executor : public WorkerProcessor {
pgstat_report_activity(
STATE_RUNNING, (std::string(Tag) + ": updating prepared
statement").c_str());
+ auto session = find_session();
+ SharedSessionReleaser sessionReleaser(sessions_, session);
std::string handle;
- auto preparedStatement = find_prepared_statement(handle, tag);
+ auto preparedStatement = find_prepared_statement(session,
handle, tag);
P("%s: %s: %s: %s", Tag, tag_, tag, handle.c_str());
if (!preparedStatement)
@@ -2143,25 +2282,29 @@ class Executor : public WorkerProcessor {
ScopedTransaction scopedTransaction;
ScopedSnapshot scopedSnapshot;
- auto input =
std::make_shared<SharedRingBufferInputStream>(this, session_);
+ auto input =
std::make_shared<SharedRingBufferInputStream>(this, localSession_);
auto n_updated_records_result =
preparedStatement->update(input);
+ needFinish_ = true;
if (n_updated_records_result.ok())
{
- session_->nUpdatedRecords = *n_updated_records_result;
- signal_server(tag);
+ session->nUpdatedRecords = *n_updated_records_result;
}
else
{
- set_error_message(std::string(Tag) + ": " + tag_ + ": "
+ tag +
+ set_error_message(session,
+ std::string(Tag) + ": " + tag_ + ": "
+ tag +
": failed to update a prepared
statement: <" + handle +
">: " +
n_updated_records_result.status().ToString(),
tag);
}
+ SPI_finish();
+ needFinish_ = false;
+ signal_server(tag);
}
uint64_t sessionID_;
- SessionData* session_;
- bool connected_;
+ std::shared_ptr<LocalSessionData> localSession_;
+ bool needFinish_;
bool closed_;
uint64_t nextPreparedStatementID_;
std::map<std::string, PreparedStatement> preparedStatements_;
@@ -2177,46 +2320,274 @@ SharedRingBufferOutputStream::Write(const void* data,
int64_t nBytes)
}
if (ARROW_PREDICT_TRUE(nBytes > 0))
{
- auto buffer = processor_->create_shared_ring_buffer(session_);
+ SharedRingBuffer buffer(
+ localSession_->bufferData,
localSession_->bufferAddress, processor_->tag());
size_t rest = static_cast<size_t>(nBytes);
while (true)
{
- processor_->lock_acquire();
- auto writtenSize = buffer.write(data, rest);
- processor_->lock_release();
+ ARROW_ASSIGN_OR_RAISE(
+ auto writtenBytes,
+ processor_->write(localSession_.get(), &buffer,
data, rest));
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::IOError(std::string(Tag)
+ ": " +
+ processor_->tag()
+ ": interrupted");
+ }
+
+ if (writtenBytes == 0)
+ {
+ ARROW_RETURN_NOT_OK(processor_->wait(
+ localSession_.get(), &buffer,
Processor::WaitMode::Read));
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return
arrow::Status::IOError(std::string(Tag) + ": " +
+
processor_->tag() + ": interrupted");
+ }
+ continue;
+ }
- position_ += writtenSize;
- rest -= writtenSize;
- data = static_cast<const uint8_t*>(data) + writtenSize;
+ position_ += writtenBytes;
+ rest -= writtenBytes;
+ data = static_cast<const uint8_t*>(data) + writtenBytes;
if (ARROW_PREDICT_TRUE(rest == 0))
{
break;
}
-
- ARROW_RETURN_NOT_OK(
- processor_->wait(session_, &buffer,
Processor::WaitMode::Read));
}
}
return arrow::Status::OK();
}
+// There is only one Proxy object in a PostgreSQL instance. The Proxy
+// object communicates multiple "executor" processes.
+//
+// This class must be thread safe. We call PostgreSQL API only in
+// signaled(). We can't call PostgreSQL API in other methods because
+// they are called from gRPC threads not the main thread. PostgreSQL
+// API isn't thread safe and must be called only in the main thread.
+//
+// If we need to call PostgreSQL API from gRPC threads, we need to
+// create a request and process it in signaled().
class Proxy : public WorkerProcessor {
public:
explicit Proxy()
- : WorkerProcessor("proxy", false), randomSeed_(),
randomEngine_(randomSeed_())
+ : WorkerProcessor("proxy"),
+ mutex_(),
+ conditionVariable_(),
+ randomSeed_(),
+ randomEngine_(randomSeed_()),
+ localSessions_(),
+ connectRequests_(),
+ selectRequests_(),
+ updateRequests_(),
+ prepareRequests_(),
+ closePreparedStatementRequests_(),
+ setParametersRequests_(),
+ selectPreparedStatementRequests_(),
+ updatePreparedStatementRequests_(),
+ readRequests_(),
+ writeRequests_()
+ {
+ }
+
+ private:
+ std::mutex mutex_;
+ std::condition_variable conditionVariable_;
+ std::random_device randomSeed_;
+ std::mt19937_64 randomEngine_;
+ std::map<uint64_t, std::shared_ptr<LocalSessionData>> localSessions_;
+
+ public:
+ arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> open_reader(
+ uint64_t sessionID)
+ {
+ std::shared_ptr<LocalSessionData> localSession;
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto it = localSessions_.find(sessionID);
+ if (it == localSessions_.end())
+ {
+ return arrow::Status::Invalid("unknown session:
", sessionID);
+ }
+ localSession = it->second;
+ }
+ auto input =
+ std::make_shared<SharedRingBufferInputStream>(this,
std::move(localSession));
+ // Read another stream format data with record batches.
+ return arrow::ipc::RecordBatchStreamReader::Open(input);
+ }
+
+ // This is only called from PostgreSQL (main) thread. We can use
+ // PostgreSQL API only in this method and methods called from this
+ // method.
+ void signaled()
+ {
+ process_connect_requests();
+ process_select_requests();
+ process_update_requests();
+ process_prepare_requests();
+ process_close_prepared_statement_requests();
+ process_set_parameters_requests();
+ process_select_prepared_statement_requests();
+ process_update_prepared_statement_requests();
+ process_read_requests();
+ process_write_requests();
+ delete_finished_sessions();
+ conditionVariable_.notify_all();
+ }
+
+ private:
+ struct ConnectRequest {
+ ConnectRequest(uint64_t sessionID,
+ std::string databaseName,
+ std::string userName,
+ std::string password,
+ std::string clientAddress)
+ : sessionID(sessionID),
+ databaseName(std::move(databaseName)),
+ userName(std::move(userName)),
+ password(std::move(password)),
+ clientAddress(std::move(clientAddress)),
+ processing(false),
+ finished(false),
+ errorMessage(std::nullopt)
+ {
+ }
+ uint64_t sessionID;
+ std::string databaseName;
+ std::string userName;
+ std::string password;
+ std::string clientAddress;
+ bool processing;
+ bool finished;
+ std::optional<std::string> errorMessage;
+ };
+
+ std::list<std::shared_ptr<ConnectRequest>> connectRequests_;
+
+ // Can use PostgreSQL API.
+ void process_connect_requests()
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ bool haveRequest = false;
+ for (auto it = connectRequests_.begin(); it !=
connectRequests_.end();)
+ {
+ auto& request = *it;
+ ProcessorLockGuard lock(this);
+ if (request->processing)
+ {
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_,
&(request->sessionID), false));
+ if (!session)
+ {
+ ++it;
+ continue;
+ }
+ const auto initialized = session->initialized;
+ if (initialized)
+ {
+ if
(DsaPointerIsValid(session->errorMessage))
+ {
+ request->errorMessage =
static_cast<const char*>(
+ dsa_get_address(area_,
session->errorMessage));
+ }
+ else
+ {
+ auto& localSession =
+
localSessions_.find(request->sessionID)->second;
+ localSession->valid = true;
+ localSession->peerPID =
session->executorPID;
+ localSession->bufferData =
&(session->bufferData);
+ localSession->bufferAddress =
+ dsa_get_address(area_,
session->bufferData.pointer);
+ P("%s: %s: connect: %" PRIu64,
+ Tag,
+ tag_,
+ session->bufferData.pointer);
+ }
+ }
+ dshash_release_lock(sessions_, session);
+ if (!initialized)
+ {
+ ++it;
+ continue;
+ }
+ request->finished = true;
+ it = connectRequests_.erase(it);
+ }
+ else
+ {
+ bool found;
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find_or_insert(sessions_,
&(request->sessionID), &found));
+ if (found)
+ {
+ request->finished = true;
+ request->errorMessage =
std::string("duplicated session ID: ") +
+
std::to_string(request->sessionID);
+ it = connectRequests_.erase(it);
+ }
+ else
+ {
+ request->processing = true;
+ shared_session_data_initialize(session,
+ area_,
+
request->databaseName,
+
request->userName,
+
request->password,
+
request->clientAddress);
+ haveRequest = true;
+ ++it;
+ }
+ dshash_release_lock(sessions_, session);
+ }
+ }
+ if (haveRequest)
+ {
+ kill(sharedData_->mainPID, SIGUSR1);
+ }
+ }
+
+ // Don't use PostgreSQL API.
+ uint64_t assign_session_id()
{
+ std::lock_guard<std::mutex> lock(mutex_);
+ while (true)
+ {
+ auto id = randomEngine_();
+ if (id == 0)
+ {
+ continue;
+ }
+ auto beforeSize = localSessions_.size();
+ auto localSession =
std::make_shared<LocalSessionData>();
+ localSessions_.insert(std::make_pair(id, localSession));
+ if (localSessions_.size() == beforeSize)
+ {
+ continue;
+ }
+ localSession->id = id;
+ localSession->valid = false;
+ return id;
+ }
}
+ public:
+ // Don't use PostgreSQL API.
arrow::Result<uint64_t> connect(const std::string& databaseName,
const std::string& userName,
const std::string& password,
const std::string& clientAddress)
{
- auto session = create_session(databaseName, userName, password,
clientAddress);
- auto id = session->id;
- dshash_release_lock(sessions_, session);
- kill(sharedData_->mainPID, SIGUSR1);
+ auto id = assign_session_id();
+ auto request = std::make_shared<ConnectRequest>(
+ id, databaseName, userName, password, clientAddress);
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ connectRequests_.push_back(request);
+ }
+ kill(MyProcPid, SIGUSR1);
{
std::unique_lock<std::mutex> lock(mutex_);
conditionVariable_.wait(lock, [&] {
@@ -2224,25 +2595,12 @@ class Proxy : public WorkerProcessor {
{
return true;
}
- session =
static_cast<SessionData*>(dshash_find(sessions_, &id, false));
- if (!session)
- {
- return true;
- }
- const auto initialized = session->initialized;
- dshash_release_lock(sessions_, session);
- return initialized;
+ return request->finished;
});
}
- session = static_cast<SessionData*>(dshash_find(sessions_, &id,
false));
- if (!session)
- {
- return arrow::Status::Invalid("session is stale: ", id);
- }
- SessionReleaser sessionReleaser(sessions_, session);
- if (DsaPointerIsValid(session->errorMessage))
+ if (request->errorMessage.has_value())
{
- return report_session_error(session);
+ return
arrow::Status::Invalid(request->errorMessage.value());
}
if (INTERRUPTS_PENDING_CONDITION())
{
@@ -2251,166 +2609,562 @@ class Proxy : public WorkerProcessor {
return id;
}
+ // Don't use PostgreSQL API.
bool is_valid_session(uint64_t sessionID)
{
- auto session = find_session(sessionID);
- if (session)
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto it = localSessions_.find(sessionID);
+ if (it == localSessions_.end())
{
- dshash_release_lock(sessions_, session);
- return true;
+ return false;
}
- else
+ return it->second->valid;
+ }
+
+ private:
+ struct SelectRequest {
+ SelectRequest(uint64_t sessionID, std::string query)
+ : sessionID(sessionID),
+ query(std::move(query)),
+ finished(false),
+ errorMessage(std::nullopt)
{
- return false;
}
+ uint64_t sessionID;
+ std::string query;
+ bool finished;
+ std::optional<std::string> errorMessage;
+ };
+
+ std::list<std::shared_ptr<SelectRequest>> selectRequests_;
+
+ // Can use PostgreSQL API.
+ void process_select_requests()
+ {
+#ifdef AFS_VERBOSE
+ const char* tag = "select";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto& request : selectRequests_)
+ {
+ request->finished = true;
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_, &(request->sessionID),
false));
+ if (!session)
+ {
+ request->errorMessage =
+ std::string("stolen session: ") +
std::to_string(request->sessionID);
+ continue;
+ }
+ auto executorPID = session->executorPID;
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ ProcessorLockGuard lock(this);
+ set_shared_string(session->selectQuery,
request->query);
+ session->action = Action::Select;
+ }
+ P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
executorPID);
+ kill(executorPID, SIGUSR1);
+ }
+ selectRequests_.clear();
+ }
+
+ arrow::Result<std::shared_ptr<arrow::Schema>> read_schema(uint64_t
sessionID,
+ const char*
tag)
+ {
+ auto it = localSessions_.find(sessionID);
+ if (it == localSessions_.end())
+ {
+ return arrow::Status::Invalid("Unknown session: ",
sessionID);
+ }
+ auto input =
std::make_shared<SharedRingBufferInputStream>(this, it->second);
+
+ // Read schema only stream format data.
+ ARROW_ASSIGN_OR_RAISE(auto reader,
+
arrow::ipc::RecordBatchStreamReader::Open(input));
+ while (true)
+ {
+ std::shared_ptr<arrow::RecordBatch> recordBatch;
+ P("%s: %s: %s: read next", Tag, tag_, tag);
+ ARROW_RETURN_NOT_OK(reader->ReadNext(&recordBatch));
+ if (!recordBatch)
+ {
+ break;
+ }
+ }
+ return reader->schema();
}
+ public:
arrow::Result<std::shared_ptr<arrow::Schema>> select(uint64_t sessionID,
const std::string&
query)
{
const char* tag = "select";
- auto session = find_session(sessionID);
- SessionReleaser sessionReleaser(sessions_, session);
- set_shared_string(session->selectQuery, query);
- session->action = Action::Select;
- if (session->executorPID != InvalidPid)
+ auto request = std::make_shared<SelectRequest>(sessionID,
query);
{
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ std::lock_guard<std::mutex> lock(mutex_);
+ selectRequests_.push_back(request);
}
+ kill(MyProcPid, SIGUSR1);
{
- auto buffer = create_shared_ring_buffer(session);
std::unique_lock<std::mutex> lock(mutex_);
conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: wait", Tag, tag_, tag);
- return DsaPointerIsValid(session->errorMessage)
|| buffer.size() > 0;
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
});
}
- if (DsaPointerIsValid(session->errorMessage))
+ if (request->errorMessage.has_value())
+ {
+ return
arrow::Status::Invalid(request->errorMessage.value());
+ }
+ if (INTERRUPTS_PENDING_CONDITION())
{
- return report_session_error(session);
+ return arrow::Status::Invalid("interrupted");
}
P("%s: %s: %s: open", Tag, tag_, tag);
- auto schema = read_schema(session, tag);
+ auto schema = read_schema(sessionID, tag);
P("%s: %s: %s: schema", Tag, tag_, tag);
return schema;
}
+ private:
+ struct UpdateRequest {
+ UpdateRequest(uint64_t sessionID, std::string query)
+ : sessionID(sessionID),
+ query(std::move(query)),
+ processing(false),
+ finished(false),
+ nUpdatedRecords(0),
+ errorMessage(std::nullopt)
+ {
+ }
+ uint64_t sessionID;
+ std::string query;
+ bool processing;
+ bool finished;
+ int64_t nUpdatedRecords;
+ std::optional<std::string> errorMessage;
+ };
+
+ std::list<std::shared_ptr<UpdateRequest>> updateRequests_;
+
+ // Can use PostgreSQL API.
+ void process_update_requests()
+ {
+#ifdef AFS_VERBOSE
+ const char* tag = "update";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto it = updateRequests_.begin(); it !=
updateRequests_.end();)
+ {
+ auto& request = *it;
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_, &(request->sessionID),
false));
+ if (!session)
+ {
+ request->finished = true;
+ request->errorMessage =
+ std::string("stolen session: ") +
std::to_string(request->sessionID);
+ it = updateRequests_.erase(it);
+ continue;
+ }
+ if (request->processing)
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ if (DsaPointerIsValid(session->errorMessage))
+ {
+ request->processing = false;
+ request->finished = true;
+ request->errorMessage =
static_cast<const char*>(
+ dsa_get_address(area_,
session->errorMessage));
+ it = updateRequests_.erase(it);
+ }
+ else if (session->nUpdatedRecords >= 0)
+ {
+ request->processing = false;
+ request->finished = true;
+ request->nUpdatedRecords =
session->nUpdatedRecords;
+ it = updateRequests_.erase(it);
+ }
+ else
+ {
+ ++it;
+ }
+ }
+ else
+ {
+ auto executorPID = session->executorPID;
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ ProcessorLockGuard lock(this);
+ set_shared_string(session->updateQuery,
request->query);
+ session->action = Action::Update;
+ session->nUpdatedRecords = -1;
+ }
+ P("%s: %s: %s: kill executor: %d", Tag, tag_,
tag, executorPID);
+ kill(executorPID, SIGUSR1);
+ request->processing = true;
+ ++it;
+ }
+ }
+ }
+
+ public:
arrow::Result<int64_t> update(uint64_t sessionID, const std::string&
query)
{
#ifdef AFS_VERBOSE
const char* tag = "update";
#endif
- auto session = find_session(sessionID);
- SessionReleaser sessionReleaser(sessions_, session);
- lock_acquire();
- set_shared_string(session->updateQuery, query);
- session->action = Action::Update;
- session->nUpdatedRecords = -1;
- lock_release();
- if (session->executorPID != InvalidPid)
+ auto request = std::make_shared<UpdateRequest>(sessionID,
query);
{
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ std::lock_guard<std::mutex> lock(mutex_);
+ updateRequests_.push_back(request);
}
+ kill(MyProcPid, SIGUSR1);
{
std::unique_lock<std::mutex> lock(mutex_);
conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: wait", Tag, tag_, tag);
- return DsaPointerIsValid(session->errorMessage)
||
- session->nUpdatedRecords >= 0;
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
});
}
- if (DsaPointerIsValid(session->errorMessage))
+ if (request->errorMessage.has_value())
{
- return report_session_error(session);
+ return
arrow::Status::Invalid(request->errorMessage.value());
}
- P("%s: %s: %s: done: %ld", Tag, tag_, tag,
session->nUpdatedRecords);
- return session->nUpdatedRecords;
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::Invalid("interrupted");
+ }
+ P("%s: %s: %s: done: %" PRIu64, Tag, tag_, tag,
request->nUpdatedRecords);
+ return request->nUpdatedRecords;
}
- arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> read(uint64_t
sessionID)
+ private:
+ struct PrepareRequest {
+ PrepareRequest(uint64_t sessionID, std::string query)
+ : sessionID(sessionID),
+ query(std::move(query)),
+ processing(false),
+ finished(false),
+ handle(),
+ errorMessage(std::nullopt)
+ {
+ }
+ uint64_t sessionID;
+ std::string query;
+ bool processing;
+ bool finished;
+ std::string handle;
+ std::optional<std::string> errorMessage;
+ };
+
+ std::list<std::shared_ptr<PrepareRequest>> prepareRequests_;
+
+ // Can use PostgreSQL API.
+ void process_prepare_requests()
{
- auto session = find_session(sessionID);
- SessionReleaser sessionReleaser(sessions_, session);
- auto input =
std::make_shared<SharedRingBufferInputStream>(this, session);
- // Read another stream format data with record batches.
- return arrow::ipc::RecordBatchStreamReader::Open(input);
+#ifdef AFS_VERBOSE
+ const char* tag = "prepare";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto it = prepareRequests_.begin(); it !=
prepareRequests_.end();)
+ {
+ auto& request = *it;
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_, &(request->sessionID),
false));
+ if (!session)
+ {
+ request->finished = true;
+ request->errorMessage =
+ std::string("stolen session: ") +
std::to_string(request->sessionID);
+ it = prepareRequests_.erase(it);
+ continue;
+ }
+ if (request->processing)
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ if (DsaPointerIsValid(session->errorMessage))
+ {
+ request->processing = false;
+ request->finished = true;
+ request->errorMessage =
static_cast<const char*>(
+ dsa_get_address(area_,
session->errorMessage));
+ it = prepareRequests_.erase(it);
+ }
+ else if
(DsaPointerIsValid(session->preparedStatementHandle))
+ {
+ request->processing = false;
+ request->finished = true;
+ request->handle = static_cast<const
char*>(
+ dsa_get_address(area_,
session->preparedStatementHandle));
+ it = prepareRequests_.erase(it);
+ }
+ else
+ {
+ ++it;
+ }
+ }
+ else
+ {
+ auto executorPID = session->executorPID;
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ ProcessorLockGuard lock(this);
+
set_shared_string(session->prepareQuery, request->query);
+ session->action = Action::Prepare;
+ session->nUpdatedRecords = -1;
+
set_shared_string(session->preparedStatementHandle, std::string(""));
+ }
+ P("%s: %s: %s: kill executor: %d", Tag, tag_,
tag, executorPID);
+ kill(executorPID, SIGUSR1);
+ request->processing = true;
+ ++it;
+ }
+ }
}
+ public:
arrow::Result<arrow::flight::sql::ActionCreatePreparedStatementResult>
prepare(
uint64_t sessionID, const std::string& query)
{
#ifdef AFS_VERBOSE
const char* tag = "prepare";
#endif
- auto session = find_session(sessionID);
- SessionReleaser sessionReleaser(sessions_, session);
- lock_acquire();
- set_shared_string(session->prepareQuery, query);
- session->action = Action::Prepare;
- set_shared_string(session->preparedStatementHandle,
std::string(""));
- lock_release();
- if (session->executorPID != InvalidPid)
+ auto request = std::make_shared<PrepareRequest>(sessionID,
query);
{
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ std::lock_guard<std::mutex> lock(mutex_);
+ prepareRequests_.push_back(request);
}
+ kill(MyProcPid, SIGUSR1);
{
std::unique_lock<std::mutex> lock(mutex_);
conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: wait", Tag, tag_, tag);
- return DsaPointerIsValid(session->errorMessage)
||
-
DsaPointerIsValid(session->preparedStatementHandle);
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
});
}
- if (DsaPointerIsValid(session->errorMessage))
+ if (request->errorMessage.has_value())
+ {
+ return
arrow::Status::Invalid(request->errorMessage.value());
+ }
+ if (INTERRUPTS_PENDING_CONDITION())
{
- return report_session_error(session);
+ return arrow::Status::Invalid("interrupted");
}
- std::string handle(static_cast<const char*>(
- dsa_get_address(area_,
session->preparedStatementHandle)));
arrow::flight::sql::ActionCreatePreparedStatementResult result
= {
nullptr,
nullptr,
- std::move(handle),
+ std::move(request->handle),
};
P("%s: %s: %s: done", Tag, tag_, tag);
return result;
}
+ private:
+ struct ClosePreparedStatementRequest {
+ ClosePreparedStatementRequest(uint64_t sessionID, std::string
handle)
+ : sessionID(sessionID),
+ handle(std::move(handle)),
+ processing(false),
+ finished(false),
+ errorMessage(std::nullopt)
+ {
+ }
+ uint64_t sessionID;
+ std::string handle;
+ bool processing;
+ bool finished;
+ std::optional<std::string> errorMessage;
+ };
+
+ std::list<std::shared_ptr<ClosePreparedStatementRequest>>
+ closePreparedStatementRequests_;
+
+ // Can use PostgreSQL API.
+ void process_close_prepared_statement_requests()
+ {
+#ifdef AFS_VERBOSE
+ const char* tag = "close prepared statement";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto it = closePreparedStatementRequests_.begin();
+ it != closePreparedStatementRequests_.end();)
+ {
+ auto& request = *it;
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_, &(request->sessionID),
false));
+ if (!session)
+ {
+ request->finished = true;
+ request->errorMessage =
+ std::string("stolen session: ") +
std::to_string(request->sessionID);
+ it = closePreparedStatementRequests_.erase(it);
+ continue;
+ }
+ if (request->processing)
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ if (DsaPointerIsValid(session->errorMessage))
+ {
+ request->processing = false;
+ request->finished = true;
+ request->errorMessage =
static_cast<const char*>(
+ dsa_get_address(area_,
session->errorMessage));
+ it =
closePreparedStatementRequests_.erase(it);
+ }
+ else if
(DsaPointerIsValid(session->preparedStatementHandle))
+ {
+ ++it;
+ }
+ else
+ {
+ request->processing = false;
+ request->finished = true;
+ it =
closePreparedStatementRequests_.erase(it);
+ }
+ }
+ else
+ {
+ auto executorPID = session->executorPID;
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ ProcessorLockGuard lock(this);
+
set_shared_string(session->preparedStatementHandle, request->handle);
+ session->action =
Action::ClosePreparedStatement;
+ }
+ P("%s: %s: %s: kill executor: %d", Tag, tag_,
tag, executorPID);
+ kill(executorPID, SIGUSR1);
+ request->processing = true;
+ ++it;
+ }
+ }
+ }
+
+ public:
arrow::Status close_prepared_statement(uint64_t sessionID, const
std::string& handle)
{
#ifdef AFS_VERBOSE
const char* tag = "close prepared statement";
#endif
- auto session = find_session(sessionID);
- SessionReleaser sessionReleaser(sessions_, session);
- lock_acquire();
- set_shared_string(session->preparedStatementHandle, handle);
- session->action = Action::ClosePreparedStatement;
- lock_release();
- if (session->executorPID != InvalidPid)
+ auto request =
std::make_shared<ClosePreparedStatementRequest>(sessionID, handle);
{
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ std::lock_guard<std::mutex> lock(mutex_);
+ closePreparedStatementRequests_.push_back(request);
}
+ kill(MyProcPid, SIGUSR1);
{
std::unique_lock<std::mutex> lock(mutex_);
conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: wait", Tag, tag_, tag);
- return DsaPointerIsValid(session->errorMessage)
||
-
!DsaPointerIsValid(session->preparedStatementHandle);
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
});
}
- if (DsaPointerIsValid(session->errorMessage))
+ if (request->errorMessage.has_value())
+ {
+ return
arrow::Status::Invalid(request->errorMessage.value());
+ }
+ if (INTERRUPTS_PENDING_CONDITION())
{
- return report_session_error(session);
+ return arrow::Status::Invalid("interrupted");
}
P("%s: %s: %s: done", Tag, tag_, tag);
return arrow::Status::OK();
}
+ private:
+ struct SetParametersRequest {
+ SetParametersRequest(uint64_t sessionID, std::string handle)
+ : sessionID(sessionID),
+ handle(std::move(handle)),
+ processing(false),
+ finished(false),
+ errorMessage(std::nullopt)
+ {
+ }
+ uint64_t sessionID;
+ std::string handle;
+ bool processing;
+ bool finished;
+ std::optional<std::string> errorMessage;
+ };
+
+ std::list<std::shared_ptr<SetParametersRequest>> setParametersRequests_;
+
+ // Can use PostgreSQL API.
+ void process_set_parameters_requests()
+ {
+#ifdef AFS_VERBOSE
+ const char* tag = "set parameters";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto it = setParametersRequests_.begin();
+ it != setParametersRequests_.end();)
+ {
+ auto& request = *it;
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_, &(request->sessionID),
false));
+ if (!session)
+ {
+ request->finished = true;
+ request->errorMessage =
+ std::string("stolen session: ") +
std::to_string(request->sessionID);
+ it = setParametersRequests_.erase(it);
+ continue;
+ }
+ if (request->processing)
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ if (DsaPointerIsValid(session->errorMessage))
+ {
+ request->processing = false;
+ request->finished = true;
+ request->errorMessage =
static_cast<const char*>(
+ dsa_get_address(area_,
session->errorMessage));
+ it = setParametersRequests_.erase(it);
+ }
+ else if (session->setParametersFinished)
+ {
+ request->processing = false;
+ request->finished = true;
+ it = setParametersRequests_.erase(it);
+ }
+ else
+ {
+ ++it;
+ }
+ }
+ else
+ {
+ auto executorPID = session->executorPID;
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ ProcessorLockGuard lock(this);
+
set_shared_string(session->preparedStatementHandle, request->handle);
+ session->action = Action::SetParameters;
+ session->setParametersFinished = false;
+ }
+ P("%s: %s: %s: kill executor: %d", Tag, tag_,
tag, executorPID);
+ kill(executorPID, SIGUSR1);
+ request->processing = true;
+ ++it;
+ }
+ }
+ }
+
+ public:
arrow::Status set_parameters(uint64_t sessionID,
const std::string& handle,
arrow::flight::FlightMessageReader* reader,
@@ -2419,20 +3173,17 @@ class Proxy : public WorkerProcessor {
#ifdef AFS_VERBOSE
const char* tag = "set parameters";
#endif
- auto session = find_session(sessionID);
- SessionReleaser sessionReleaser(sessions_, session);
- lock_acquire();
- set_shared_string(session->preparedStatementHandle, handle);
- session->action = Action::SetParameters;
- lock_release();
- if (session->executorPID != InvalidPid)
+ auto request =
std::make_shared<SetParametersRequest>(sessionID, handle);
{
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ std::lock_guard<std::mutex> lock(mutex_);
+ setParametersRequests_.push_back(request);
}
+ kill(MyProcPid, SIGUSR1);
+ auto session = localSessions_.find(sessionID)->second;
+ auto executorPID = session->peerPID;
{
ARROW_ASSIGN_OR_RAISE(const auto& schema,
reader->GetSchema());
- SharedRingBufferOutputStream output(this, session);
+ SharedRingBufferOutputStream output(this,
std::move(session));
auto options = arrow::ipc::IpcWriteOptions::Defaults();
options.emit_dictionary_deltas = true;
ARROW_ASSIGN_OR_RAISE(auto writer,
@@ -2447,61 +3198,201 @@ class Proxy : public WorkerProcessor {
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*(chunk.data)));
}
ARROW_RETURN_NOT_OK(writer->Close());
- }
- if (session->executorPID != InvalidPid)
- {
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
executorPID);
+ kill(executorPID, SIGUSR1);
}
{
- auto buffer = create_shared_ring_buffer(session);
std::unique_lock<std::mutex> lock(mutex_);
conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: wait", Tag, tag_, tag);
- return DsaPointerIsValid(session->errorMessage)
|| buffer.size() == 0;
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
});
}
- if (DsaPointerIsValid(session->errorMessage))
+ if (request->errorMessage.has_value())
+ {
+ return
arrow::Status::Invalid(request->errorMessage.value());
+ }
+ if (INTERRUPTS_PENDING_CONDITION())
{
- return report_session_error(session);
+ return arrow::Status::Invalid("interrupted");
}
P("%s: %s: %s: done", Tag, tag_, tag);
return arrow::Status::OK();
}
+ private:
+ struct SelectPreparedStatementRequest {
+ SelectPreparedStatementRequest(uint64_t sessionID, std::string
handle)
+ : sessionID(sessionID),
+ handle(std::move(handle)),
+ finished(false),
+ errorMessage(std::nullopt)
+ {
+ }
+ uint64_t sessionID;
+ std::string handle;
+ bool finished;
+ std::optional<std::string> errorMessage;
+ };
+
+ std::list<std::shared_ptr<SelectPreparedStatementRequest>>
+ selectPreparedStatementRequests_;
+
+ // Can use PostgreSQL API.
+ void process_select_prepared_statement_requests()
+ {
+#ifdef AFS_VERBOSE
+ const char* tag = "select prepared statement";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto& request : selectPreparedStatementRequests_)
+ {
+ request->finished = true;
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_, &(request->sessionID),
false));
+ if (!session)
+ {
+ request->errorMessage =
+ std::string("stolen session: ") +
std::to_string(request->sessionID);
+ continue;
+ }
+ auto executorPID = session->executorPID;
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ ProcessorLockGuard lock(this);
+
set_shared_string(session->preparedStatementHandle, request->handle);
+ session->action =
Action::SelectPreparedStatement;
+ }
+ P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
executorPID);
+ kill(executorPID, SIGUSR1);
+ }
+ selectPreparedStatementRequests_.clear();
+ }
+
+ public:
arrow::Result<std::shared_ptr<arrow::Schema>> select_prepared_statement(
uint64_t sessionID, const std::string& handle)
{
const char* tag = "select prepared statement";
- auto session = find_session(sessionID);
- SessionReleaser sessionReleaser(sessions_, session);
- lock_acquire();
- set_shared_string(session->preparedStatementHandle, handle);
- session->action = Action::SelectPreparedStatement;
- lock_release();
- if (session->executorPID != InvalidPid)
+ auto request =
+
std::make_shared<SelectPreparedStatementRequest>(sessionID, handle);
{
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ std::lock_guard<std::mutex> lock(mutex_);
+ selectPreparedStatementRequests_.push_back(request);
}
+ kill(MyProcPid, SIGUSR1);
{
- auto buffer = create_shared_ring_buffer(session);
std::unique_lock<std::mutex> lock(mutex_);
conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: wait", Tag, tag_, tag);
- return DsaPointerIsValid(session->errorMessage)
|| buffer.size() > 0;
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
});
}
- if (DsaPointerIsValid(session->errorMessage))
+ if (request->errorMessage.has_value())
{
- return report_session_error(session);
+ return
arrow::Status::Invalid(request->errorMessage.value());
+ }
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::Invalid("interrupted");
}
P("%s: %s: %s: open", Tag, tag_, tag);
- auto schema = read_schema(session, tag);
+ auto schema = read_schema(sessionID, tag);
P("%s: %s: %s: schema", Tag, tag_, tag);
return schema;
}
+ private:
+ struct UpdatePreparedStatementRequest {
+ UpdatePreparedStatementRequest(uint64_t sessionID, std::string
handle)
+ : sessionID(sessionID),
+ handle(std::move(handle)),
+ processing(false),
+ finished(false),
+ nUpdatedRecords(0),
+ errorMessage(std::nullopt)
+ {
+ }
+ uint64_t sessionID;
+ std::string handle;
+ bool processing;
+ bool finished;
+ int64_t nUpdatedRecords;
+ std::optional<std::string> errorMessage;
+ };
+
+ std::list<std::shared_ptr<UpdatePreparedStatementRequest>>
+ updatePreparedStatementRequests_;
+
+ // Can use PostgreSQL API.
+ void process_update_prepared_statement_requests()
+ {
+#ifdef AFS_VERBOSE
+ const char* tag = "update prepared statement";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto it = updatePreparedStatementRequests_.begin();
+ it != updatePreparedStatementRequests_.end();)
+ {
+ auto& request = *it;
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_, &(request->sessionID),
false));
+ if (!session)
+ {
+ request->finished = true;
+ request->errorMessage =
+ std::string("stolen session: ") +
std::to_string(request->sessionID);
+ it = updatePreparedStatementRequests_.erase(it);
+ continue;
+ }
+ if (request->processing)
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ if (DsaPointerIsValid(session->errorMessage))
+ {
+ request->processing = false;
+ request->finished = true;
+ request->errorMessage =
static_cast<const char*>(
+ dsa_get_address(area_,
session->errorMessage));
+ it =
updatePreparedStatementRequests_.erase(it);
+ }
+ else if (session->nUpdatedRecords >= 0)
+ {
+ request->processing = false;
+ request->finished = true;
+ request->nUpdatedRecords =
session->nUpdatedRecords;
+ it =
updatePreparedStatementRequests_.erase(it);
+ }
+ else
+ {
+ ++it;
+ }
+ }
+ else
+ {
+ auto executorPID = session->executorPID;
+ {
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ ProcessorLockGuard lock(this);
+
set_shared_string(session->preparedStatementHandle, request->handle);
+ session->action =
Action::UpdatePreparedStatement;
+ session->nUpdatedRecords = -1;
+ }
+ P("%s: %s: %s: kill executor: %d", Tag, tag_,
tag, executorPID);
+ kill(executorPID, SIGUSR1);
+ request->processing = true;
+ ++it;
+ }
+ }
+ }
+
+ public:
arrow::Result<int64_t> update_prepared_statement(
uint64_t sessionID,
const std::string& handle,
@@ -2510,21 +3401,18 @@ class Proxy : public WorkerProcessor {
#ifdef AFS_VERBOSE
const char* tag = "update prepared statement";
#endif
- auto session = find_session(sessionID);
- SessionReleaser sessionReleaser(sessions_, session);
- lock_acquire();
- set_shared_string(session->preparedStatementHandle, handle);
- session->action = Action::UpdatePreparedStatement;
- session->nUpdatedRecords = -1;
- lock_release();
- if (session->executorPID != InvalidPid)
+ auto request =
+
std::make_shared<UpdatePreparedStatementRequest>(sessionID, handle);
{
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ std::lock_guard<std::mutex> lock(mutex_);
+ updatePreparedStatementRequests_.push_back(request);
}
+ kill(MyProcPid, SIGUSR1);
+ auto session = localSessions_.find(sessionID)->second;
+ auto executorPID = session->peerPID;
{
ARROW_ASSIGN_OR_RAISE(const auto& schema,
reader->GetSchema());
- SharedRingBufferOutputStream output(this, session);
+ SharedRingBufferOutputStream output(this,
std::move(session));
auto options = arrow::ipc::IpcWriteOptions::Defaults();
options.emit_dictionary_deltas = true;
ARROW_ASSIGN_OR_RAISE(auto writer,
@@ -2539,103 +3427,258 @@ class Proxy : public WorkerProcessor {
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*(chunk.data)));
}
ARROW_RETURN_NOT_OK(writer->Close());
+ P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
executorPID);
+ kill(executorPID, SIGUSR1);
}
- if (session->executorPID != InvalidPid)
{
- P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
session->executorPID);
- kill(session->executorPID, SIGUSR1);
+ std::unique_lock<std::mutex> lock(mutex_);
+ conditionVariable_.wait(lock, [&] {
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
+ });
+ }
+ if (request->errorMessage.has_value())
+ {
+ return
arrow::Status::Invalid(request->errorMessage.value());
}
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::Invalid("interrupted");
+ }
+ P("%s: %s: %s: done: %" PRIu64, Tag, tag_, tag,
request->nUpdatedRecords);
+ return request->nUpdatedRecords;
+ }
+
+ private:
+ struct ReadRequest {
+ ReadRequest(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ size_t n,
+ void* output)
+ : session(session),
+ buffer(buffer),
+ n(n),
+ output(output),
+ finished(false),
+ readBytes(0)
+ {
+ }
+ LocalSessionData* session;
+ SharedRingBuffer* buffer;
+ size_t n;
+ void* output;
+ bool finished;
+ size_t readBytes;
+ };
+
+ std::list<std::shared_ptr<ReadRequest>> readRequests_;
+
+ // Can use PostgreSQL API.
+ void process_read_requests()
+ {
+#ifdef AFS_VERBOSE
+ const char* tag = "read";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto& request : readRequests_)
+ {
+ request->finished = true;
+ {
+ ProcessorLockGuard lock(this);
+ request->readBytes =
request->buffer->read(request->n, request->output);
+ }
+ P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
request->session->peerPID);
+ kill(request->session->peerPID, SIGUSR1);
+ }
+ readRequests_.clear();
+ }
+
+ public:
+ // Don't use PostgreSQL API.
+ arrow::Result<size_t> read(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ size_t n,
+ void* output) override
+ {
+ auto request = std::make_shared<ReadRequest>(session, buffer,
n, output);
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ readRequests_.push_back(request);
+ }
+ kill(MyProcPid, SIGUSR1);
{
std::unique_lock<std::mutex> lock(mutex_);
conditionVariable_.wait(lock, [&] {
- P("%s: %s: %s: wait", Tag, tag_, tag);
- return DsaPointerIsValid(session->errorMessage)
||
- session->nUpdatedRecords >= 0;
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
});
}
- if (DsaPointerIsValid(session->errorMessage))
+ if (INTERRUPTS_PENDING_CONDITION())
{
- return report_session_error(session);
+ return arrow::Status::IOError("interrupted");
}
- P("%s: %s: %s: done: %ld", Tag, tag_, tag,
session->nUpdatedRecords);
- return session->nUpdatedRecords;
+ return request->readBytes;
}
- protected:
- pid_t peer_pid(SessionData* session) override { return
session->executorPID; }
+ private:
+ struct WriteRequest {
+ WriteRequest(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ const void* data,
+ size_t n)
+ : session(session),
+ buffer(buffer),
+ data(data),
+ n(n),
+ finished(false),
+ writtenBytes(0),
+ errorMessage(std::nullopt)
+ {
+ }
+ LocalSessionData* session;
+ SharedRingBuffer* buffer;
+ const void* data;
+ size_t n;
+ bool finished;
+ size_t writtenBytes;
+ std::optional<std::string> errorMessage;
+ };
- const char* peer_name(SessionData* session) override { return
"executor"; }
+ std::list<std::shared_ptr<WriteRequest>> writeRequests_;
- private:
- SessionData* create_session(const std::string& databaseName,
- const std::string& userName,
- const std::string& password,
- const std::string& clientAddress)
+ // Can use PostgreSQL API.
+ void process_write_requests()
{
- lock_acquire();
- uint64_t id = 0;
- SessionData* session = nullptr;
- do
+#ifdef AFS_VERBOSE
+ const char* tag = "write";
+#endif
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto& request : writeRequests_)
{
- id = randomEngine_();
- if (id == 0)
+ auto session = static_cast<SharedSessionData*>(
+ dshash_find(sessions_, &(request->session->id),
false));
+ if (!session)
{
+ request->finished = true;
+ request->errorMessage = std::string("stolen
session: ") +
+
std::to_string(request->session->id);
continue;
}
- bool found = false;
- session =
-
static_cast<SessionData*>(dshash_find_or_insert(sessions_, &id, &found));
- if (!found)
{
- break;
+ SharedSessionReleaser
sessionReleaser(sessions_, session);
+ if (DsaPointerIsValid(session->errorMessage))
+ {
+ request->errorMessage =
static_cast<const char*>(
+ dsa_get_address(area_,
session->errorMessage));
+ }
+ else
+ {
+ ProcessorLockGuard lock(this);
+ request->writtenBytes =
+
request->buffer->write(request->data, request->n);
+ }
+ request->finished = true;
}
- } while (true);
- session_data_initialize(
- session, area_, databaseName, userName, password,
clientAddress);
- lock_release();
- return session;
+ P("%s: %s: %s: kill executor: %d", Tag, tag_, tag,
request->session->peerPID);
+ kill(request->session->peerPID, SIGUSR1);
+ }
+ writeRequests_.clear();
}
- SessionData* find_session(uint64_t sessionID)
+ public:
+ // Don't use PostgreSQL API.
+ arrow::Result<size_t> write(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ const void* data,
+ size_t n) override
{
- return static_cast<SessionData*>(dshash_find(sessions_,
&sessionID, false));
+ auto request = std::make_shared<WriteRequest>(session, buffer,
data, n);
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ writeRequests_.push_back(request);
+ }
+ kill(MyProcPid, SIGUSR1);
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ conditionVariable_.wait(lock, [&] {
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return true;
+ }
+ return request->finished;
+ });
+ }
+ if (request->errorMessage.has_value())
+ {
+ return
arrow::Status::IOError(request->errorMessage.value());
+ }
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::IOError("interrupted");
+ }
+ return request->writtenBytes;
}
- arrow::Status report_session_error(SessionData* session)
+ private:
+ // Can use PostgreSQL API.
+ void delete_finished_sessions()
{
- auto status = arrow::Status::Invalid(
- static_cast<const char*>(dsa_get_address(area_,
session->errorMessage)));
- P("%s: %s: %s: kill SIGTERM executor: %d",
- Tag,
- tag_,
- AFS_FUNC,
- session->executorPID);
- kill(session->executorPID, SIGTERM);
- return status;
+ dshash_seq_status sessionsStatus;
+ dshash_seq_init(&sessionsStatus, sessions_, false);
+ SharedSessionData* session;
+ while (
+ (session =
static_cast<SharedSessionData*>(dshash_seq_next(&sessionsStatus))))
+ {
+ if (!session->finished)
+ {
+ continue;
+ }
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+
localSessions_.erase(localSessions_.find(session->id));
+ }
+ shared_session_data_finalize(session, area_);
+ dshash_delete_current(&sessionsStatus);
+ }
+ dshash_seq_term(&sessionsStatus);
}
- arrow::Result<std::shared_ptr<arrow::Schema>> read_schema(SessionData*
session,
- const char*
tag)
+ protected:
+ pid_t peer_pid(LocalSessionData* session) override { return
session->peerPID; }
+
+ const char* peer_name() override { return "executor"; }
+
+ arrow::Status wait_internal(LocalSessionData* session,
+ SharedRingBuffer* buffer,
+ WaitMode mode,
+ const char* tag) override
{
- auto input =
std::make_shared<SharedRingBufferInputStream>(this, session);
- // Read schema only stream format data.
- ARROW_ASSIGN_OR_RAISE(auto reader,
-
arrow::ipc::RecordBatchStreamReader::Open(input));
- while (true)
- {
- std::shared_ptr<arrow::RecordBatch> recordBatch;
- P("%s: %s: %s: read next", Tag, tag_, tag);
- ARROW_RETURN_NOT_OK(reader->ReadNext(&recordBatch));
- if (!recordBatch)
+ std::unique_lock<std::mutex> lock(mutex_);
+ conditionVariable_.wait(lock, [&] {
+ P("%s: %s: %s: %s: wait: %" PRIsize,
+ Tag,
+ tag_,
+ tag,
+ peer_name(),
+ get_waiting_buffer_size(buffer, mode));
+ if (INTERRUPTS_PENDING_CONDITION())
{
- break;
+ return true;
}
+ return get_waiting_buffer_size(buffer, mode) > 0;
+ });
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::Invalid(tag_, ": ", tag, ": ",
"interrupted");
}
- return reader->schema();
+ return arrow::Status::OK();
}
-
- std::random_device randomSeed_;
- std::mt19937_64 randomEngine_;
};
arrow::Result<int64_t>
@@ -2646,13 +3689,30 @@ SharedRingBufferInputStream::Read(int64_t nBytes, void*
out)
return arrow::Status::IOError(std::string(Tag) + ": " +
processor_->tag() +
": SharedRingBufferInputStream is
closed");
}
- auto buffer = processor_->create_shared_ring_buffer(session_);
size_t rest = static_cast<size_t>(nBytes);
+ SharedRingBuffer buffer(
+ localSession_->bufferData, localSession_->bufferAddress,
processor_->tag());
while (true)
{
- processor_->lock_acquire();
- auto readBytes = buffer.read(rest, out);
- processor_->lock_release();
+ ARROW_ASSIGN_OR_RAISE(auto readBytes,
+ processor_->read(localSession_.get(),
&buffer, rest, out));
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::IOError(std::string(Tag) + ": " +
processor_->tag() +
+ ": interrupted");
+ }
+
+ if (readBytes == 0)
+ {
+ ARROW_RETURN_NOT_OK(processor_->wait(
+ localSession_.get(), &buffer,
Processor::WaitMode::Written));
+ if (INTERRUPTS_PENDING_CONDITION())
+ {
+ return arrow::Status::IOError(std::string(Tag)
+ ": " +
+ processor_->tag()
+ ": interrupted");
+ }
+ continue;
+ }
position_ += readBytes;
rest -= readBytes;
@@ -2661,21 +3721,13 @@ SharedRingBufferInputStream::Read(int64_t nBytes, void*
out)
{
break;
}
-
- ARROW_RETURN_NOT_OK(
- processor_->wait(session_, &buffer,
Processor::WaitMode::Written));
- if (INTERRUPTS_PENDING_CONDITION())
- {
- return arrow::Status::IOError(std::string(Tag) + ": " +
processor_->tag() +
- ": interrupted");
- }
}
return nBytes;
}
class MainProcessor : public Processor {
public:
- MainProcessor() : Processor("main", true), sessions_(nullptr)
+ MainProcessor() : Processor("main"), sessions_(nullptr),
executorHandles_()
{
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
bool found;
@@ -2726,18 +3778,47 @@ class MainProcessor : public Processor {
return handle;
}
- void process_connect_requests()
+ void signaled() { update_sessions(); }
+
+ private:
+ void update_sessions()
{
+ const char* tag = "update sessions";
+ P("%s: %s: %s: start", Tag, tag_, tag);
dshash_seq_status sessionsStatus;
dshash_seq_init(&sessionsStatus, sessions_, false);
- SessionData* session;
- while ((session =
static_cast<SessionData*>(dshash_seq_next(&sessionsStatus))))
+ while (true)
{
- if (session->initialized)
+ auto session =
+
static_cast<SharedSessionData*>(dshash_seq_next(&sessionsStatus));
+ if (!session)
+ {
+ break;
+ }
+
+ if (session->started)
{
+ if (!session->finished)
+ {
+ try
+ {
+ auto handle =
executorHandles_.at(session->id);
+ pid_t pid;
+ if
(GetBackgroundWorkerPid(handle, &pid) == BGWH_STOPPED)
+ {
+ // A finished session
+ session->finished =
true;
+
executorHandles_.erase(session->id);
+ }
+ } catch (const std::exception&
exception)
+ {
+ session->finished = true;
+ }
+ }
continue;
}
+ // A new session request
BackgroundWorker worker = {};
snprintf(
worker.bgw_name, BGW_MAXLEN, "%s: executor: %"
PRIu64, Tag, session->id);
@@ -2752,24 +3833,34 @@ class MainProcessor : public Processor {
worker.bgw_main_arg = Int64GetDatum(session->id);
worker.bgw_notify_pid = MyProcPid;
BackgroundWorkerHandle* handle;
+ P("%s: %s: %s: register executor: %" PRIu64, Tag, tag_,
tag, session->id);
+ session->started = true;
if (RegisterDynamicBackgroundWorker(&worker, &handle))
{
+ executorHandles_[session->id] = handle;
WaitForBackgroundWorkerStartup(handle,
&(session->executorPID));
+ P("%s: %s: %s: started executor: %" PRIu64,
Tag, tag_, tag, session->id);
}
else
{
+ P("%s: %s: %s: failed to start executor: %"
PRIu64,
+ Tag,
+ tag_,
+ tag,
+ session->id);
set_shared_string(
session->errorMessage,
- std::string(Tag) + ": " + tag_ +
+ std::string(Tag) + ": " + tag_ + ": " +
tag +
": failed to start executor: "
+ std::to_string(session->id));
}
}
dshash_seq_term(&sessionsStatus);
+ P("%s: %s: %s: end", Tag, tag_, tag);
kill(sharedData_->serverPID, SIGUSR1);
}
- private:
dshash_table* sessions_;
+ std::map<uint64_t, BackgroundWorkerHandle*> executorHandles_;
};
class HeaderAuthServerMiddleware : public arrow::flight::ServerMiddleware {
@@ -2896,6 +3987,8 @@ class HeaderAuthServerMiddlewareFactory : public
arrow::flight::ServerMiddleware
Proxy* proxy_;
};
+// Flight SQL server doesn't communicate an "executor" process
+// directly. The Proxy object communicates an "executor" process.
class FlightSQLServer : public arrow::flight::sql::FlightSqlServerBase {
public:
explicit FlightSQLServer(Proxy* proxy)
@@ -2928,7 +4021,7 @@ class FlightSQLServer : public
arrow::flight::sql::FlightSqlServerBase {
const arrow::flight::sql::StatementQueryTicket& command)
override
{
ARROW_ASSIGN_OR_RAISE(auto sessionID, session_id(context));
- ARROW_ASSIGN_OR_RAISE(auto reader, proxy_->read(sessionID));
+ ARROW_ASSIGN_OR_RAISE(auto reader,
proxy_->open_reader(sessionID));
return
std::make_unique<arrow::flight::RecordBatchStream>(reader);
}
@@ -3067,8 +4160,14 @@ afs_server_internal(Proxy* proxy)
ProcessConfigFile(PGC_SIGHUP);
}
- if (GotSIGUSR1)
+ while (GotSIGUSR1)
{
+ // SIGUSR1 is emitted by:
+ // * The "main" process notifies a new "executor"
process is spawned
+ // * The "main" process notifies a "executor" process
is finished
+ // (may be crashed)
+ // * The "executor" process requests a read/write
+ // * The "executor" process notifies a request is
finished
GotSIGUSR1 = false;
proxy->signaled();
}
@@ -3131,8 +4230,10 @@ afs_executor(Datum arg)
ProcessConfigFile(PGC_SIGHUP);
}
- if (GotSIGUSR1)
+ while (GotSIGUSR1)
{
+ // SIGUSR1 is emitted by:
+ // * The "server" process requests a new request
GotSIGUSR1 = false;
executor->signaled();
}
@@ -3194,10 +4295,14 @@ afs_main(Datum arg)
ProcessConfigFile(PGC_SIGHUP);
}
- if (GotSIGUSR1)
+ while (GotSIGUSR1)
{
+ // SIGUSR1 is emitted by:
+ // * The "server" process is exited (may be
crashed)
+ // * The "executor" process is exited (may be
crashed)
+ // * The "server" process requests a new session
GotSIGUSR1 = false;
- processor.process_connect_requests();
+ processor.signaled();
}
CHECK_FOR_INTERRUPTS();
@@ -3254,7 +4359,7 @@ _PG_init(void)
NULL,
NULL);
-#ifdef PGRN_HAVE_SHMEM_REQUEST_HOOK
+#ifdef AFS_HAVE_SHMEM_REQUEST_HOOK
PreviousShmemRequestHook = shmem_request_hook;
shmem_request_hook = afs_shmem_request_hook;
#else
diff --git a/test/helper/sandbox.rb b/test/helper/sandbox.rb
index 3459d5f..bbbf9b9 100644
--- a/test/helper/sandbox.rb
+++ b/test/helper/sandbox.rb
@@ -161,7 +161,7 @@ module Helper
end
conf.puts("logging_collector = on")
conf.puts("log_filename = '#{@log_base_name}'")
- conf.puts("log_min_messages = debug5") if ENV["AFS_DEBUG"] == "yes"
+ conf.puts("log_min_messages = debug5") if ENV["AFS_VERBOSE"] == "yes"
conf.puts("shared_preload_libraries = " +
"'#{shared_preload_libraries.join(",")}'")
conf.puts("arrow_flight_sql.uri = '#{@flight_sql_uri}'")