jt2594838 commented on code in PR #648: URL: https://github.com/apache/tsfile/pull/648#discussion_r2575563627
########## cpp/src/common/device_id.cc: ########## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "common/device_id.h" + +#include <algorithm> +#include <cctype> +#include <numeric> +#include <stdexcept> +#include <unordered_set> + +#include "constant/tsfile_constant.h" +#ifdef ENABLE_ANTLR4 +#include "parser/path_nodes_generator.h" +#endif +#include "utils/errno_define.h" + +namespace storage { + +// IDeviceID implementation +IDeviceID::IDeviceID() : empty_segments_() {} + +int IDeviceID::serialize(common::ByteStream& write_stream) { return 0; } +int IDeviceID::deserialize(common::ByteStream& read_stream) { return 0; } +std::string IDeviceID::get_table_name() { return ""; } +int IDeviceID::segment_num() { return 0; } +const std::vector<std::string*>& IDeviceID::get_segments() const { + return empty_segments_; +} +std::string IDeviceID::get_device_name() const { return ""; } +bool IDeviceID::operator<(const IDeviceID& other) { return false; } +bool IDeviceID::operator==(const IDeviceID& other) { return false; } +bool IDeviceID::operator!=(const IDeviceID& other) { return false; } + +// IDeviceIDComparator implementation +bool IDeviceIDComparator::operator()( + const 
std::shared_ptr<IDeviceID>& lhs, + const std::shared_ptr<IDeviceID>& rhs) const { + return *lhs < *rhs; +} + +// StringArrayDeviceID implementation +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string>& segments) + : segments_(formalize(segments)) {} + +StringArrayDeviceID::StringArrayDeviceID(const std::string& device_id_string) { + auto segments = split_device_id_string(device_id_string); + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(new std::string(segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string*>& segments) { + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(segment == nullptr ? nullptr + : new std::string(*segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID() : segments_() {} + +StringArrayDeviceID::~StringArrayDeviceID() { + for (const auto& segment : segments_) { + delete segment; + } +} + +std::string StringArrayDeviceID::get_device_name() const { + if (segments_.empty()) { + return ""; + } + + std::string result(*segments_.front()); + for (auto it = std::next(segments_.begin()); it != segments_.end(); ++it) { + result += '.'; + if (*it != nullptr) { + result += **it; + } else { + result += "null"; + } + } + + return result; +} + +int StringArrayDeviceID::serialize(common::ByteStream& write_stream) { + int ret = common::E_OK; + if (RET_FAIL(common::SerializationUtil::write_var_uint(segment_num(), + write_stream))) { + return ret; + } + for (const auto& segment : segments_) { + if (RET_FAIL(common::SerializationUtil::write_var_char_ptr( + segment, write_stream))) { + return ret; + } + } + return ret; +} + +int StringArrayDeviceID::deserialize(common::ByteStream& read_stream) { + int ret = common::E_OK; + uint32_t num_segments; + if (RET_FAIL(common::SerializationUtil::read_var_uint(num_segments, + read_stream))) { + return ret; + } + + for (auto& segment : segments_) { + if 
(segment != nullptr) { + delete segment; + } + } + + segments_.clear(); + for (uint32_t i = 0; i < num_segments; ++i) { + std::string* segment; + if (RET_FAIL(common::SerializationUtil::read_var_char_ptr( + segment, read_stream))) { + delete segment; + return ret; + } + segments_.push_back(segment); + } + return ret; +} + +std::string StringArrayDeviceID::get_table_name() { + return segments_.empty() ? "" : *segments_[0]; +} + +int StringArrayDeviceID::segment_num() { + return static_cast<int>(segments_.size()); +} + +const std::vector<std::string*>& StringArrayDeviceID::get_segments() const { + return segments_; +} + +bool StringArrayDeviceID::operator<(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return std::lexicographical_compare( + segments_.begin(), segments_.end(), other_segments.begin(), + other_segments.end(), [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return false; // equal + if (a == nullptr) return true; // nullptr < any string + if (b == nullptr) return false; // any string > nullptr + return *a < *b; + }); +} + +bool StringArrayDeviceID::operator==(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return (segments_.size() == other_segments.size()) && + std::equal(segments_.begin(), segments_.end(), + other_segments.begin(), + [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return true; + if (a == nullptr || b == nullptr) return false; + return *a == *b; + }); +} + +bool StringArrayDeviceID::operator!=(const IDeviceID& other) { + return !(*this == other); +} + +std::vector<std::string*> StringArrayDeviceID::formalize( + const std::vector<std::string>& segments) { + auto it = std::find_if(segments.rbegin(), segments.rend(), + [](const std::string& seg) { return !seg.empty(); }); + std::vector<std::string> validate_segments(segments.begin(), it.base()); + std::vector<std::string*> result; + 
result.reserve(validate_segments.size()); + for (const auto& segment : validate_segments) { + result.emplace_back(new std::string(segment)); + } + return result; +} + +std::vector<std::string> StringArrayDeviceID::split_device_id_string( + const std::string& device_id_string) { +#ifdef ENABLE_ANTLR4 + auto splits = storage::PathNodesGenerator::invokeParser(device_id_string); + return split_device_id_string(splits); +#else + return split_string(device_id_string, '.'); +#endif +} + +std::vector<std::string> StringArrayDeviceID::split_device_id_string( + const std::vector<std::string>& splits) { + size_t segment_cnt = splits.size(); + std::vector<std::string> final_segments; + + if (segment_cnt == 0) { + return final_segments; + } + + if (segment_cnt == 1) { + // "root" -> {"root"} + final_segments.push_back(splits[0]); + } else if (segment_cnt < + static_cast<size_t>(storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + + 1)) { + // "root.a" -> {"root", "a"} + // "root.a.b" -> {"root.a", "b"} + std::string table_name = std::accumulate( + splits.begin(), splits.end() - 1, std::string(), + [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + storage::PATH_SEPARATOR + b; + }); + final_segments.push_back(table_name); + final_segments.push_back(splits.back()); + } else { + // "root.a.b.c" -> {"root.a.b", "c"} + // "root.a.b.c.d" -> {"root.a.b", "c", "d"} + std::string table_name = std::accumulate( + splits.begin(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + std::string(), [](const std::string& a, const std::string& b) { + return a.empty() ? 
b : a + storage::PATH_SEPARATOR + b; + }); + + final_segments.emplace_back(std::move(table_name)); + final_segments.insert( + final_segments.end(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + splits.end()); + } + + return final_segments; +} + +std::vector<std::string> IDeviceID::split_string(const std::string& str, + char delimiter) { + std::vector<std::string> tokens; + + // Reject newlines in path explicitly (illegal path name). + if (str.find('\n') != std::string::npos || + str.find('\r') != std::string::npos) { + throw std::runtime_error("Path contains newline"); + } + + std::string token; + bool in_back_quotes = false; // Inside `quoted` section + bool in_double_quotes = false; // Inside "quoted" section + bool in_single_quotes = false; // Inside 'quoted' section + + for (size_t i = 0; i < str.length(); ++i) { + char c = str[i]; + + // Toggle quote state when encountering a quote character outside other + // quote types. + if (c == '`' && !in_double_quotes && !in_single_quotes) { + in_back_quotes = !in_back_quotes; + token += c; // preserve the backtick character + } else if (c == '"' && !in_back_quotes && !in_single_quotes) { + in_double_quotes = !in_double_quotes; + token += c; // preserve + } else if (c == '\'' && !in_back_quotes && !in_double_quotes) { + in_single_quotes = !in_single_quotes; + token += c; // preserve + } else if (c == delimiter && !in_back_quotes && !in_double_quotes && + !in_single_quotes) { + // delimiter outside quotes -> split + if (!token.empty()) { + tokens.push_back(token); + token.clear(); Review Comment: verify and unquote? ########## cpp/src/common/device_id.cc: ########## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "common/device_id.h" + +#include <algorithm> +#include <cctype> +#include <numeric> +#include <stdexcept> +#include <unordered_set> + +#include "constant/tsfile_constant.h" +#ifdef ENABLE_ANTLR4 +#include "parser/path_nodes_generator.h" +#endif +#include "utils/errno_define.h" + +namespace storage { + +// IDeviceID implementation +IDeviceID::IDeviceID() : empty_segments_() {} + +int IDeviceID::serialize(common::ByteStream& write_stream) { return 0; } +int IDeviceID::deserialize(common::ByteStream& read_stream) { return 0; } +std::string IDeviceID::get_table_name() { return ""; } +int IDeviceID::segment_num() { return 0; } +const std::vector<std::string*>& IDeviceID::get_segments() const { + return empty_segments_; +} +std::string IDeviceID::get_device_name() const { return ""; } +bool IDeviceID::operator<(const IDeviceID& other) { return false; } +bool IDeviceID::operator==(const IDeviceID& other) { return false; } +bool IDeviceID::operator!=(const IDeviceID& other) { return false; } + +// IDeviceIDComparator implementation +bool IDeviceIDComparator::operator()( + const std::shared_ptr<IDeviceID>& lhs, + const std::shared_ptr<IDeviceID>& rhs) const { + return *lhs < *rhs; +} + +// StringArrayDeviceID implementation +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string>& segments) + : segments_(formalize(segments)) {} + 
+StringArrayDeviceID::StringArrayDeviceID(const std::string& device_id_string) { + auto segments = split_device_id_string(device_id_string); + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(new std::string(segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string*>& segments) { + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(segment == nullptr ? nullptr + : new std::string(*segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID() : segments_() {} + +StringArrayDeviceID::~StringArrayDeviceID() { + for (const auto& segment : segments_) { + delete segment; + } +} + +std::string StringArrayDeviceID::get_device_name() const { + if (segments_.empty()) { + return ""; + } + + std::string result(*segments_.front()); + for (auto it = std::next(segments_.begin()); it != segments_.end(); ++it) { + result += '.'; + if (*it != nullptr) { + result += **it; + } else { + result += "null"; + } + } + + return result; +} + +int StringArrayDeviceID::serialize(common::ByteStream& write_stream) { + int ret = common::E_OK; + if (RET_FAIL(common::SerializationUtil::write_var_uint(segment_num(), + write_stream))) { + return ret; + } + for (const auto& segment : segments_) { + if (RET_FAIL(common::SerializationUtil::write_var_char_ptr( + segment, write_stream))) { + return ret; + } + } + return ret; +} + +int StringArrayDeviceID::deserialize(common::ByteStream& read_stream) { + int ret = common::E_OK; + uint32_t num_segments; + if (RET_FAIL(common::SerializationUtil::read_var_uint(num_segments, + read_stream))) { + return ret; + } + + for (auto& segment : segments_) { + if (segment != nullptr) { + delete segment; + } + } + + segments_.clear(); + for (uint32_t i = 0; i < num_segments; ++i) { + std::string* segment; + if (RET_FAIL(common::SerializationUtil::read_var_char_ptr( + segment, read_stream))) { + delete segment; + return ret; + } + 
segments_.push_back(segment); + } + return ret; +} + +std::string StringArrayDeviceID::get_table_name() { + return segments_.empty() ? "" : *segments_[0]; +} + +int StringArrayDeviceID::segment_num() { + return static_cast<int>(segments_.size()); +} + +const std::vector<std::string*>& StringArrayDeviceID::get_segments() const { + return segments_; +} + +bool StringArrayDeviceID::operator<(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return std::lexicographical_compare( + segments_.begin(), segments_.end(), other_segments.begin(), + other_segments.end(), [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return false; // equal + if (a == nullptr) return true; // nullptr < any string + if (b == nullptr) return false; // any string > nullptr + return *a < *b; + }); +} + +bool StringArrayDeviceID::operator==(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return (segments_.size() == other_segments.size()) && + std::equal(segments_.begin(), segments_.end(), + other_segments.begin(), + [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return true; + if (a == nullptr || b == nullptr) return false; + return *a == *b; + }); +} + +bool StringArrayDeviceID::operator!=(const IDeviceID& other) { + return !(*this == other); +} + +std::vector<std::string*> StringArrayDeviceID::formalize( + const std::vector<std::string>& segments) { + auto it = std::find_if(segments.rbegin(), segments.rend(), + [](const std::string& seg) { return !seg.empty(); }); + std::vector<std::string> validate_segments(segments.begin(), it.base()); + std::vector<std::string*> result; + result.reserve(validate_segments.size()); + for (const auto& segment : validate_segments) { + result.emplace_back(new std::string(segment)); + } + return result; +} + +std::vector<std::string> StringArrayDeviceID::split_device_id_string( + const std::string& device_id_string) { +#ifdef ENABLE_ANTLR4 + 
auto splits = storage::PathNodesGenerator::invokeParser(device_id_string); + return split_device_id_string(splits); +#else + return split_string(device_id_string, '.'); +#endif +} + +std::vector<std::string> StringArrayDeviceID::split_device_id_string( + const std::vector<std::string>& splits) { + size_t segment_cnt = splits.size(); + std::vector<std::string> final_segments; + + if (segment_cnt == 0) { + return final_segments; + } + + if (segment_cnt == 1) { + // "root" -> {"root"} + final_segments.push_back(splits[0]); + } else if (segment_cnt < + static_cast<size_t>(storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + + 1)) { + // "root.a" -> {"root", "a"} + // "root.a.b" -> {"root.a", "b"} + std::string table_name = std::accumulate( + splits.begin(), splits.end() - 1, std::string(), + [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + storage::PATH_SEPARATOR + b; + }); + final_segments.push_back(table_name); + final_segments.push_back(splits.back()); + } else { + // "root.a.b.c" -> {"root.a.b", "c"} + // "root.a.b.c.d" -> {"root.a.b", "c", "d"} + std::string table_name = std::accumulate( + splits.begin(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + std::string(), [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + storage::PATH_SEPARATOR + b; + }); + + final_segments.emplace_back(std::move(table_name)); + final_segments.insert( + final_segments.end(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + splits.end()); + } + + return final_segments; +} + +std::vector<std::string> IDeviceID::split_string(const std::string& str, + char delimiter) { + std::vector<std::string> tokens; + + // Reject newlines in path explicitly (illegal path name). 
+ if (str.find('\n') != std::string::npos || + str.find('\r') != std::string::npos) { + throw std::runtime_error("Path contains newline"); + } + + std::string token; + bool in_back_quotes = false; // Inside `quoted` section + bool in_double_quotes = false; // Inside "quoted" section + bool in_single_quotes = false; // Inside 'quoted' section + + for (size_t i = 0; i < str.length(); ++i) { + char c = str[i]; + + // Toggle quote state when encountering a quote character outside other + // quote types. + if (c == '`' && !in_double_quotes && !in_single_quotes) { + in_back_quotes = !in_back_quotes; + token += c; // preserve the backtick character + } else if (c == '"' && !in_back_quotes && !in_single_quotes) { + in_double_quotes = !in_double_quotes; + token += c; // preserve + } else if (c == '\'' && !in_back_quotes && !in_double_quotes) { + in_single_quotes = !in_single_quotes; + token += c; // preserve + } else if (c == delimiter && !in_back_quotes && !in_double_quotes && + !in_single_quotes) { + // delimiter outside quotes -> split + if (!token.empty()) { + tokens.push_back(token); + token.clear(); + } else { + // keep behaviour: do not push empty tokens + token.clear(); + } Review Comment: Empty segments should be preserved? ########## cpp/src/common/device_id.cc: ########## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "common/device_id.h" + +#include <algorithm> +#include <cctype> +#include <numeric> +#include <stdexcept> +#include <unordered_set> + +#include "constant/tsfile_constant.h" +#ifdef ENABLE_ANTLR4 +#include "parser/path_nodes_generator.h" +#endif +#include "utils/errno_define.h" + +namespace storage { + +// IDeviceID implementation +IDeviceID::IDeviceID() : empty_segments_() {} + +int IDeviceID::serialize(common::ByteStream& write_stream) { return 0; } +int IDeviceID::deserialize(common::ByteStream& read_stream) { return 0; } +std::string IDeviceID::get_table_name() { return ""; } +int IDeviceID::segment_num() { return 0; } +const std::vector<std::string*>& IDeviceID::get_segments() const { + return empty_segments_; +} +std::string IDeviceID::get_device_name() const { return ""; } +bool IDeviceID::operator<(const IDeviceID& other) { return false; } +bool IDeviceID::operator==(const IDeviceID& other) { return false; } +bool IDeviceID::operator!=(const IDeviceID& other) { return false; } + +// IDeviceIDComparator implementation +bool IDeviceIDComparator::operator()( + const std::shared_ptr<IDeviceID>& lhs, + const std::shared_ptr<IDeviceID>& rhs) const { + return *lhs < *rhs; +} + +// StringArrayDeviceID implementation +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string>& segments) + : segments_(formalize(segments)) {} + +StringArrayDeviceID::StringArrayDeviceID(const std::string& device_id_string) { + auto segments = split_device_id_string(device_id_string); + segments_.reserve(segments.size()); + for (const 
auto& segment : segments) { + segments_.push_back(new std::string(segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string*>& segments) { + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(segment == nullptr ? nullptr + : new std::string(*segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID() : segments_() {} + +StringArrayDeviceID::~StringArrayDeviceID() { + for (const auto& segment : segments_) { + delete segment; + } +} + +std::string StringArrayDeviceID::get_device_name() const { + if (segments_.empty()) { + return ""; + } + + std::string result(*segments_.front()); + for (auto it = std::next(segments_.begin()); it != segments_.end(); ++it) { + result += '.'; + if (*it != nullptr) { + result += **it; + } else { + result += "null"; + } + } + + return result; +} + +int StringArrayDeviceID::serialize(common::ByteStream& write_stream) { + int ret = common::E_OK; + if (RET_FAIL(common::SerializationUtil::write_var_uint(segment_num(), + write_stream))) { + return ret; + } + for (const auto& segment : segments_) { + if (RET_FAIL(common::SerializationUtil::write_var_char_ptr( + segment, write_stream))) { + return ret; + } + } + return ret; +} + +int StringArrayDeviceID::deserialize(common::ByteStream& read_stream) { + int ret = common::E_OK; + uint32_t num_segments; + if (RET_FAIL(common::SerializationUtil::read_var_uint(num_segments, + read_stream))) { + return ret; + } + + for (auto& segment : segments_) { + if (segment != nullptr) { + delete segment; + } + } + + segments_.clear(); + for (uint32_t i = 0; i < num_segments; ++i) { + std::string* segment; + if (RET_FAIL(common::SerializationUtil::read_var_char_ptr( + segment, read_stream))) { + delete segment; + return ret; + } + segments_.push_back(segment); + } + return ret; +} + +std::string StringArrayDeviceID::get_table_name() { + return segments_.empty() ? 
"" : *segments_[0]; +} + +int StringArrayDeviceID::segment_num() { + return static_cast<int>(segments_.size()); +} + +const std::vector<std::string*>& StringArrayDeviceID::get_segments() const { + return segments_; +} + +bool StringArrayDeviceID::operator<(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return std::lexicographical_compare( + segments_.begin(), segments_.end(), other_segments.begin(), + other_segments.end(), [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return false; // equal + if (a == nullptr) return true; // nullptr < any string + if (b == nullptr) return false; // any string > nullptr + return *a < *b; + }); +} + +bool StringArrayDeviceID::operator==(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return (segments_.size() == other_segments.size()) && + std::equal(segments_.begin(), segments_.end(), + other_segments.begin(), + [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return true; + if (a == nullptr || b == nullptr) return false; + return *a == *b; + }); +} + +bool StringArrayDeviceID::operator!=(const IDeviceID& other) { + return !(*this == other); +} + +std::vector<std::string*> StringArrayDeviceID::formalize( + const std::vector<std::string>& segments) { + auto it = std::find_if(segments.rbegin(), segments.rend(), + [](const std::string& seg) { return !seg.empty(); }); + std::vector<std::string> validate_segments(segments.begin(), it.base()); + std::vector<std::string*> result; + result.reserve(validate_segments.size()); + for (const auto& segment : validate_segments) { + result.emplace_back(new std::string(segment)); + } + return result; +} + Review Comment: Null is different from "", so you cannot remove trailing empty segments. ########## cpp/src/cwrapper/tsfile_cwrapper.cc: ########## @@ -24,6 +24,8 @@ #include <unistd.h> #include <writer/tsfile_table_writer.h> +#include <set> Review Comment: Is this used? 
########## cpp/src/compress/compressor_factory.h: ########## @@ -46,19 +58,35 @@ class CompressorFactory { if (type == common::UNCOMPRESSED) { ALLOC_AND_RETURN_COMPRESSPR(UncompressedCompressor); } else if (type == common::SNAPPY) { +#ifdef ENABLE_SNAPPY ALLOC_AND_RETURN_COMPRESSPR(SnappyCompressor); +#else + ALLOC_AND_RETURN_COMPRESSPR(UncompressedCompressor); +#endif } else if (type == common::GZIP) { +#ifdef ENABLE_GZIP ALLOC_AND_RETURN_COMPRESSPR(GZIPCompressor); +#else + ALLOC_AND_RETURN_COMPRESSPR(UncompressedCompressor); +#endif } else if (type == common::LZO) { +#ifdef ENABLE_LZOKAY ALLOC_AND_RETURN_COMPRESSPR(LZOCompressor); +#else + ALLOC_AND_RETURN_COMPRESSPR(UncompressedCompressor); +#endif + } else if (type == common::LZ4) { +#ifdef ENABLE_LZ4 + ALLOC_AND_RETURN_COMPRESSPR(LZ4Compressor); +#else + ALLOC_AND_RETURN_COMPRESSPR(UncompressedCompressor); +#endif Review Comment: Make sure that the caller's compression type is changed consistently with this fallback. That is, you must never write a chunk whose compression type is recorded as LZ4 when the chunk data is actually not compressed. ########## cpp/src/common/device_id.cc: ########## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "common/device_id.h" + +#include <algorithm> +#include <cctype> +#include <numeric> +#include <stdexcept> +#include <unordered_set> + +#include "constant/tsfile_constant.h" +#ifdef ENABLE_ANTLR4 +#include "parser/path_nodes_generator.h" +#endif +#include "utils/errno_define.h" + +namespace storage { + +// IDeviceID implementation +IDeviceID::IDeviceID() : empty_segments_() {} + +int IDeviceID::serialize(common::ByteStream& write_stream) { return 0; } +int IDeviceID::deserialize(common::ByteStream& read_stream) { return 0; } +std::string IDeviceID::get_table_name() { return ""; } +int IDeviceID::segment_num() { return 0; } +const std::vector<std::string*>& IDeviceID::get_segments() const { + return empty_segments_; +} +std::string IDeviceID::get_device_name() const { return ""; } +bool IDeviceID::operator<(const IDeviceID& other) { return false; } +bool IDeviceID::operator==(const IDeviceID& other) { return false; } +bool IDeviceID::operator!=(const IDeviceID& other) { return false; } + +// IDeviceIDComparator implementation +bool IDeviceIDComparator::operator()( + const std::shared_ptr<IDeviceID>& lhs, + const std::shared_ptr<IDeviceID>& rhs) const { + return *lhs < *rhs; +} + +// StringArrayDeviceID implementation +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string>& segments) + : segments_(formalize(segments)) {} + +StringArrayDeviceID::StringArrayDeviceID(const std::string& device_id_string) { + auto segments = split_device_id_string(device_id_string); + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(new std::string(segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string*>& segments) { + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(segment == nullptr ? 
nullptr + : new std::string(*segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID() : segments_() {} + +StringArrayDeviceID::~StringArrayDeviceID() { + for (const auto& segment : segments_) { + delete segment; + } +} + +std::string StringArrayDeviceID::get_device_name() const { + if (segments_.empty()) { + return ""; + } + + std::string result(*segments_.front()); + for (auto it = std::next(segments_.begin()); it != segments_.end(); ++it) { + result += '.'; + if (*it != nullptr) { + result += **it; + } else { + result += "null"; + } + } + + return result; +} + +int StringArrayDeviceID::serialize(common::ByteStream& write_stream) { + int ret = common::E_OK; + if (RET_FAIL(common::SerializationUtil::write_var_uint(segment_num(), + write_stream))) { + return ret; + } + for (const auto& segment : segments_) { + if (RET_FAIL(common::SerializationUtil::write_var_char_ptr( + segment, write_stream))) { + return ret; + } + } + return ret; +} + +int StringArrayDeviceID::deserialize(common::ByteStream& read_stream) { + int ret = common::E_OK; + uint32_t num_segments; + if (RET_FAIL(common::SerializationUtil::read_var_uint(num_segments, + read_stream))) { + return ret; + } + + for (auto& segment : segments_) { + if (segment != nullptr) { + delete segment; + } + } + + segments_.clear(); + for (uint32_t i = 0; i < num_segments; ++i) { + std::string* segment; + if (RET_FAIL(common::SerializationUtil::read_var_char_ptr( + segment, read_stream))) { + delete segment; + return ret; + } + segments_.push_back(segment); + } + return ret; +} + +std::string StringArrayDeviceID::get_table_name() { + return segments_.empty() ? 
"" : *segments_[0]; +} + +int StringArrayDeviceID::segment_num() { + return static_cast<int>(segments_.size()); +} + +const std::vector<std::string*>& StringArrayDeviceID::get_segments() const { + return segments_; +} + +bool StringArrayDeviceID::operator<(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return std::lexicographical_compare( + segments_.begin(), segments_.end(), other_segments.begin(), + other_segments.end(), [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return false; // equal + if (a == nullptr) return true; // nullptr < any string + if (b == nullptr) return false; // any string > nullptr + return *a < *b; + }); +} + +bool StringArrayDeviceID::operator==(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return (segments_.size() == other_segments.size()) && + std::equal(segments_.begin(), segments_.end(), + other_segments.begin(), + [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return true; + if (a == nullptr || b == nullptr) return false; + return *a == *b; + }); +} + +bool StringArrayDeviceID::operator!=(const IDeviceID& other) { + return !(*this == other); +} + +std::vector<std::string*> StringArrayDeviceID::formalize( + const std::vector<std::string>& segments) { + auto it = std::find_if(segments.rbegin(), segments.rend(), + [](const std::string& seg) { return !seg.empty(); }); + std::vector<std::string> validate_segments(segments.begin(), it.base()); + std::vector<std::string*> result; + result.reserve(validate_segments.size()); + for (const auto& segment : validate_segments) { + result.emplace_back(new std::string(segment)); + } + return result; +} + +std::vector<std::string> StringArrayDeviceID::split_device_id_string( + const std::string& device_id_string) { +#ifdef ENABLE_ANTLR4 + auto splits = storage::PathNodesGenerator::invokeParser(device_id_string); + return split_device_id_string(splits); +#else + return 
split_string(device_id_string, '.'); +#endif +} + +std::vector<std::string> StringArrayDeviceID::split_device_id_string( + const std::vector<std::string>& splits) { + size_t segment_cnt = splits.size(); + std::vector<std::string> final_segments; + + if (segment_cnt == 0) { + return final_segments; + } + + if (segment_cnt == 1) { + // "root" -> {"root"} + final_segments.push_back(splits[0]); + } else if (segment_cnt < + static_cast<size_t>(storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + + 1)) { + // "root.a" -> {"root", "a"} + // "root.a.b" -> {"root.a", "b"} + std::string table_name = std::accumulate( + splits.begin(), splits.end() - 1, std::string(), + [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + storage::PATH_SEPARATOR + b; + }); + final_segments.push_back(table_name); + final_segments.push_back(splits.back()); + } else { + // "root.a.b.c" -> {"root.a.b", "c"} + // "root.a.b.c.d" -> {"root.a.b", "c", "d"} + std::string table_name = std::accumulate( + splits.begin(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + std::string(), [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + storage::PATH_SEPARATOR + b; + }); + + final_segments.emplace_back(std::move(table_name)); + final_segments.insert( + final_segments.end(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + splits.end()); + } + + return final_segments; +} + +std::vector<std::string> IDeviceID::split_string(const std::string& str, + char delimiter) { + std::vector<std::string> tokens; + + // Reject newlines in path explicitly (illegal path name). + if (str.find('\n') != std::string::npos || + str.find('\r') != std::string::npos) { + throw std::runtime_error("Path contains newline"); + } Review Comment: No need to ban these? ########## cpp/src/common/device_id.cc: ########## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "common/device_id.h" + +#include <algorithm> +#include <cctype> +#include <numeric> +#include <stdexcept> +#include <unordered_set> + +#include "constant/tsfile_constant.h" +#ifdef ENABLE_ANTLR4 +#include "parser/path_nodes_generator.h" +#endif +#include "utils/errno_define.h" + +namespace storage { + +// IDeviceID implementation +IDeviceID::IDeviceID() : empty_segments_() {} + +int IDeviceID::serialize(common::ByteStream& write_stream) { return 0; } +int IDeviceID::deserialize(common::ByteStream& read_stream) { return 0; } +std::string IDeviceID::get_table_name() { return ""; } +int IDeviceID::segment_num() { return 0; } +const std::vector<std::string*>& IDeviceID::get_segments() const { + return empty_segments_; +} +std::string IDeviceID::get_device_name() const { return ""; } +bool IDeviceID::operator<(const IDeviceID& other) { return false; } +bool IDeviceID::operator==(const IDeviceID& other) { return false; } +bool IDeviceID::operator!=(const IDeviceID& other) { return false; } + +// IDeviceIDComparator implementation +bool IDeviceIDComparator::operator()( + const std::shared_ptr<IDeviceID>& lhs, + const std::shared_ptr<IDeviceID>& rhs) const { + return *lhs < *rhs; +} + +// StringArrayDeviceID implementation +StringArrayDeviceID::StringArrayDeviceID( + 
const std::vector<std::string>& segments) + : segments_(formalize(segments)) {} + +StringArrayDeviceID::StringArrayDeviceID(const std::string& device_id_string) { + auto segments = split_device_id_string(device_id_string); + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(new std::string(segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID( + const std::vector<std::string*>& segments) { + segments_.reserve(segments.size()); + for (const auto& segment : segments) { + segments_.push_back(segment == nullptr ? nullptr + : new std::string(*segment)); + } +} + +StringArrayDeviceID::StringArrayDeviceID() : segments_() {} + +StringArrayDeviceID::~StringArrayDeviceID() { + for (const auto& segment : segments_) { + delete segment; + } +} + +std::string StringArrayDeviceID::get_device_name() const { + if (segments_.empty()) { + return ""; + } + + std::string result(*segments_.front()); + for (auto it = std::next(segments_.begin()); it != segments_.end(); ++it) { + result += '.'; + if (*it != nullptr) { + result += **it; + } else { + result += "null"; + } + } + + return result; +} + +int StringArrayDeviceID::serialize(common::ByteStream& write_stream) { + int ret = common::E_OK; + if (RET_FAIL(common::SerializationUtil::write_var_uint(segment_num(), + write_stream))) { + return ret; + } + for (const auto& segment : segments_) { + if (RET_FAIL(common::SerializationUtil::write_var_char_ptr( + segment, write_stream))) { + return ret; + } + } + return ret; +} + +int StringArrayDeviceID::deserialize(common::ByteStream& read_stream) { + int ret = common::E_OK; + uint32_t num_segments; + if (RET_FAIL(common::SerializationUtil::read_var_uint(num_segments, + read_stream))) { + return ret; + } + + for (auto& segment : segments_) { + if (segment != nullptr) { + delete segment; + } + } + + segments_.clear(); + for (uint32_t i = 0; i < num_segments; ++i) { + std::string* segment; + if 
(RET_FAIL(common::SerializationUtil::read_var_char_ptr( + segment, read_stream))) { + delete segment; + return ret; + } + segments_.push_back(segment); + } + return ret; +} + +std::string StringArrayDeviceID::get_table_name() { + return segments_.empty() ? "" : *segments_[0]; +} + +int StringArrayDeviceID::segment_num() { + return static_cast<int>(segments_.size()); +} + +const std::vector<std::string*>& StringArrayDeviceID::get_segments() const { + return segments_; +} + +bool StringArrayDeviceID::operator<(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return std::lexicographical_compare( + segments_.begin(), segments_.end(), other_segments.begin(), + other_segments.end(), [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return false; // equal + if (a == nullptr) return true; // nullptr < any string + if (b == nullptr) return false; // any string > nullptr + return *a < *b; + }); +} + +bool StringArrayDeviceID::operator==(const IDeviceID& other) { + auto other_segments = other.get_segments(); + return (segments_.size() == other_segments.size()) && + std::equal(segments_.begin(), segments_.end(), + other_segments.begin(), + [](const std::string* a, const std::string* b) { + if (a == nullptr && b == nullptr) return true; + if (a == nullptr || b == nullptr) return false; + return *a == *b; + }); +} + +bool StringArrayDeviceID::operator!=(const IDeviceID& other) { + return !(*this == other); +} + +std::vector<std::string*> StringArrayDeviceID::formalize( + const std::vector<std::string>& segments) { + auto it = std::find_if(segments.rbegin(), segments.rend(), + [](const std::string& seg) { return !seg.empty(); }); + std::vector<std::string> validate_segments(segments.begin(), it.base()); + std::vector<std::string*> result; + result.reserve(validate_segments.size()); + for (const auto& segment : validate_segments) { + result.emplace_back(new std::string(segment)); + } + return result; +} + 
+std::vector<std::string> StringArrayDeviceID::split_device_id_string( + const std::string& device_id_string) { +#ifdef ENABLE_ANTLR4 + auto splits = storage::PathNodesGenerator::invokeParser(device_id_string); + return split_device_id_string(splits); +#else + return split_string(device_id_string, '.'); +#endif +} + +std::vector<std::string> StringArrayDeviceID::split_device_id_string( + const std::vector<std::string>& splits) { + size_t segment_cnt = splits.size(); + std::vector<std::string> final_segments; + + if (segment_cnt == 0) { + return final_segments; + } + + if (segment_cnt == 1) { + // "root" -> {"root"} + final_segments.push_back(splits[0]); + } else if (segment_cnt < + static_cast<size_t>(storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + + 1)) { + // "root.a" -> {"root", "a"} + // "root.a.b" -> {"root.a", "b"} + std::string table_name = std::accumulate( + splits.begin(), splits.end() - 1, std::string(), + [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + storage::PATH_SEPARATOR + b; + }); + final_segments.push_back(table_name); + final_segments.push_back(splits.back()); + } else { + // "root.a.b.c" -> {"root.a.b", "c"} + // "root.a.b.c.d" -> {"root.a.b", "c", "d"} + std::string table_name = std::accumulate( + splits.begin(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + std::string(), [](const std::string& a, const std::string& b) { + return a.empty() ? b : a + storage::PATH_SEPARATOR + b; + }); + + final_segments.emplace_back(std::move(table_name)); + final_segments.insert( + final_segments.end(), + splits.begin() + storage::DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME, + splits.end()); + } + + return final_segments; +} + +std::vector<std::string> IDeviceID::split_string(const std::string& str, + char delimiter) { + std::vector<std::string> tokens; + + // Reject newlines in path explicitly (illegal path name). 
+ if (str.find('\n') != std::string::npos || + str.find('\r') != std::string::npos) { + throw std::runtime_error("Path contains newline"); Review Comment: Return tokens as a parameter and return an error code? ########## cpp/src/reader/meta_data_querier.h: ########## @@ -20,6 +20,8 @@ #ifndef READER_META_DATA_QUERIER_H #define READER_META_DATA_QUERIER_H +#include <mutex> Review Comment: What is this for? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
