bkietz commented on a change in pull request #9768:
URL: https://github.com/apache/arrow/pull/9768#discussion_r616869161
##########
File path: cpp/src/arrow/engine/key_encode.cc
##########
@@ -0,0 +1,1604 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/key_encode.h"
+
+#include <memory.h>
+#include <algorithm>
+
+#include "arrow/engine/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+KeyEncoder::KeyRowArray::KeyRowArray()
+    : pool_(nullptr), rows_capacity_(0), bytes_capacity_(0) {}
+
+Status KeyEncoder::KeyRowArray::Init(MemoryPool* pool, const KeyRowMetadata& metadata) {
+  pool_ = pool;
+  metadata_ = metadata;
+
+  ARROW_DCHECK(!null_masks_ && !offsets_ && !rows_);
+
+  constexpr int64_t rows_capacity = 8;
+  constexpr int64_t bytes_capacity = 1024;
+
+  // Null masks
+  ARROW_ASSIGN_OR_RAISE(auto null_masks,
+                        AllocateResizableBuffer(size_null_masks(rows_capacity), pool_));
+  null_masks_ = std::move(null_masks);
+  memset(null_masks_->mutable_data(), 0, size_null_masks(rows_capacity));
+
+  // Offsets and rows
+  if (!metadata.is_fixed_length) {
+    ARROW_ASSIGN_OR_RAISE(auto offsets,
+                          AllocateResizableBuffer(size_offsets(rows_capacity), pool_));
+    offsets_ = std::move(offsets);
+    memset(offsets_->mutable_data(), 0, size_offsets(rows_capacity));
+    reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
+
+    ARROW_ASSIGN_OR_RAISE(
+        auto rows,
+        AllocateResizableBuffer(size_rows_varying_length(bytes_capacity), pool_));
+    rows_ = std::move(rows);
+    memset(rows_->mutable_data(), 0, size_rows_varying_length(bytes_capacity));
+    bytes_capacity_ = size_rows_varying_length(bytes_capacity) - padding_for_vectors;
+  } else {
+    ARROW_ASSIGN_OR_RAISE(
+        auto rows, AllocateResizableBuffer(size_rows_fixed_length(rows_capacity), pool_));
+    rows_ = std::move(rows);
+    memset(rows_->mutable_data(), 0, size_rows_fixed_length(rows_capacity));
+    bytes_capacity_ = size_rows_fixed_length(rows_capacity) - padding_for_vectors;
+  }
+
+  update_buffer_pointers();
+
+  rows_capacity_ = rows_capacity;
+
+  num_rows_ = 0;
+  num_rows_for_has_any_nulls_ = 0;
+  has_any_nulls_ = false;
+
+  return Status::OK();
+}
+
+void KeyEncoder::KeyRowArray::Clean() {
+  num_rows_ = 0;
+  num_rows_for_has_any_nulls_ = 0;
+  has_any_nulls_ = false;
+
+  if (!metadata_.is_fixed_length) {
+    reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
+  }
+}
+
+int64_t KeyEncoder::KeyRowArray::size_null_masks(int64_t num_rows) {
+  return num_rows * metadata_.null_masks_bytes_per_row + padding_for_vectors;
+}
+
+int64_t KeyEncoder::KeyRowArray::size_offsets(int64_t num_rows) {
+  return (num_rows + 1) * sizeof(uint32_t) + padding_for_vectors;
+}
+
+int64_t KeyEncoder::KeyRowArray::size_rows_fixed_length(int64_t num_rows) {
+  return num_rows * metadata_.fixed_length + padding_for_vectors;
+}
+
+int64_t KeyEncoder::KeyRowArray::size_rows_varying_length(int64_t num_bytes) {
+  return num_bytes + padding_for_vectors;
+}
+
+void KeyEncoder::KeyRowArray::update_buffer_pointers() {
+  buffers_[0] = mutable_buffers_[0] = null_masks_->mutable_data();
+  if (metadata_.is_fixed_length) {
+    buffers_[1] = mutable_buffers_[1] = rows_->mutable_data();
+    buffers_[2] = mutable_buffers_[2] = nullptr;
+  } else {
+    buffers_[1] = mutable_buffers_[1] = offsets_->mutable_data();
+    buffers_[2] = mutable_buffers_[2] = rows_->mutable_data();
+  }
+}
+
+Status KeyEncoder::KeyRowArray::ResizeFixedLengthBuffers(int64_t num_extra_rows) {
+  if (rows_capacity_ >= num_rows_ + num_extra_rows) {
+    return Status::OK();
+  }
+
+  int64_t rows_capacity_new = std::max(static_cast<int64_t>(1), 2 * rows_capacity_);
+  while (rows_capacity_new < num_rows_ + num_extra_rows) {
+    rows_capacity_new *= 2;
+  }
+
+  // Null masks
+  RETURN_NOT_OK(null_masks_->Resize(size_null_masks(rows_capacity_new), false));
+  memset(null_masks_->mutable_data() + size_null_masks(rows_capacity_), 0,
+         size_null_masks(rows_capacity_new) - size_null_masks(rows_capacity_));
+
+  // Either offsets or rows
+  if (!metadata_.is_fixed_length) {
+    RETURN_NOT_OK(offsets_->Resize(size_offsets(rows_capacity_new), false));
+    memset(offsets_->mutable_data() + size_offsets(rows_capacity_), 0,
+           size_offsets(rows_capacity_new) - size_offsets(rows_capacity_));
+  } else {
+    RETURN_NOT_OK(rows_->Resize(size_rows_fixed_length(rows_capacity_new), false));
+    memset(rows_->mutable_data() + size_rows_fixed_length(rows_capacity_), 0,
+           size_rows_fixed_length(rows_capacity_new) -
+               size_rows_fixed_length(rows_capacity_));
+    bytes_capacity_ = size_rows_fixed_length(rows_capacity_new) - padding_for_vectors;
+  }
+
+  update_buffer_pointers();
+
+  rows_capacity_ = rows_capacity_new;
+
+  return Status::OK();
+}
+
+Status KeyEncoder::KeyRowArray::ResizeOptionalVaryingLengthBuffer(
+    int64_t num_extra_bytes) {
+  int64_t num_bytes = get_offsets()[num_rows_];
+  if (bytes_capacity_ >= num_bytes + num_extra_bytes || metadata_.is_fixed_length) {
+    return Status::OK();
+  }
+
+  int64_t bytes_capacity_new = std::max(static_cast<int64_t>(1), 2 * bytes_capacity_);
+  while (bytes_capacity_new < num_bytes + num_extra_bytes) {
+    bytes_capacity_new *= 2;
+  }
+
+  RETURN_NOT_OK(rows_->Resize(size_rows_varying_length(bytes_capacity_new), false));
+  memset(rows_->mutable_data() + size_rows_varying_length(bytes_capacity_), 0,
+         size_rows_varying_length(bytes_capacity_new) -
+             size_rows_varying_length(bytes_capacity_));
+
+  update_buffer_pointers();
+
+  bytes_capacity_ = bytes_capacity_new;
+
+  return Status::OK();
+}
+
+Status KeyEncoder::KeyRowArray::AppendSelectionFrom(const KeyRowArray& from,
+                                                    uint32_t num_rows_to_append,
+                                                    const uint16_t* source_row_ids) {
+  ARROW_DCHECK(metadata_.is_fixed_length == from.get_metadata().is_fixed_length &&
+               metadata_.fixed_length == from.get_metadata().fixed_length &&
+               metadata_.cumulative_lengths_length ==
+                   from.get_metadata().cumulative_lengths_length &&
+               metadata_.null_masks_bytes_per_row ==
+                   from.get_metadata().null_masks_bytes_per_row);
+
+  RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
+
+  if (!metadata_.is_fixed_length) {
+    // Varying-length rows
+    const uint32_t* from_offsets =
+        reinterpret_cast<const uint32_t*>(from.offsets_->data());
+    uint32_t* to_offsets = reinterpret_cast<uint32_t*>(offsets_->mutable_data());
+    uint32_t total_length = to_offsets[num_rows_];
+    uint32_t total_length_to_append = 0;
+    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+      uint16_t row_id = source_row_ids[i];
+      uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
+      total_length_to_append += length;
+      to_offsets[num_rows_ + i + 1] = total_length + total_length_to_append;
+    }
+
+    RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(total_length_to_append));
+
+    const uint8_t* src = from.rows_->data();
+    uint8_t* dst = rows_->mutable_data() + total_length;
+    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+      uint16_t row_id = source_row_ids[i];
+      uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
+      const uint64_t* src64 =
+          reinterpret_cast<const uint64_t*>(src + from_offsets[row_id]);
+      uint64_t* dst64 = reinterpret_cast<uint64_t*>(dst);
+      for (uint32_t j = 0; j < (length + 7) / 8; ++j) {
+        dst64[j] = src64[j];
+      }
+      dst += length;
+    }
+  } else {
+    // Fixed-length rows
+    const uint8_t* src = from.rows_->data();
+    uint8_t* dst = rows_->mutable_data() + num_rows_ * metadata_.fixed_length;
+    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+      uint16_t row_id = source_row_ids[i];
+      uint32_t length = metadata_.fixed_length;
+      const uint64_t* src64 = reinterpret_cast<const uint64_t*>(src + length * row_id);
+      uint64_t* dst64 = reinterpret_cast<uint64_t*>(dst);
+      for (uint32_t j = 0; j < (length + 7) / 8; ++j) {
+        dst64[j] = src64[j];
+      }
+      dst += length;
+    }
+  }
+
+  // Null masks
+  uint32_t byte_length = metadata_.null_masks_bytes_per_row;
+  uint64_t dst_byte_offset = num_rows_ * byte_length;
+  const uint8_t* src_base = from.null_masks_->data();
+  uint8_t* dst_base = null_masks_->mutable_data();
+  for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+    uint32_t row_id = source_row_ids[i];
+    int64_t src_byte_offset = row_id * byte_length;
+    const uint8_t* src = src_base + src_byte_offset;
+    uint8_t* dst = dst_base + dst_byte_offset;
+    for (uint32_t ibyte = 0; ibyte < byte_length; ++ibyte) {
+      dst[ibyte] = src[ibyte];
+    }
+    dst_byte_offset += byte_length;
+  }
+
+  num_rows_ += num_rows_to_append;
+
+  return Status::OK();
+}
+
+Status KeyEncoder::KeyRowArray::AppendEmpty(uint32_t num_rows_to_append,
+                                            uint32_t num_extra_bytes_to_append) {
+  RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
+  RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(num_extra_bytes_to_append));
+  num_rows_ += num_rows_to_append;
+  return Status::OK();
+}
+
+bool KeyEncoder::KeyRowArray::has_any_nulls(const KeyEncoderContext* ctx) const {
+  if (has_any_nulls_) {
+    return true;
+  }
+  if (num_rows_for_has_any_nulls_ < num_rows_) {
+    auto size_per_row = get_metadata().null_masks_bytes_per_row;
+    has_any_nulls_ = !util::BitUtil::are_all_bytes_zero(
+        ctx->instr, get_null_masks() + size_per_row * num_rows_for_has_any_nulls_,
+        static_cast<uint32_t>(size_per_row * (num_rows_ - num_rows_for_has_any_nulls_)));
+    num_rows_for_has_any_nulls_ = num_rows_;
+  }
+  return has_any_nulls_;
+}
+
+KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
+                                           const KeyColumnArray& left,
+                                           const KeyColumnArray& right,
+                                           int buffer_id_to_replace) {
+  metadata_ = metadata;
+  length_ = left.get_length();
+  for (int i = 0; i < max_buffers_; ++i) {
+    buffers_[i] = left.buffers_[i];
+    mutable_buffers_[i] = left.mutable_buffers_[i];
+  }
+  buffers_[buffer_id_to_replace] = right.buffers_[buffer_id_to_replace];
+  mutable_buffers_[buffer_id_to_replace] = right.mutable_buffers_[buffer_id_to_replace];
+}
+
+KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
+                                           int64_t length, const uint8_t* buffer0,
+                                           const uint8_t* buffer1,
+                                           const uint8_t* buffer2) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+}
+
+KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
+                                           int64_t length, uint8_t* buffer0,
+                                           uint8_t* buffer1, uint8_t* buffer2) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = mutable_buffers_[0] = buffer0;
+  buffers_[1] = mutable_buffers_[1] = buffer1;
+  buffers_[2] = mutable_buffers_[2] = buffer2;
+}
+
+KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnArray& from, int64_t start,
+                                           int64_t length) {
+  ARROW_DCHECK((start % 8) == 0);
+  metadata_ = from.metadata_;
+  length_ = length;
+  uint32_t fixed_size =
+      !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
+
+  buffers_[0] = from.buffers_[0] ? from.buffers_[0] + start / 8 : nullptr;
+  mutable_buffers_[0] =
+      from.mutable_buffers_[0] ? from.mutable_buffers_[0] + start / 8 : nullptr;
+
+  if (fixed_size == 0) {
+    buffers_[1] = from.buffers_[1] ? from.buffers_[1] + start / 8 : nullptr;
+    mutable_buffers_[1] =
+        from.mutable_buffers_[1] ? from.mutable_buffers_[1] + start / 8 : nullptr;
+  } else {
+    buffers_[1] = from.buffers_[1] ? from.buffers_[1] + start * fixed_size : nullptr;
+    mutable_buffers_[1] = from.mutable_buffers_[1]
+                              ? from.mutable_buffers_[1] + start * fixed_size
+                              : nullptr;
+  }
+
+  buffers_[2] = from.buffers_[2];
+  mutable_buffers_[2] = from.mutable_buffers_[2];
+}
+
+KeyEncoder::KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(
+    const KeyColumnArray& column, const KeyColumnArray& temp) {
+  // Make sure that the temp buffer is large enough
+  ARROW_DCHECK(temp.get_length() >= column.get_length() &&
+               temp.get_metadata().is_fixed_length &&
+               temp.get_metadata().fixed_length >= sizeof(uint8_t));
+  KeyColumnMetadata metadata;
+  metadata.is_fixed_length = true;
+  metadata.fixed_length = sizeof(uint8_t);
+  constexpr int buffer_index = 1;
+  KeyColumnArray result = KeyColumnArray(metadata, column, temp, buffer_index);
+  return result;
+}
+
+void KeyEncoder::TransformBoolean::PreEncode(const KeyColumnArray& input,
+                                             KeyColumnArray& output,
+                                             KeyEncoderContext* ctx) {
+  // Make sure that metadata and lengths are compatible.
+  ARROW_DCHECK(output.get_metadata().is_fixed_length ==
+               input.get_metadata().is_fixed_length);
+  ARROW_DCHECK(output.get_metadata().fixed_length == 1 &&
+               input.get_metadata().fixed_length == 0);
+  ARROW_DCHECK(output.get_length() == input.get_length());
+  constexpr int buffer_index = 1;
+  ARROW_DCHECK(input.data(buffer_index) != nullptr);
+  ARROW_DCHECK(output.mutable_data(buffer_index) != nullptr);
+  util::BitUtil::bits_to_bytes(ctx->instr, static_cast<int>(input.get_length()),
+                               input.data(buffer_index),
+                               output.mutable_data(buffer_index));
+}
+
+void KeyEncoder::TransformBoolean::PostDecode(const KeyColumnArray& input,
+                                              KeyColumnArray& output,
+                                              KeyEncoderContext* ctx) {
+  // Make sure that metadata and lengths are compatible.
+  ARROW_DCHECK(output.get_metadata().is_fixed_length ==
+               input.get_metadata().is_fixed_length);
+  ARROW_DCHECK(output.get_metadata().fixed_length == 0 &&
+               input.get_metadata().fixed_length == 1);
+  ARROW_DCHECK(output.get_length() == input.get_length());
+  constexpr int buffer_index = 1;
+  ARROW_DCHECK(input.data(buffer_index) != nullptr);
+  ARROW_DCHECK(output.mutable_data(buffer_index) != nullptr);
+
+  util::BitUtil::bytes_to_bits(ctx->instr, static_cast<int>(input.get_length()),
+                               input.data(buffer_index),
+                               output.mutable_data(buffer_index));
+}
+
+bool KeyEncoder::EncoderInteger::IsBoolean(const KeyColumnMetadata& metadata) {
+  return metadata.is_fixed_length && metadata.fixed_length == 0;
+}
+
+bool KeyEncoder::EncoderInteger::UsesTransform(const KeyColumnArray& column) {
+  return IsBoolean(column.get_metadata());
+}
+
+KeyEncoder::KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace(
+    const KeyColumnArray& column, KeyColumnArray& temp) {
+  if (IsBoolean(column.get_metadata())) {
+    return TransformBoolean::ArrayReplace(column, temp);
+  }
+  return column;
+}
+
+void KeyEncoder::EncoderInteger::PreEncode(const KeyColumnArray& input,
+                                           KeyColumnArray& output,
+                                           KeyEncoderContext* ctx) {
+  if (IsBoolean(input.get_metadata())) {
+    TransformBoolean::PreEncode(input, output, ctx);
+  }
+}
+
+void KeyEncoder::EncoderInteger::PostDecode(const KeyColumnArray& input,
+                                            KeyColumnArray& output,
+                                            KeyEncoderContext* ctx) {
+  if (IsBoolean(output.get_metadata())) {
+    TransformBoolean::PostDecode(input, output, ctx);
+  }
+}
+
+void KeyEncoder::EncoderInteger::Encode(uint32_t* offset_within_row, KeyRowArray& rows,
+                                        const KeyColumnArray& col, KeyEncoderContext* ctx,
+                                        KeyColumnArray& temp) {
+  KeyColumnArray col_prep;
+  if (UsesTransform(col)) {
+    col_prep = ArrayReplace(col, temp);
+    PreEncode(col, col_prep, ctx);
+  } else {
+    col_prep = col;
+  }
+
+  uint32_t num_rows = static_cast<uint32_t>(col.get_length());
+
+  // When we have a single fixed length column we can just do memcpy
+  if (rows.get_metadata().is_fixed_length &&
+      rows.get_metadata().fixed_length == col.get_metadata().fixed_length) {
+    ARROW_DCHECK(*offset_within_row == 0);
+    uint32_t row_size = col.get_metadata().fixed_length;
+    memcpy(rows.mutable_data(1), col.data(1), num_rows * row_size);
+  } else if (rows.get_metadata().is_fixed_length) {
+    uint32_t row_size = rows.get_metadata().fixed_length;
+    uint8_t* row_base = rows.mutable_data(1) + *offset_within_row;
+    const uint8_t* col_base = col_prep.data(1);
+    switch (col_prep.get_metadata().fixed_length) {
+      case 1:
+        for (uint32_t i = 0; i < num_rows; ++i) {
+          row_base[i * row_size] = col_base[i];
+        }
+        break;
+      case 2:
+        for (uint32_t i = 0; i < num_rows; ++i) {
+          *reinterpret_cast<uint16_t*>(row_base + i * row_size) =
+              reinterpret_cast<const uint16_t*>(col_base)[i];
+        }

Review comment:
       There are a lot of unaligned accesses like this one. This is undefined behavior in C++ and it's not supported on all platforms. Could we use SafeLoadAs and SafeStore? If those produce a performance regression, can we optimize them?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
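For illustration, a minimal sketch of the change the review comment asks about, applied to the `case 2:` loop quoted above. It assumes the alignment-safe helpers `arrow::util::SafeLoadAs` / `arrow::util::SafeStore` from `arrow/util/ubsan.h`; the free-standing function and its name are hypothetical, introduced only to make the snippet self-contained.

```cpp
#include <cstdint>

#include "arrow/util/ubsan.h"

// Hypothetical helper mirroring the "case 2:" loop above. SafeLoadAs/SafeStore
// copy sizeof(T) bytes via std::memcpy, so no pointer is ever dereferenced at
// a stricter alignment than it actually has.
void EncodeFixedLength2(uint8_t* row_base, const uint8_t* col_base,
                        uint32_t num_rows, uint32_t row_size) {
  for (uint32_t i = 0; i < num_rows; ++i) {
    // Load the i-th 16-bit column value without assuming alignment.
    uint16_t value =
        arrow::util::SafeLoadAs<uint16_t>(col_base + i * sizeof(uint16_t));
    // Store it at an arbitrary (possibly unaligned) offset within the row.
    arrow::util::SafeStore(row_base + i * row_size, value);
  }
}
```

Because both helpers wrap a fixed-size `memcpy`, compilers on platforms that permit unaligned access typically fold them back into a single load and store, so the rewrite is usually performance-neutral; if a regression did show up, the loop could still be specialized per platform behind the same interface.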
