github-actions[bot] commented on code in PR #63182:
URL: https://github.com/apache/doris/pull/63182#discussion_r3232926804
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java:
##########
@@ -705,6 +705,7 @@ public void modifyProperties(AlterRoutineLoadCommand
command) throws UserExcepti
throw new DdlException("Only supports modification of PAUSED
jobs");
}
+ validateFlexiblePartialUpdateForAlter(jobProperties,
command.getRoutineLoadDesc());
modifyPropertiesInternal(jobProperties, dataSourceProperties);
Review Comment:
This validation is still not atomic with installing the new
`RoutineLoadDesc`, because `RoutineLoadManager.alterRoutineLoadJob()` calls
`job.setRoutineLoadDesc(command.getRoutineLoadDesc())` after
`modifyProperties()` returns and outside this job write lock. The same-ALTER
case is now checked, but two ALTERs can still interleave: ALTER B containing
only `COLUMNS(...)` validates while the job is still `UPSERT`, releases the
lock without installing its descriptor; ALTER A then validates and switches to
`UPDATE_FLEXIBLE_COLUMNS` while `columnDescs` is still empty; finally ALTER B
installs the `COLUMNS` descriptor outside the lock. The final job is flexible
partial update with a COLUMNS mapping, which routes VARIANT flexible loads
through the unsupported mapped-column path. Please move descriptor validation
and installation into the same job write-lock critical section, or otherwise
serialize ALTER validation against the effective post-ALTER descriptor state.
This is distinct from the
earlier same-statement validation issue because the remaining bypass requires
two concurrent ALTER statements.
##########
be/src/exec/common/variant_util.cpp:
##########
@@ -2141,6 +2146,627 @@ phmap::flat_hash_map<std::string_view,
ColumnVariant::Subcolumn> materialize_doc
return subcolumns;
}
// Layout of a VARIANT patch path marker, from the most significant bit down:
//   bit  63       marker flag (VARIANT_PATCH_PATH_MARKER_MASK)
//   bit  62       class: 0 = length marker, 1 = byte marker
//   bits 61..31   VARIANT column unique id          (UID_BITS   = 31)
//   bits 30..20   path index within the row         (INDEX_BITS = 11)
//   bits 19..8    byte position in the encoded key  (POS_BITS   = 12, byte markers)
//   bits  7..0    byte value                        (BYTE_BITS  = 8,  byte markers)
// Length markers keep the encoded key length in bits 19..0 instead of position/byte.

// Fixed single-bit positions.
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_MASK = 1ULL << 63;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_CLASS_SHIFT = 62;

// Field widths.
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_UID_BITS = 31;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_INDEX_BITS = 11;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_POS_BITS = 12;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_BYTE_BITS = 8;

// Field offsets, packed upward from bit 0.
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_POS_SHIFT = VARIANT_PATCH_PATH_MARKER_BYTE_BITS;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_INDEX_SHIFT =
        VARIANT_PATCH_PATH_MARKER_POS_SHIFT + VARIANT_PATCH_PATH_MARKER_POS_BITS;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_UID_SHIFT =
        VARIANT_PATCH_PATH_MARKER_INDEX_SHIFT + VARIANT_PATCH_PATH_MARKER_INDEX_BITS;
// The uid field must end exactly where the class bit starts.
static_assert(VARIANT_PATCH_PATH_MARKER_UID_SHIFT + VARIANT_PATCH_PATH_MARKER_UID_BITS ==
              VARIANT_PATCH_PATH_MARKER_CLASS_SHIFT);

// Field masks, applied after a field has been shifted down to bit 0.
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_UID_MASK =
        (1ULL << VARIANT_PATCH_PATH_MARKER_UID_BITS) - 1;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_INDEX_MASK =
        (1ULL << VARIANT_PATCH_PATH_MARKER_INDEX_BITS) - 1;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_POS_MASK =
        (1ULL << VARIANT_PATCH_PATH_MARKER_POS_BITS) - 1;
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_BYTE_MASK =
        (1ULL << VARIANT_PATCH_PATH_MARKER_BYTE_BITS) - 1;

// Number of distinct path indexes the index field can address.
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_MAX_COUNT = 1ULL
                                                         << VARIANT_PATCH_PATH_MARKER_INDEX_BITS;
// Flexible VARIANT partial update keeps exact patch paths in skip bitmap markers, so the
// byte position field bounds the encoded size of one path: the feature-level limit.
constexpr uint64_t VARIANT_PATCH_PATH_MARKER_MAX_BYTES = 1ULL
                                                         << VARIANT_PATCH_PATH_MARKER_POS_BITS;
// Maximum number of patch paths accepted for a single row.
constexpr uint64_t VARIANT_PATCH_PATH_MAX_COUNT = 256;

// A length marker must be able to hold MAX_BYTES below the index field, and the per-row
// path limit must fit in the index field.
static_assert(VARIANT_PATCH_PATH_MARKER_MAX_BYTES < (1ULL << VARIANT_PATCH_PATH_MARKER_INDEX_SHIFT));
static_assert(VARIANT_PATCH_PATH_MAX_COUNT <= VARIANT_PATCH_PATH_MARKER_MAX_COUNT);
+
+// The hidden skip bitmap stores top-level column unique ids, so VARIANT patch
metadata uses
+// values outside the int32 uid range. Each path is represented by exact,
column-scoped byte
+// markers with the high marker bit set; this keeps publish-conflict merge
deterministic.
+bool is_variant_patch_path_marker(uint64_t value) {
+ return (value & VARIANT_PATCH_PATH_MARKER_MASK) != 0;
+}
+
+namespace {
+
+struct VariantPatchPathEncoding {
+ std::optional<uint64_t> length;
+ std::vector<std::optional<uint8_t>> bytes;
+};
+
+using VariantPatchPathMap = std::map<std::string, PathInData>;
+
// Appends `value` to `*dst` as exactly four bytes, least significant byte first
// (little-endian), independent of the host byte order.
void append_fixed_u32(uint32_t value, std::string* dst) {
    for (int shift = 0; shift < 32; shift += 8) {
        dst->push_back(static_cast<char>((value >> shift) & 0xFFU));
    }
}
+
// Reads a little-endian uint32 from `src` starting at `*offset`.
// On success stores it in `*value`, advances `*offset` by four bytes and returns true.
// Returns false, leaving `*offset` and `*value` untouched, when fewer than four bytes remain,
// including when `*offset` already lies past the end of `src`.
// The remaining-size check subtracts rather than adds, so a very large `*offset` cannot wrap
// around and pass the bound (the previous `*offset + 4 > size` form could).
bool read_fixed_u32(std::string_view src, size_t* offset, uint32_t* value) {
    if (*offset > src.size() || src.size() - *offset < sizeof(uint32_t)) {
        return false;
    }
    const auto* data = reinterpret_cast<const uint8_t*>(src.data() + *offset);
    *value = static_cast<uint32_t>(data[0]) | (static_cast<uint32_t>(data[1]) << 8) |
             (static_cast<uint32_t>(data[2]) << 16) | (static_cast<uint32_t>(data[3]) << 24);
    *offset += sizeof(uint32_t);
    return true;
}
+
+std::string encode_variant_patch_path_key(const PathInData& path) {
+ const auto& parts = path.get_parts();
+ DCHECK(!parts.empty());
+ std::string encoded;
+ append_fixed_u32(static_cast<uint32_t>(parts.size()), &encoded);
+ for (const auto& part : parts) {
+ append_fixed_u32(static_cast<uint32_t>(part.key.size()), &encoded);
+ encoded.append(part.key.data(), part.key.size());
+ encoded.push_back(static_cast<char>(part.is_nested ? 1 : 0));
+ encoded.push_back(static_cast<char>(part.anonymous_array_level));
+ }
+ return encoded;
+}
+
+Status decode_variant_patch_path_key(std::string_view encoded, PathInData*
path) {
+ size_t offset = 0;
+ uint32_t part_count = 0;
+ if (!read_fixed_u32(encoded, &offset, &part_count) || part_count == 0) {
+ return Status::InternalError("Invalid VARIANT patch path marker part
count");
+ }
+
+ PathInData::Parts parts;
+ parts.reserve(part_count);
+ for (uint32_t i = 0; i < part_count; ++i) {
+ uint32_t key_size = 0;
+ if (!read_fixed_u32(encoded, &offset, &key_size) ||
+ offset + key_size + 2 > encoded.size()) {
+ return Status::InternalError("Invalid VARIANT patch path marker
part payload");
+ }
+ PathInData::Part part;
+ part.key = std::string_view(encoded.data() + offset, key_size);
+ offset += key_size;
+ part.is_nested = encoded[offset++] != 0;
+ part.anonymous_array_level = static_cast<UInt8>(encoded[offset++]);
+ parts.emplace_back(part);
+ }
+ if (offset != encoded.size()) {
+ return Status::InternalError("Trailing bytes in VARIANT patch path
marker");
+ }
+
+ *path = PathInData(parts);
+ return Status::OK();
+}
+
+uint64_t variant_patch_path_max_bytes() {
+ return VARIANT_PATCH_PATH_MARKER_MAX_BYTES;
+}
+
+uint64_t normalized_variant_col_unique_id(int32_t variant_col_unique_id) {
+ CHECK_GE(variant_col_unique_id, 0);
+ CHECK_LE(static_cast<uint64_t>(variant_col_unique_id),
VARIANT_PATCH_PATH_MARKER_UID_MASK);
+ return static_cast<uint64_t>(variant_col_unique_id);
+}
+
+uint64_t variant_patch_path_marker_uid(uint64_t marker) {
+ return (marker >> VARIANT_PATCH_PATH_MARKER_UID_SHIFT) &
VARIANT_PATCH_PATH_MARKER_UID_MASK;
+}
+
+bool is_variant_patch_path_marker_for_column(uint64_t marker, int32_t
variant_col_unique_id) {
+ return is_variant_patch_path_marker(marker) &&
+ variant_patch_path_marker_uid(marker) ==
+ normalized_variant_col_unique_id(variant_col_unique_id);
+}
+
+uint64_t variant_patch_path_marker_index(uint64_t marker) {
+ return (marker >> VARIANT_PATCH_PATH_MARKER_INDEX_SHIFT) &
VARIANT_PATCH_PATH_MARKER_INDEX_MASK;
+}
+
+bool variant_patch_path_marker_is_byte(uint64_t marker) {
+ return ((marker >> VARIANT_PATCH_PATH_MARKER_CLASS_SHIFT) & 1ULL) != 0;
+}
+
+uint64_t variant_patch_path_length_marker(int32_t variant_col_unique_id,
uint64_t path_index,
+ uint64_t length) {
+ DCHECK_LT(path_index, VARIANT_PATCH_PATH_MARKER_MAX_COUNT);
+ DCHECK_LE(length, VARIANT_PATCH_PATH_MARKER_MAX_BYTES);
+ return VARIANT_PATCH_PATH_MARKER_MASK |
+ (normalized_variant_col_unique_id(variant_col_unique_id)
+ << VARIANT_PATCH_PATH_MARKER_UID_SHIFT) |
+ (path_index << VARIANT_PATCH_PATH_MARKER_INDEX_SHIFT) | length;
+}
+
+uint64_t variant_patch_path_byte_marker(int32_t variant_col_unique_id,
uint64_t path_index,
+ uint64_t byte_pos, uint8_t byte) {
+ DCHECK_LT(path_index, VARIANT_PATCH_PATH_MARKER_MAX_COUNT);
+ DCHECK_LT(byte_pos, VARIANT_PATCH_PATH_MARKER_MAX_BYTES);
+ return VARIANT_PATCH_PATH_MARKER_MASK | (1ULL <<
VARIANT_PATCH_PATH_MARKER_CLASS_SHIFT) |
+ (normalized_variant_col_unique_id(variant_col_unique_id)
+ << VARIANT_PATCH_PATH_MARKER_UID_SHIFT) |
+ (path_index << VARIANT_PATCH_PATH_MARKER_INDEX_SHIFT) |
+ (byte_pos << VARIANT_PATCH_PATH_MARKER_POS_SHIFT) | byte;
+}
+
+void remove_variant_patch_path_markers_for_column(int32_t
variant_col_unique_id,
+ BitmapValue* bitmap) {
+ std::vector<uint64_t> markers_to_remove;
+ for (uint64_t marker : *bitmap) {
+ if (is_variant_patch_path_marker_for_column(marker,
variant_col_unique_id)) {
+ markers_to_remove.push_back(marker);
+ }
+ }
+ for (uint64_t marker : markers_to_remove) {
+ bitmap->remove(marker);
+ }
+}
+
+void remove_all_variant_patch_path_markers(BitmapValue* bitmap) {
+ std::vector<uint64_t> markers_to_remove;
+ for (uint64_t marker : *bitmap) {
+ if (is_variant_patch_path_marker(marker)) {
+ markers_to_remove.push_back(marker);
+ }
+ }
+ for (uint64_t marker : markers_to_remove) {
+ bitmap->remove(marker);
+ }
+}
+
+Status decode_variant_patch_paths(const BitmapValue& bitmap, int32_t
variant_col_unique_id,
+ VariantPatchPathMap* paths) {
+ paths->clear();
+ std::map<uint64_t, VariantPatchPathEncoding> encoded_paths;
+ for (uint64_t marker : bitmap) {
+ if (!is_variant_patch_path_marker_for_column(marker,
variant_col_unique_id)) {
+ continue;
+ }
+ auto& encoded_path =
encoded_paths[variant_patch_path_marker_index(marker)];
+ if (!variant_patch_path_marker_is_byte(marker)) {
+ const uint64_t length = marker & ((1ULL <<
VARIANT_PATCH_PATH_MARKER_INDEX_SHIFT) - 1);
+ if (length > VARIANT_PATCH_PATH_MARKER_MAX_BYTES) {
+ return Status::InternalError(
+ "Invalid VARIANT patch path marker length {} for
column {}", length,
+ variant_col_unique_id);
+ }
+ if (encoded_path.length.has_value() && *encoded_path.length !=
length) {
+ return Status::InternalError(
+ "Conflicting VARIANT patch path marker length for
column {}",
+ variant_col_unique_id);
+ }
+ encoded_path.length = length;
+ continue;
+ }
+
+ const uint64_t byte_pos = (marker >>
VARIANT_PATCH_PATH_MARKER_POS_SHIFT) &
+ VARIANT_PATCH_PATH_MARKER_POS_MASK;
+ const uint8_t byte = marker & VARIANT_PATCH_PATH_MARKER_BYTE_MASK;
+ if (encoded_path.bytes.size() <= byte_pos) {
+ encoded_path.bytes.resize(byte_pos + 1);
+ }
+ if (encoded_path.bytes[byte_pos].has_value() &&
*encoded_path.bytes[byte_pos] != byte) {
+ return Status::InternalError("Conflicting VARIANT patch path
marker byte for column {}",
+ variant_col_unique_id);
+ }
+ encoded_path.bytes[byte_pos] = byte;
+ }
+
+ for (const auto& [_, encoded_path] : encoded_paths) {
+ if (!encoded_path.length.has_value()) {
+ if (!encoded_path.bytes.empty()) {
+ return Status::InternalError(
+ "VARIANT patch path marker byte without length for
column {}",
+ variant_col_unique_id);
+ }
+ continue;
+ }
+ if (encoded_path.bytes.size() > *encoded_path.length) {
+ return Status::InternalError(
+ "VARIANT patch path marker byte exceeds length for column
{}",
+ variant_col_unique_id);
+ }
+ std::string encoded_path_key;
+ encoded_path_key.reserve(*encoded_path.length);
+ for (uint64_t i = 0; i < *encoded_path.length; ++i) {
+ if (i >= encoded_path.bytes.size() ||
!encoded_path.bytes[i].has_value()) {
+ return Status::InternalError("Incomplete VARIANT patch path
marker for column {}",
+ variant_col_unique_id);
+ }
+
encoded_path_key.push_back(static_cast<char>(*encoded_path.bytes[i]));
+ }
+ PathInData path;
+ RETURN_IF_ERROR(decode_variant_patch_path_key(encoded_path_key,
&path));
+ paths->insert_or_assign(std::move(encoded_path_key), std::move(path));
+ }
+ return Status::OK();
+}
+
+Status encode_variant_patch_paths(int32_t variant_col_unique_id, const
VariantPatchPathMap& paths,
+ BitmapValue* bitmap) {
+ if (paths.size() > VARIANT_PATCH_PATH_MAX_COUNT) {
+ return Status::NotSupported(
+ "VARIANT flexible partial update supports at most {} patch
paths per row",
+ VARIANT_PATCH_PATH_MAX_COUNT);
+ }
+ const uint64_t max_encoded_bytes = variant_patch_path_max_bytes();
+ for (const auto& [encoded_path_key, _] : paths) {
+ if (encoded_path_key.size() > max_encoded_bytes) {
+ return Status::NotSupported(
Review Comment:
This only caps each encoded path at 4 KiB (`VARIANT_PATCH_PATH_MARKER_MAX_BYTES`
= 4096 bytes), while the row can still contain up to
`VARIANT_PATCH_PATH_MAX_COUNT` (256) paths. A single crafted row with 256 long
sibling paths can therefore emit roughly `256 * 4096` (about one million) byte
markers, plus length markers, into the hidden skip bitmap. Every later
decode/merge path iterates those markers during duplicate aggregation, segment
write, and publish-conflict replay. That is a large per-row
CPU/memory/storage amplification vector for load input that still passes the
current checks. Please cap the total encoded patch-path bytes (or the total
number of emitted markers) per row before adding any markers, not just each
individual path length.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]