kamcheungting-db commented on code in PR #624: URL: https://github.com/apache/iceberg-cpp/pull/624#discussion_r3179405389
########## src/iceberg/puffin/puffin_reader.cc: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include <algorithm> +#include <array> +#include <cstring> +#include <string_view> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Validate magic bytes at the given offset. +Status CheckMagic(std::span<const std::byte> data, int64_t offset) { + if (offset < 0 || + offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) { + return Invalid("Invalid file: cannot read magic at offset {}", offset); + } + auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset); + if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { + return Invalid("Invalid file: expected magic at offset {}", offset); + } + return {}; +} + +} // namespace + +PuffinReader::PuffinReader(std::span<const std::byte> data) : data_(data) {} + +Result<FileMetadata> PuffinReader::ReadFileMetadata() { + auto file_size = static_cast<int64_t>(data_.size()); + + if (file_size < PuffinFormat::kFooterStructLength) { + return Invalid("Invalid file: file length {} is less than minimal footer size {}", + file_size, PuffinFormat::kFooterStructLength); + } + + // Read footer struct from end of file + auto footer_struct_offset = file_size - PuffinFormat::kFooterStructLength; + + // Validate footer end magic + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(data_, footer_struct_offset + PuffinFormat::kFooterStructMagicOffset)); + + // Read payload size from footer struct + auto payload_size = ReadLittleEndian<int32_t>( + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructPayloadSizeOffset); + + if (payload_size < 0) { Review Comment: should `payload_size == 0` also be considered as an error. ########## src/iceberg/puffin/puffin_reader.cc: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include <algorithm> +#include <array> +#include <cstring> +#include <string_view> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Validate magic bytes at the given offset. +Status CheckMagic(std::span<const std::byte> data, int64_t offset) { + if (offset < 0 || + offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) { + return Invalid("Invalid file: cannot read magic at offset {}", offset); + } + auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset); + if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { + return Invalid("Invalid file: expected magic at offset {}", offset); + } + return {}; +} + +} // namespace + +PuffinReader::PuffinReader(std::span<const std::byte> data) : data_(data) {} + +Result<FileMetadata> PuffinReader::ReadFileMetadata() { + auto file_size = static_cast<int64_t>(data_.size()); + + if (file_size < PuffinFormat::kFooterStructLength) { + return Invalid("Invalid file: file length {} is less than minimal footer size {}", + file_size, PuffinFormat::kFooterStructLength); + } + + // Read footer struct from end of file + auto footer_struct_offset = file_size - PuffinFormat::kFooterStructLength; + + // Validate footer end magic + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(data_, footer_struct_offset + PuffinFormat::kFooterStructMagicOffset)); + + // Read payload size from footer struct + auto payload_size = ReadLittleEndian<int32_t>( + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructPayloadSizeOffset); + + if (payload_size < 0) { + return Invalid("Invalid file: negative payload size {}", payload_size); + } + + // Calculate total footer size and validate + int64_t footer_size = PuffinFormat::kFooterStartMagicLength + + static_cast<int64_t>(payload_size) + + PuffinFormat::kFooterStructLength; + auto footer_offset = file_size - footer_size; + if (footer_offset < 0) { + return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size, + file_size); + } + + // Validate footer start magic + ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, footer_offset)); + + // Check flags for footer compression + std::array<uint8_t, 4> flags{}; + std::memcpy( + flags.data(), + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4); + + PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone; + if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) { + footer_compression = PuffinFormat::kDefaultFooterCompressionCodec; + } + + // Extract footer payload + auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength; + std::span<const std::byte> payload_span(data_.data() + payload_offset, payload_size); + ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes, + Decompress(footer_compression, payload_span)); + + // Parse JSON + std::string_view json_str(reinterpret_cast<const char*>(payload_bytes.data()), + payload_bytes.size()); + ICEBERG_ASSIGN_OR_RAISE(auto file_metadata, FileMetadataFromJsonString(json_str)); + + // Validate header magic + ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, 0)); Review Comment: should we move this validation to the beginning of this method? ########## src/iceberg/puffin/puffin_writer.cc: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_writer.h" + +#include <array> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +PuffinWriter::PuffinWriter(PuffinCompressionCodec default_codec) + : default_codec_(default_codec) {} + +void PuffinWriter::WriteHeader() { + if (header_written_) return; + const auto& magic = PuffinFormat::kMagicV1; + buffer_.insert(buffer_.end(), reinterpret_cast<const std::byte*>(magic.data()), + reinterpret_cast<const std::byte*>(magic.data() + magic.size())); Review Comment: ```suggestion const auto start_byte = magic.data(); buffer_.insert(buffer_.end(), reinterpret_cast<const std::byte*>(start_byte), reinterpret_cast<const std::byte*>(start_byte + magic.size())); ``` ########## src/iceberg/puffin/puffin_writer.h: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_writer.h +/// Puffin file writer. + +#include <cstddef> +#include <cstdint> +#include <optional> +#include <string> +#include <unordered_map> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" + +namespace iceberg::puffin { + +/// \brief Writer for Puffin files. +/// +/// Builds a complete Puffin file in memory. Usage: +/// PuffinWriter writer; +/// writer.Add(blob1); +/// writer.Add(blob2); +/// auto result = writer.Finish({{"created-by", "iceberg-cpp"}}); +/// // result.value() contains the serialized file bytes +class ICEBERG_EXPORT PuffinWriter { + public: + /// \brief Construct a writer with the given default compression codec. + explicit PuffinWriter( + PuffinCompressionCodec default_codec = PuffinCompressionCodec::kNone); + + /// \brief Add a blob to be written. + /// \return The BlobMetadata for the written blob, or an error. + Result<BlobMetadata> Add(const Blob& blob); + + /// \brief Finalize the file and return the serialized bytes. + /// \param properties File-level properties to include in the footer. + /// \return The complete Puffin file as a byte vector, or an error. + Result<std::vector<std::byte>> Finish( + std::unordered_map<std::string, std::string> properties = {}); + + /// \brief Get metadata for all blobs written so far. + const std::vector<BlobMetadata>& written_blobs_metadata() const; + + /// \brief Get the footer size after Finish() has been called. + /// \return The footer size, or std::nullopt if Finish() has not been called. + std::optional<int64_t> footer_size() const; + + private: + PuffinCompressionCodec default_codec_; + std::vector<std::byte> buffer_; + std::vector<BlobMetadata> written_blobs_metadata_; + bool header_written_ = false; + bool finished_ = false; + std::optional<int64_t> footer_size_; Review Comment: please add one line comment for these varaibles ########## src/iceberg/puffin/puffin_reader.cc: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include <algorithm> +#include <array> +#include <cstring> +#include <string_view> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Validate magic bytes at the given offset. +Status CheckMagic(std::span<const std::byte> data, int64_t offset) { + if (offset < 0 || + offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) { + return Invalid("Invalid file: cannot read magic at offset {}", offset); + } + auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset); + if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { + return Invalid("Invalid file: expected magic at offset {}", offset); + } + return {}; +} + +} // namespace + +PuffinReader::PuffinReader(std::span<const std::byte> data) : data_(data) {} + +Result<FileMetadata> PuffinReader::ReadFileMetadata() { + auto file_size = static_cast<int64_t>(data_.size()); + + if (file_size < PuffinFormat::kFooterStructLength) { + return Invalid("Invalid file: file length {} is less than minimal footer size {}", + file_size, PuffinFormat::kFooterStructLength); + } + + // Read footer struct from end of file + auto footer_struct_offset = file_size - PuffinFormat::kFooterStructLength; + + // Validate footer end magic + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(data_, footer_struct_offset + PuffinFormat::kFooterStructMagicOffset)); + + // Read payload size from footer struct + auto payload_size = ReadLittleEndian<int32_t>( + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructPayloadSizeOffset); + + if (payload_size < 0) { + return Invalid("Invalid file: negative payload size {}", payload_size); + } + + // Calculate total footer size and validate + int64_t footer_size = PuffinFormat::kFooterStartMagicLength + + static_cast<int64_t>(payload_size) + + PuffinFormat::kFooterStructLength; + auto footer_offset = file_size - footer_size; + if (footer_offset < 0) { + return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size, + file_size); + } + + // Validate footer start magic + ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, footer_offset)); + + // Check flags for footer compression + std::array<uint8_t, 4> flags{}; + std::memcpy( + flags.data(), + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4); Review Comment: ```suggestion std::memcpy( flags.data(), data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4); ``` ########## src/iceberg/puffin/puffin_reader.cc: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include <algorithm> +#include <array> +#include <cstring> +#include <string_view> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Validate magic bytes at the given offset. +Status CheckMagic(std::span<const std::byte> data, int64_t offset) { + if (offset < 0 || + offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) { + return Invalid("Invalid file: cannot read magic at offset {}", offset); + } + auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset); + if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { + return Invalid("Invalid file: expected magic at offset {}", offset); Review Comment: please also save the value of incorrect magic number into `Invalid` for easy debugging. ########## src/iceberg/puffin/puffin_reader.cc: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include <algorithm> +#include <array> +#include <cstring> +#include <string_view> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Validate magic bytes at the given offset. +Status CheckMagic(std::span<const std::byte> data, int64_t offset) { + if (offset < 0 || + offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) { + return Invalid("Invalid file: cannot read magic at offset {}", offset); + } + auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset); + if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { + return Invalid("Invalid file: expected magic at offset {}", offset); + } + return {}; +} + +} // namespace + +PuffinReader::PuffinReader(std::span<const std::byte> data) : data_(data) {} + +Result<FileMetadata> PuffinReader::ReadFileMetadata() { + auto file_size = static_cast<int64_t>(data_.size()); + + if (file_size < PuffinFormat::kFooterStructLength) { + return Invalid("Invalid file: file length {} is less than minimal footer size {}", + file_size, PuffinFormat::kFooterStructLength); + } + + // Read footer struct from end of file + auto footer_struct_offset = file_size - PuffinFormat::kFooterStructLength; + + // Validate footer end magic + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(data_, footer_struct_offset + PuffinFormat::kFooterStructMagicOffset)); + + // Read payload size from footer struct + auto payload_size = ReadLittleEndian<int32_t>( + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructPayloadSizeOffset); + + if (payload_size < 0) { + return Invalid("Invalid file: negative payload size {}", payload_size); + } + + // Calculate total footer size and validate + int64_t footer_size = PuffinFormat::kFooterStartMagicLength + + static_cast<int64_t>(payload_size) + + PuffinFormat::kFooterStructLength; + auto footer_offset = file_size - footer_size; + if (footer_offset < 0) { + return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size, + file_size); + } + + // Validate footer start magic + ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, footer_offset)); + + // Check flags for footer compression + std::array<uint8_t, 4> flags{}; + std::memcpy( + flags.data(), + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4); + + PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone; + if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) { + footer_compression = PuffinFormat::kDefaultFooterCompressionCodec; + } + + // Extract footer payload + auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength; + std::span<const std::byte> payload_span(data_.data() + payload_offset, payload_size); Review Comment: ```suggestion const std::span<const std::byte> payload_span(data_.data() + payload_offset, payload_size); ``` ########## src/iceberg/puffin/puffin_writer.cc: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_writer.h" + +#include <array> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +PuffinWriter::PuffinWriter(PuffinCompressionCodec default_codec) + : default_codec_(default_codec) {} + +void PuffinWriter::WriteHeader() { + if (header_written_) return; + const auto& magic = PuffinFormat::kMagicV1; + buffer_.insert(buffer_.end(), reinterpret_cast<const std::byte*>(magic.data()), + reinterpret_cast<const std::byte*>(magic.data() + magic.size())); + header_written_ = true; +} + +Result<BlobMetadata> PuffinWriter::Add(const Blob& blob) { + if (finished_) { + return Invalid("Writer already finished"); + } + + WriteHeader(); + + auto codec = blob.requested_compression.value_or(default_codec_); + std::span<const std::byte> input_span( + reinterpret_cast<const std::byte*>(blob.data.data()), blob.data.size()); + ICEBERG_ASSIGN_OR_RAISE(auto compressed, Compress(codec, input_span)); + + auto offset = static_cast<int64_t>(buffer_.size()); + auto length = static_cast<int64_t>(compressed.size()); + buffer_.insert(buffer_.end(), compressed.begin(), compressed.end()); + + auto codec_name = CodecName(codec); + BlobMetadata metadata{ + .type = blob.type, + .input_fields = blob.input_fields, + .snapshot_id = blob.snapshot_id, + .sequence_number = blob.sequence_number, + .offset = offset, + .length = length, + .compression_codec = std::string(codec_name), + .properties = blob.properties, + }; + written_blobs_metadata_.push_back(metadata); + return metadata; +} + +Result<std::vector<std::byte>> PuffinWriter::Finish( + std::unordered_map<std::string, std::string> properties) { + if (finished_) { + return Invalid("Writer already finished"); + } + + WriteHeader(); + + FileMetadata file_metadata{ + .blobs = written_blobs_metadata_, + .properties = std::move(properties), + }; + + auto footer_json = ToJsonString(file_metadata); + auto footer_payload = std::span<const std::byte>( Review Comment: ```suggestion const auto footer_payload = std::span<const std::byte>( ``` ########## src/iceberg/puffin/puffin_writer.cc: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_writer.h" + +#include <array> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +PuffinWriter::PuffinWriter(PuffinCompressionCodec default_codec) + : default_codec_(default_codec) {} + +void PuffinWriter::WriteHeader() { + if (header_written_) return; Review Comment: shouldn't we error out here? It looks like dangerous to call WriteHeader() multiple time. ########## src/iceberg/puffin/puffin_reader.h: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_reader.h +/// Puffin file reader. + +#include <cstddef> +#include <cstdint> +#include <span> +#include <utility> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" + +namespace iceberg::puffin { + +/// \brief Reader for Puffin files. +/// +/// Parses a Puffin file from an in-memory buffer. Usage: +/// PuffinReader reader(file_data); +/// auto metadata = reader.ReadFileMetadata(); +/// auto blob = reader.ReadBlob(metadata.value().blobs[0]); +class ICEBERG_EXPORT PuffinReader { + public: + /// \brief Construct a reader from file data. + explicit PuffinReader(std::span<const std::byte> data); + + /// \brief Read and return the file metadata from the footer. + Result<FileMetadata> ReadFileMetadata(); + + /// \brief Read a specific blob's data by its metadata. + /// \param blob_metadata The metadata describing the blob to read. + /// \return A pair of (BlobMetadata, decompressed data), or an error. + Result<std::pair<BlobMetadata, std::vector<std::byte>>> ReadBlob( + const BlobMetadata& blob_metadata); + + /// \brief Read all blobs described in the file metadata. + /// \return A vector of (BlobMetadata, decompressed data) pairs, or an error. + Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>> ReadAll( + const std::vector<BlobMetadata>& blobs); + + private: + std::span<const std::byte> data_; Review Comment: ```suggestion const std::span<const std::byte> data_; ``` ########## src/iceberg/puffin/puffin_writer.cc: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_writer.h" + +#include <array> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +PuffinWriter::PuffinWriter(PuffinCompressionCodec default_codec) + : default_codec_(default_codec) {} + +void PuffinWriter::WriteHeader() { + if (header_written_) return; Review Comment: ```suggestion if (header_written_) { return Invalid("Header has already been written."); } ``` ########## src/iceberg/puffin/puffin_writer.cc: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_writer.h" + +#include <array> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +PuffinWriter::PuffinWriter(PuffinCompressionCodec default_codec) + : default_codec_(default_codec) {} + +void PuffinWriter::WriteHeader() { + if (header_written_) return; + const auto& magic = PuffinFormat::kMagicV1; + buffer_.insert(buffer_.end(), reinterpret_cast<const std::byte*>(magic.data()), + reinterpret_cast<const std::byte*>(magic.data() + magic.size())); + header_written_ = true; +} + +Result<BlobMetadata> PuffinWriter::Add(const Blob& blob) { + if (finished_) { + return Invalid("Writer already finished"); + } + + WriteHeader(); + + auto codec = blob.requested_compression.value_or(default_codec_); + std::span<const std::byte> input_span( + reinterpret_cast<const std::byte*>(blob.data.data()), blob.data.size()); + ICEBERG_ASSIGN_OR_RAISE(auto compressed, Compress(codec, input_span)); + + auto offset = static_cast<int64_t>(buffer_.size()); + auto length = static_cast<int64_t>(compressed.size()); + buffer_.insert(buffer_.end(), compressed.begin(), compressed.end()); + + auto codec_name = CodecName(codec); + BlobMetadata metadata{ + .type = blob.type, + .input_fields = blob.input_fields, + .snapshot_id = blob.snapshot_id, + .sequence_number = blob.sequence_number, + .offset = offset, + .length = length, + .compression_codec = std::string(codec_name), + .properties = blob.properties, + }; + written_blobs_metadata_.push_back(metadata); + return metadata; +} + +Result<std::vector<std::byte>> PuffinWriter::Finish( + std::unordered_map<std::string, std::string> properties) { + if (finished_) { + return Invalid("Writer already finished"); + } + + WriteHeader(); + + FileMetadata file_metadata{ + .blobs = written_blobs_metadata_, + .properties = std::move(properties), + }; + + auto footer_json = ToJsonString(file_metadata); + auto footer_payload = std::span<const std::byte>( + reinterpret_cast<const std::byte*>(footer_json.data()), footer_json.size()); + + // Footer start magic + auto footer_start = static_cast<int64_t>(buffer_.size()); + const auto& magic = PuffinFormat::kMagicV1; + buffer_.insert(buffer_.end(), reinterpret_cast<const std::byte*>(magic.data()), + reinterpret_cast<const std::byte*>(magic.data() + magic.size())); + + // Footer payload + buffer_.insert(buffer_.end(), footer_payload.begin(), footer_payload.end()); + + // Footer struct: payload_size (4) + flags (4) + magic (4) + auto payload_size = static_cast<int32_t>(footer_payload.size()); + std::array<std::byte, 4> size_buf{}; + WriteLittleEndian(payload_size, size_buf.data()); + buffer_.insert(buffer_.end(), size_buf.begin(), size_buf.end()); + + // Flags (no compression for now) + std::array<std::byte, 4> flags{}; + buffer_.insert(buffer_.end(), flags.begin(), flags.end()); + + // Footer end magic + buffer_.insert(buffer_.end(), reinterpret_cast<const std::byte*>(magic.data()), + reinterpret_cast<const std::byte*>(magic.data() + magic.size())); Review Comment: ```suggestion buffer_.insert(buffer_.end(), reinterpret_cast<const std::byte*>(magic.data()), reinterpret_cast<const std::byte*>(magic.data() + magic.size())); ``` ########## src/iceberg/puffin/puffin_writer.h: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_writer.h +/// Puffin file writer. + +#include <cstddef> +#include <cstdint> +#include <optional> +#include <string> +#include <unordered_map> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" + +namespace iceberg::puffin { + +/// \brief Writer for Puffin files. +/// +/// Builds a complete Puffin file in memory. Usage: +/// PuffinWriter writer; +/// writer.Add(blob1); +/// writer.Add(blob2); +/// auto result = writer.Finish({{"created-by", "iceberg-cpp"}}); +/// // result.value() contains the serialized file bytes +class ICEBERG_EXPORT PuffinWriter { + public: + /// \brief Construct a writer with the given default compression codec. + explicit PuffinWriter( + PuffinCompressionCodec default_codec = PuffinCompressionCodec::kNone); + + /// \brief Add a blob to be written. + /// \return The BlobMetadata for the written blob, or an error. + Result<BlobMetadata> Add(const Blob& blob); + + /// \brief Finalize the file and return the serialized bytes. + /// \param properties File-level properties to include in the footer. + /// \return The complete Puffin file as a byte vector, or an error. + Result<std::vector<std::byte>> Finish( + std::unordered_map<std::string, std::string> properties = {}); Review Comment: do we really need to copy the whole `std::unordered_map<std::string, std::string>` ########## src/iceberg/puffin/puffin_reader.cc: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include <algorithm> +#include <array> +#include <cstring> +#include <string_view> + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Validate magic bytes at the given offset. +Status CheckMagic(std::span<const std::byte> data, int64_t offset) { + if (offset < 0 || + offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) { + return Invalid("Invalid file: cannot read magic at offset {}", offset); + } + auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset); + if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { + return Invalid("Invalid file: expected magic at offset {}", offset); + } + return {}; +} + +} // namespace + +PuffinReader::PuffinReader(std::span<const std::byte> data) : data_(data) {} + +Result<FileMetadata> PuffinReader::ReadFileMetadata() { + auto file_size = static_cast<int64_t>(data_.size()); + + if (file_size < PuffinFormat::kFooterStructLength) { + return Invalid("Invalid file: file length {} is less than minimal footer size {}", + file_size, PuffinFormat::kFooterStructLength); + } + + // Read footer struct from end of file + auto footer_struct_offset = file_size - PuffinFormat::kFooterStructLength; + + // Validate footer end magic + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(data_, footer_struct_offset + PuffinFormat::kFooterStructMagicOffset)); + + // Read payload size from footer struct + auto payload_size = ReadLittleEndian<int32_t>( + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructPayloadSizeOffset); + + // Calculate total footer size and validate + int64_t footer_size = PuffinFormat::kFooterStartMagicLength + + static_cast<int64_t>(payload_size) + + PuffinFormat::kFooterStructLength; + auto footer_offset = file_size - footer_size; + if (footer_offset < 0) { + return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size, + file_size); + } + + // Validate footer start magic + ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, footer_offset)); + + // Check flags for footer compression + std::array<uint8_t, 4> flags{}; + std::memcpy( + flags.data(), + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4); + + PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone; + if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) { + footer_compression = PuffinFormat::kDefaultFooterCompressionCodec; + } + + // Extract footer payload + auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength; + std::span<const std::byte> payload_span(data_.data() + payload_offset, payload_size); + ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes, + Decompress(footer_compression, payload_span)); Review Comment: payload_size is already validated above: ``` if (payload_size < 0) { return Invalid("Invalid file: negative payload size {}", payload_size); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
