WillAyd commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1339075802
##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
int64_t array_size_approx_bytes_;
};
+class PostgresCopyFieldWriter {
+ public:
+ void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+ virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError*
error) {
+ return ENOTSUP;
+ }
+
+ protected:
+ struct ArrowArrayView* array_view_;
+ std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+ void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+ int64_t child_i = static_cast<int64_t>(children_.size());
+ children_.push_back(std::move(child));
+ children_[child_i]->Init(array_view_->children[child_i]);
+ }
+
+ ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error)
override {
+ if (index >= array_view_->length) {
+ return ENODATA;
+ }
+
+ const int16_t n_fields = children_.size();
+ NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+ for (int16_t i = 0; i < n_fields; i++) {
+ children_[i]->Write(buffer, index, error);
+ }
+
+ return NANOARROW_OK;
+ }
+
+ private:
+ std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+ ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error)
override {
+ const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+ const int32_t field_size_bytes = is_null ? -1 : 1;
+ NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes,
error));
+ if (is_null) {
+ return ADBC_STATUS_OK;
+ }
+
+ const int8_t value =
+ static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+ NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+ return ADBC_STATUS_OK;
+ }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType
arrow_type,
+ PostgresCopyFieldWriter** out,
+ ArrowError* error) {
+ switch (arrow_type) {
+ case NANOARROW_TYPE_BOOL:
+ *out = new PostgresCopyBooleanFieldWriter();
+ return NANOARROW_OK;
+ default:
+ return EINVAL;
+ }
+ return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+ ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+ ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+ schema_ = schema;
+ NANOARROW_RETURN_NOT_OK(
+ ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+ NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array,
nullptr));
+ root_writer_.Init(array_view_.get());
+ return NANOARROW_OK;
+ }
+
+ int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+ ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+ ArrowBufferAppend(buffer, kPgCopyBinarySignature,
sizeof(kPgCopyBinarySignature));
Review Comment:
Have to think more about this. I don't think you need 2 passes though? I
think this could be calculated up front
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]