pitrou commented on a change in pull request #7507:
URL: https://github.com/apache/arrow/pull/7507#discussion_r567756770
##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -74,6 +75,220 @@ class ArrayDataWrapper {
std::shared_ptr<Array>* out_;
};
+class ArrayDataEndianSwapper {
+ public:
+ ArrayDataEndianSwapper(std::shared_ptr<ArrayData>& data, int64_t length,
+ std::shared_ptr<ArrayData>* out)
+ : data_(data), length_(length), out_(out) {}
+
+ Status SwapType(const DataType& type) {
+ RETURN_NOT_OK(VisitTypeInline(type, this));
+ RETURN_NOT_OK(SwapChildren(type.fields()));
+ return Status::OK();
+ }
+
+ Status SwapChildren(std::vector<std::shared_ptr<Field>> child_fields) {
+ int i = 0;
+ for (const auto& child_field : child_fields) {
+ ARROW_ASSIGN_OR_RAISE(
+ (*out_)->child_data[i],
+ SwapEndianArrayData(data_->child_data[i],
child_field.get()->type()));
+ i++;
+ }
+ return Status::OK();
+ }
+
+ template <typename T>
+ Result<std::shared_ptr<Buffer>> ByteSwapBuffer(std::shared_ptr<Buffer>&
in_buffer,
+ int64_t length, int64_t
extra_size) {
+ auto in_data = reinterpret_cast<const T*>(in_buffer->data());
+ ARROW_ASSIGN_OR_RAISE(auto out_buffer, AllocateBuffer(in_buffer->size()));
+ auto out_data = reinterpret_cast<T*>(out_buffer->mutable_data());
+ for (int64_t i = 0; i < length + extra_size; i++) {
+#if ARROW_LITTLE_ENDIAN
+ out_data[i] = BitUtil::FromBigEndian(in_data[i]);
+#else
+ out_data[i] = BitUtil::FromLittleEndian(in_data[i]);
+#endif
+ }
+ return std::move(out_buffer);
+ }
+
+ template <typename VALUE_TYPE>
+ Status SwapOffset(int index) {
+ if (data_->buffers[index] == nullptr || data_->buffers[index]->size() ==
0) {
+ (*out_)->buffers[index] = data_->buffers[index];
+ return Status::OK();
+ }
+ // offset has one more element rather than data->length
+ ARROW_ASSIGN_OR_RAISE((*out_)->buffers[index],
+ ByteSwapBuffer<VALUE_TYPE>(data_->buffers[index],
length_, 1));
+ return Status::OK();
+ }
+
+ Status SwapSmallOffset(int index = 1) { return SwapOffset<int32_t>(index); }
+
+ Status SwapLargeOffset() { return SwapOffset<int64_t>(1); }
+
+ template <typename T>
+ enable_if_t<std::is_base_of<FixedWidthType, T>::value &&
+ !std::is_base_of<FixedSizeBinaryType, T>::value &&
+ !std::is_base_of<DictionaryType, T>::value,
+ Status>
+ Visit(const T& type) {
+ using value_type = typename T::c_type;
+ ARROW_ASSIGN_OR_RAISE((*out_)->buffers[1],
+ ByteSwapBuffer<value_type>(data_->buffers[1],
length_, 0));
+ return Status::OK();
+ }
+
+ Status Visit(const Decimal128Type& type) {
+ auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+ ARROW_ASSIGN_OR_RAISE(auto new_buffer,
AllocateBuffer(data_->buffers[1]->size()));
+ auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+ int64_t length = length_;
+ for (int64_t i = 0; i < length; i++) {
+ uint64_t tmp;
+ auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+ tmp = BitUtil::FromBigEndian(data[idx]);
+ new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]);
+ new_data[idx + 1] = tmp;
+#else
+ tmp = BitUtil::FromLittleEndian(data[idx]);
+ new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]);
+ new_data[idx + 1] = tmp;
+#endif
+ }
+ (*out_)->buffers[1] = std::move(new_buffer);
+ return Status::OK();
+ }
+
+ Status Visit(const Decimal256Type& type) {
+ auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+ ARROW_ASSIGN_OR_RAISE(auto new_buffer,
AllocateBuffer(data_->buffers[1]->size()));
+ auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+ int64_t length = length_;
+ for (int64_t i = 0; i < length; i++) {
+ uint64_t tmp0, tmp1, tmp2;
+ auto idx = i * 4;
+#if ARROW_LITTLE_ENDIAN
+ tmp0 = BitUtil::FromBigEndian(data[idx]);
+ tmp1 = BitUtil::FromBigEndian(data[idx + 1]);
+ tmp2 = BitUtil::FromBigEndian(data[idx + 2]);
+ new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]);
+ new_data[idx + 1] = tmp2;
+ new_data[idx + 2] = tmp1;
+ new_data[idx + 3] = tmp0;
+#else
+ tmp0 = BitUtil::FromLittleEndian(data[idx]);
+ tmp1 = BitUtil::FromLittleEndian(data[idx + 1]);
+ tmp2 = BitUtil::FromLittleEndian(data[idx + 2]);
+ new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]);
+ new_data[idx + 1] = tmp2;
+ new_data[idx + 2] = tmp1;
+ new_data[idx + 3] = tmp0;
+#endif
+ }
+ (*out_)->buffers[1] = std::move(new_buffer);
+ return Status::OK();
+ }
+
+ Status Visit(const DayTimeIntervalType& type) {
+ ARROW_ASSIGN_OR_RAISE((*out_)->buffers[1],
+ ByteSwapBuffer<uint32_t>(data_->buffers[1], length_
* 2, 0));
+ return Status::OK();
+ }
+
+ Status CopyDataBuffer() {
+ if (data_->buffers[1]->data() == nullptr) {
+ return Status::OK();
+ }
+ ARROW_ASSIGN_OR_RAISE((*out_)->buffers[1],
+ data_->buffers[1]->CopySlice(0,
data_->buffers[1]->size()));
Review comment:
You can reuse existing buffers as long as you don't modify their contents.
When you want to modify the contents of a buffer (for example, to byte-swap
it), you must make a copy first.
##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -74,6 +75,220 @@ class ArrayDataWrapper {
std::shared_ptr<Array>* out_;
};
+class ArrayDataEndianSwapper {
+ public:
+ ArrayDataEndianSwapper(std::shared_ptr<ArrayData>& data, int64_t length,
+ std::shared_ptr<ArrayData>* out)
+ : data_(data), length_(length), out_(out) {}
+
+ Status SwapType(const DataType& type) {
+ RETURN_NOT_OK(VisitTypeInline(type, this));
+ RETURN_NOT_OK(SwapChildren(type.fields()));
+ return Status::OK();
+ }
+
+ Status SwapChildren(std::vector<std::shared_ptr<Field>> child_fields) {
+ int i = 0;
+ for (const auto& child_field : child_fields) {
+ ARROW_ASSIGN_OR_RAISE(
+ (*out_)->child_data[i],
+ SwapEndianArrayData(data_->child_data[i],
child_field.get()->type()));
+ i++;
+ }
+ return Status::OK();
+ }
+
+ template <typename T>
+ Result<std::shared_ptr<Buffer>> ByteSwapBuffer(std::shared_ptr<Buffer>&
in_buffer,
+ int64_t length, int64_t
extra_size) {
+ auto in_data = reinterpret_cast<const T*>(in_buffer->data());
+ ARROW_ASSIGN_OR_RAISE(auto out_buffer, AllocateBuffer(in_buffer->size()));
+ auto out_data = reinterpret_cast<T*>(out_buffer->mutable_data());
+ for (int64_t i = 0; i < length + extra_size; i++) {
+#if ARROW_LITTLE_ENDIAN
+ out_data[i] = BitUtil::FromBigEndian(in_data[i]);
+#else
+ out_data[i] = BitUtil::FromLittleEndian(in_data[i]);
+#endif
+ }
+ return std::move(out_buffer);
+ }
+
+ template <typename VALUE_TYPE>
+ Status SwapOffset(int index) {
+ if (data_->buffers[index] == nullptr || data_->buffers[index]->size() ==
0) {
+ (*out_)->buffers[index] = data_->buffers[index];
+ return Status::OK();
+ }
+ // offset has one more element rather than data->length
+ ARROW_ASSIGN_OR_RAISE((*out_)->buffers[index],
+ ByteSwapBuffer<VALUE_TYPE>(data_->buffers[index],
length_, 1));
+ return Status::OK();
+ }
+
+ Status SwapSmallOffset(int index = 1) { return SwapOffset<int32_t>(index); }
+
+ Status SwapLargeOffset() { return SwapOffset<int64_t>(1); }
+
+ template <typename T>
+ enable_if_t<std::is_base_of<FixedWidthType, T>::value &&
+ !std::is_base_of<FixedSizeBinaryType, T>::value &&
+ !std::is_base_of<DictionaryType, T>::value,
+ Status>
+ Visit(const T& type) {
+ using value_type = typename T::c_type;
+ ARROW_ASSIGN_OR_RAISE((*out_)->buffers[1],
+ ByteSwapBuffer<value_type>(data_->buffers[1],
length_, 0));
+ return Status::OK();
+ }
+
+ Status Visit(const Decimal128Type& type) {
+ auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+ ARROW_ASSIGN_OR_RAISE(auto new_buffer,
AllocateBuffer(data_->buffers[1]->size()));
+ auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+ int64_t length = length_;
+ for (int64_t i = 0; i < length; i++) {
+ uint64_t tmp;
+ auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+ tmp = BitUtil::FromBigEndian(data[idx]);
+ new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]);
+ new_data[idx + 1] = tmp;
+#else
+ tmp = BitUtil::FromLittleEndian(data[idx]);
+ new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]);
+ new_data[idx + 1] = tmp;
+#endif
+ }
+ (*out_)->buffers[1] = std::move(new_buffer);
+ return Status::OK();
+ }
+
+ Status Visit(const Decimal256Type& type) {
+ auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+ ARROW_ASSIGN_OR_RAISE(auto new_buffer,
AllocateBuffer(data_->buffers[1]->size()));
+ auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+ int64_t length = length_;
+ for (int64_t i = 0; i < length; i++) {
+ uint64_t tmp0, tmp1, tmp2;
+ auto idx = i * 4;
+#if ARROW_LITTLE_ENDIAN
+ tmp0 = BitUtil::FromBigEndian(data[idx]);
+ tmp1 = BitUtil::FromBigEndian(data[idx + 1]);
+ tmp2 = BitUtil::FromBigEndian(data[idx + 2]);
+ new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]);
+ new_data[idx + 1] = tmp2;
+ new_data[idx + 2] = tmp1;
+ new_data[idx + 3] = tmp0;
+#else
+ tmp0 = BitUtil::FromLittleEndian(data[idx]);
+ tmp1 = BitUtil::FromLittleEndian(data[idx + 1]);
+ tmp2 = BitUtil::FromLittleEndian(data[idx + 2]);
+ new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]);
+ new_data[idx + 1] = tmp2;
+ new_data[idx + 2] = tmp1;
+ new_data[idx + 3] = tmp0;
+#endif
+ }
+ (*out_)->buffers[1] = std::move(new_buffer);
+ return Status::OK();
+ }
+
+ Status Visit(const DayTimeIntervalType& type) {
+ ARROW_ASSIGN_OR_RAISE((*out_)->buffers[1],
+ ByteSwapBuffer<uint32_t>(data_->buffers[1], length_
* 2, 0));
+ return Status::OK();
+ }
+
+ Status CopyDataBuffer() {
+ if (data_->buffers[1]->data() == nullptr) {
+ return Status::OK();
+ }
+ ARROW_ASSIGN_OR_RAISE((*out_)->buffers[1],
+ data_->buffers[1]->CopySlice(0,
data_->buffers[1]->size()));
Review comment:
You can reuse existing buffers as long as you don't modify their contents.
When you want to modify the contents of a buffer (for example, to byte-swap
it), you must make a copy first.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org