Copilot commented on code in PR #1628: URL: https://github.com/apache/horaedb/pull/1628#discussion_r2459238007
########## src/remote_write/src/repeated_field.rs: ########## @@ -0,0 +1,530 @@ +// Copyright (c) 2019 Stepan Koltsov +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +// OR OTHER DEALINGS IN THE SOFTWARE. + +/// ! The [Clear] trait and [RepeatedField] are taken from [rust-protobuf](https://github.com/stepancheg/rust-protobuf/tree/master/protobuf-examples/vs-prost) +/// to leverage the pooling mechanism to avoid frequent heap +/// allocation/deallocation when decoding deeply nested structs. +use std::borrow::Borrow; +use std::{ + cmp::Ordering, + default::Default, + fmt, + hash::{Hash, Hasher}, + iter::{FromIterator, IntoIterator}, + ops::{Deref, DerefMut, Index, IndexMut}, + slice, vec, +}; + +use bytes::Bytes; + +/// anything that can be cleared +pub trait Clear { + /// Clear this make, make it equivalent to newly created object. + fn clear(&mut self); +} + +impl<T> Clear for Option<T> { + fn clear(&mut self) { Review Comment: The `Clear` trait documentation mentions 'make it equivalent to newly created object' but the implementation for `RepeatedField` at line 82 only sets `len` to 0 without clearing the actual data in the backing `Vec`. While this is correct for the pooling use case, the trait documentation should clarify that 'clear' means resetting to a logically empty state, not necessarily deallocating or zeroing memory. ########## src/benchmarks/build.rs: ########## @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{env, fs, path::PathBuf, process::Command}; + +fn main() { + // Similar to prost, we generate rust-protobuf and quick-protobuf code to + // OUT_DIR instead of the src directory. + let proto_path = "../pb_types/protos/remote_write.proto"; + let include_path = "../pb_types/protos"; + let out_dir = env::var("OUT_DIR").unwrap(); + let out_dir_path = PathBuf::from(&out_dir); + + // Generate rust-protobuf code to OUT_DIR. + protobuf_codegen::Codegen::new() + .pure() + .out_dir(&out_dir) + .input(proto_path) + .include(include_path) + .run() + .expect("rust-protobuf code generation failed"); + + // Rename rust-protobuf generated file to avoid potential conflicts. + let src_file = out_dir_path.join("remote_write.rs"); + let dst_file = out_dir_path.join("rust_protobuf_remote_write.rs"); + fs::rename(&src_file, &dst_file).expect("rust-protobuf file rename failed"); + + // Generate quick-protobuf code to OUT_DIR using pb-rs command line tool. + let quick_protobuf_file = out_dir_path.join("quick_protobuf_remote_write.rs"); + let output = Command::new("pb-rs") + .args([ + "-I", + include_path, + "-o", + quick_protobuf_file.to_str().unwrap(), + "-s", + proto_path, + ]) + .output() + .expect("pb-rs command execution failed"); + + if !output.status.success() { + panic!( + "pb-rs command execution failed: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + + // Fix package namespace conflicts and inner attributes using sed. + let _ = Command::new("sed") Review Comment: The build script silently ignores errors from `sed` commands by using `let _`. If sed fails, the generated code may be incorrect but the build continues. Consider propagating these errors or at least logging warnings when sed commands fail. ########## src/remote_write/src/pb_reader.rs: ########## @@ -0,0 +1,644 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::{Error, ErrorKind, Result}; + +use bytes::{Buf, Bytes}; + +use crate::pooled_types::{ + PooledExemplar, PooledLabel, PooledMetricMetadata, PooledMetricType, PooledSample, + PooledTimeSeries, PooledWriteRequest, +}; + +const WIRE_TYPE_VARINT: u8 = 0; +const WIRE_TYPE_64BIT: u8 = 1; +const WIRE_TYPE_LENGTH_DELIMITED: u8 = 2; + +const FIELD_NUM_TIMESERIES: u32 = 1; +const FIELD_NUM_METADATA: u32 = 3; +const FIELD_NUM_LABELS: u32 = 1; +const FIELD_NUM_SAMPLES: u32 = 2; +const FIELD_NUM_EXEMPLARS: u32 = 3; +const FIELD_NUM_LABEL_NAME: u32 = 1; +const FIELD_NUM_LABEL_VALUE: u32 = 2; +const FIELD_NUM_SAMPLE_VALUE: u32 = 1; +const FIELD_NUM_SAMPLE_TIMESTAMP: u32 = 2; +const FIELD_NUM_EXEMPLAR_LABELS: u32 = 1; +const FIELD_NUM_EXEMPLAR_VALUE: u32 = 2; +const FIELD_NUM_EXEMPLAR_TIMESTAMP: u32 = 3; +const FIELD_NUM_METADATA_TYPE: u32 = 1; +const FIELD_NUM_METADATA_FAMILY_NAME: u32 = 2; +const FIELD_NUM_METADATA_HELP: u32 = 4; +const FIELD_NUM_METADATA_UNIT: u32 = 5; + +// Taken from https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs under Apache License 2.0. +#[allow(dead_code)] +#[inline(always)] +unsafe fn copy_to_bytes(data: &mut Bytes, len: usize) -> Bytes { + if len == data.remaining() { + std::mem::replace(data, Bytes::new()) + } else { + let ret = unsafe { split_to_unsafe(data, len) }; + data.advance(len); + ret + } +} + +// Taken from https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs under Apache License 2.0. +#[allow(dead_code)] +#[inline(always)] +pub unsafe fn split_to_unsafe(buf: &Bytes, end: usize) -> Bytes { + let len = buf.len(); + assert!( + end <= len, + "range end out of bounds: {:?} <= {:?}", + end, + len, + ); + + if end == 0 { + return Bytes::new(); + } + + let ptr = buf.as_ptr(); + // `Bytes::drop` does nothing when it's built via `from_static`. + use std::slice; + Bytes::from_static(unsafe { slice::from_raw_parts(ptr, end) }) +} + +pub struct ProtobufReader { + data: Bytes, +} + +impl ProtobufReader { + pub fn new(data: Bytes) -> Self { + Self { data } + } + + pub fn remaining(&self) -> usize { + self.data.remaining() + } + + /// Read a varint from the buffer. + /// + /// Similar to [quick-protobuf](https://github.com/tafia/quick-protobuf), unroll the loop in + /// [the official Go implementation](https://cs.opensource.google/go/go/+/refs/tags/go1.24.5:src/encoding/binary/varint.go;l=68) + /// for better performance. + #[inline(always)] + pub fn read_varint(&mut self) -> Result<u64> { + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + // First byte. + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(b as u64); + } + let mut x = (b & 0x7f) as u64; + // Second byte. + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(x | ((b as u64) << 7)); + } + x |= ((b & 0x7f) as u64) << 7; + // Third byte. + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(x | ((b as u64) << 14)); + } + x |= ((b & 0x7f) as u64) << 14; + // Fourth byte. + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(x | ((b as u64) << 21)); + } + x |= ((b & 0x7f) as u64) << 21; + // Fifth byte. + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(x | ((b as u64) << 28)); + } + x |= ((b & 0x7f) as u64) << 28; + // Sixth byte. + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(x | ((b as u64) << 35)); + } + x |= ((b & 0x7f) as u64) << 35; + // Seventh byte. + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(x | ((b as u64) << 42)); + } + x |= ((b & 0x7f) as u64) << 42; + // Eighth byte. + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(x | ((b as u64) << 49)); + } + x |= ((b & 0x7f) as u64) << 49; + // Ninth byte. + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b < 0x80 { + return Ok(x | ((b as u64) << 56)); + } + x |= ((b & 0x7f) as u64) << 56; + // Tenth byte (final byte, must terminate). + if !self.data.has_remaining() { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for varint", + )); + } + let b = self.data.get_u8(); + if b >= 0x80 { + return Err(Error::new(ErrorKind::InvalidData, "varint overflow")); + } + if b > 1 { + return Err(Error::new(ErrorKind::InvalidData, "varint overflow")); + } + Ok(x | ((b as u64) << 63)) + } + + /// Read a double from the buffer. + #[inline(always)] + pub fn read_double(&mut self) -> Result<f64> { + if self.data.remaining() < 8 { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for double", + )); + } + // In Protobuf, double is encoded as 64-bit. + let bits = self.data.get_u64_le(); + Ok(f64::from_bits(bits)) + } + + /// Read a 64-bit integer from the buffer. + #[inline(always)] + pub fn read_int64(&mut self) -> Result<i64> { + // In Protobuf, int64 is encoded as varint. + self.read_varint().map(|v| v as i64) + } + + /// Read a string from the buffer. + pub fn read_string(&mut self) -> Result<Bytes> { + let len = self.read_varint()? as usize; + if self.data.remaining() < len { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for string", + )); + } + // In Protobuf, string is encoded as length-delimited UTF-8 bytes. + #[cfg(feature = "unsafe-split")] + let bytes = unsafe { copy_to_bytes(&mut self.data, len) }; + #[cfg(not(feature = "unsafe-split"))] + let bytes = self.data.split_to(len); + // Leave the responsibility of validating UTF-8 to the caller, + // which is the practice of both [easyproto](https://github.com/VictoriaMetrics/easyproto) + // and [prom-write-request-bench](https://github.com/v0y4g3r/prom-write-request-bench). + Ok(bytes) + } + + /// Read a tag from the buffer. + #[inline(always)] + pub fn read_tag(&mut self) -> Result<(u32, u8)> { + // In Protobuf, tag is encoded as varint. + // tag = (field_number << 3) | wire_type. + let tag = self.read_varint()?; + let field_number = tag >> 3; + let wire_type = tag & 0x07; + Ok((field_number as u32, wire_type as u8)) + } + + /// Read timeseries from the buffer. + #[inline(always)] + pub fn read_timeseries(&mut self, timeseries: &mut PooledTimeSeries) -> Result<()> { + let len = self.read_varint()? as usize; + if self.data.remaining() < len { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for timeseries", + )); + } + let start_remaining = self.data.remaining(); + let end_remaining = start_remaining - len; + while self.data.remaining() > end_remaining { + let (field_number, wire_type) = self.read_tag()?; + match field_number { + FIELD_NUM_LABELS => { + validate_wire_type(wire_type, WIRE_TYPE_LENGTH_DELIMITED, "labels")?; + let label_ref = timeseries.labels.push_default(); + self.read_label(label_ref)?; + } + FIELD_NUM_SAMPLES => { + validate_wire_type(wire_type, WIRE_TYPE_LENGTH_DELIMITED, "samples")?; + let sample_ref = timeseries.samples.push_default(); + self.read_sample(sample_ref)?; + } + FIELD_NUM_EXEMPLARS => { + validate_wire_type(wire_type, WIRE_TYPE_LENGTH_DELIMITED, "exemplars")?; + let exemplar_ref = timeseries.exemplars.push_default(); + self.read_exemplar(exemplar_ref)?; + } + _ => { + // Skip unknown fields instead of returning an error + self.skip_field(wire_type)?; + } + } + } + Ok(()) + } + + /// Read label from the buffer. + #[inline(always)] + pub fn read_label(&mut self, label: &mut PooledLabel) -> Result<()> { + let len = self.read_varint()? as usize; + if self.data.remaining() < len { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for label", + )); + } + let start_remaining = self.data.remaining(); + let end_remaining = start_remaining - len; + while self.data.remaining() > end_remaining { + let (field_number, wire_type) = self.read_tag()?; + match field_number { + FIELD_NUM_LABEL_NAME => { + validate_wire_type(wire_type, WIRE_TYPE_LENGTH_DELIMITED, "label name")?; + label.name = self.read_string()?; + } + FIELD_NUM_LABEL_VALUE => { + validate_wire_type(wire_type, WIRE_TYPE_LENGTH_DELIMITED, "label value")?; + label.value = self.read_string()?; + } + _ => { + self.skip_field(wire_type)?; + } + } + } + Ok(()) + } + + /// Read sample from the buffer. + #[inline(always)] + pub fn read_sample(&mut self, sample: &mut PooledSample) -> Result<()> { + let len = self.read_varint()? as usize; + if self.data.remaining() < len { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for sample", + )); + } + let start_remaining = self.data.remaining(); + let end_remaining = start_remaining - len; + while self.data.remaining() > end_remaining { + let (field_number, wire_type) = self.read_tag()?; + match field_number { + FIELD_NUM_SAMPLE_VALUE => { + validate_wire_type(wire_type, WIRE_TYPE_64BIT, "sample value")?; + sample.value = self.read_double()?; + } + FIELD_NUM_SAMPLE_TIMESTAMP => { + validate_wire_type(wire_type, WIRE_TYPE_VARINT, "sample timestamp")?; + sample.timestamp = self.read_int64()?; + } + _ => { + self.skip_field(wire_type)?; + } + } + } + Ok(()) + } + + /// Read exemplar from the buffer. + #[inline(always)] + pub fn read_exemplar(&mut self, exemplar: &mut PooledExemplar) -> Result<()> { + let len = self.read_varint()? as usize; + if self.data.remaining() < len { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for exemplar", + )); + } + let start_remaining = self.data.remaining(); + let end_remaining = start_remaining - len; + while self.data.remaining() > end_remaining { + let (field_number, wire_type) = self.read_tag()?; + match field_number { + FIELD_NUM_EXEMPLAR_LABELS => { + validate_wire_type(wire_type, WIRE_TYPE_LENGTH_DELIMITED, "exemplar labels")?; + let label_ref = exemplar.labels.push_default(); + self.read_label(label_ref)?; + } + FIELD_NUM_EXEMPLAR_VALUE => { + validate_wire_type(wire_type, WIRE_TYPE_64BIT, "exemplar value")?; + exemplar.value = self.read_double()?; + } + FIELD_NUM_EXEMPLAR_TIMESTAMP => { + validate_wire_type(wire_type, WIRE_TYPE_VARINT, "exemplar timestamp")?; + exemplar.timestamp = self.read_int64()?; + } + _ => { + self.skip_field(wire_type)?; + } + } + } + Ok(()) + } + + /// Read metric metadata from the buffer. + #[inline(always)] + pub fn read_metric_metadata(&mut self, metadata: &mut PooledMetricMetadata) -> Result<()> { + let len = self.read_varint()? as usize; + if self.data.remaining() < len { + return Err(Error::new( + ErrorKind::UnexpectedEof, + "not enough bytes for metadata", + )); + } + let start_remaining = self.data.remaining(); + let end_remaining = start_remaining - len; + while self.data.remaining() > end_remaining { + let (field_number, wire_type) = self.read_tag()?; + match field_number { + FIELD_NUM_METADATA_TYPE => { + validate_wire_type(wire_type, WIRE_TYPE_VARINT, "metadata type")?; + let type_value = self.read_varint()? as i32; + metadata.metric_type = match type_value { + 0 => PooledMetricType::Unknown, + 1 => PooledMetricType::Counter, + 2 => PooledMetricType::Gauge, + 3 => PooledMetricType::Histogram, + 4 => PooledMetricType::GaugeHistogram, + 5 => PooledMetricType::Summary, + 6 => PooledMetricType::Info, + 7 => PooledMetricType::StateSet, + _ => PooledMetricType::Unknown, + }; + } + FIELD_NUM_METADATA_FAMILY_NAME => { + validate_wire_type( + wire_type, + WIRE_TYPE_LENGTH_DELIMITED, + "metadata family name", + )?; + metadata.metric_family_name = self.read_string()?; + } + FIELD_NUM_METADATA_HELP => { + validate_wire_type(wire_type, WIRE_TYPE_LENGTH_DELIMITED, "metadata help")?; + metadata.help = self.read_string()?; + } + FIELD_NUM_METADATA_UNIT => { + validate_wire_type(wire_type, WIRE_TYPE_LENGTH_DELIMITED, "metadata unit")?; + metadata.unit = self.read_string()?; + } + _ => { + self.skip_field(wire_type)?; + } + } + } + Ok(()) + } + + /// Skip an unknown field based on its wire type. + #[inline(always)] + pub fn skip_field(&mut self, wire_type: u8) -> Result<()> { + match wire_type { + WIRE_TYPE_VARINT => { + // For varint, read and discard the value.. Review Comment: Corrected double period '..' to single period '.'. ```suggestion // For varint, read and discard the value. ``` ########## src/benchmarks/remote_write_memory_bench.py: ########## @@ -0,0 +1,240 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#!/usr/bin/env python3 Review Comment: The shebang line appears after the license header rather than as the first line of the file. For proper script execution, the shebang should be the very first line. Move line 18 to line 1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
