This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch partitions in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 556018ddcb7d4eeb62b684e4264241bc7aebe671 Author: numinex <[email protected]> AuthorDate: Fri Feb 13 14:56:25 2026 +0100 temp ve3 --- core/common/src/types/consensus/header.rs | 178 ++++++++++++++++++------------ core/consensus/src/impls.rs | 2 - core/metadata/src/impls/metadata.rs | 7 +- core/partitions/src/iggy_partitions.rs | 7 +- core/simulator/src/client.rs | 8 +- 5 files changed, 112 insertions(+), 90 deletions(-) diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs index bab1812c6..6f4f809af 100644 --- a/core/common/src/types/consensus/header.rs +++ b/core/common/src/types/consensus/header.rs @@ -18,10 +18,10 @@ use bytemuck::{Pod, Zeroable}; use thiserror::Error; -pub struct Header {} - +const HEADER_SIZE: usize = 256; pub trait ConsensusHeader: Sized + Pod + Zeroable { const COMMAND: Command2; + const _SIZE_CHECK: () = assert!(std::mem::size_of::<Self>() == HEADER_SIZE); fn validate(&self) -> Result<(), ConsensusError>; fn size(&self) -> u32; @@ -129,13 +129,11 @@ pub struct GenericHeader { pub checksum_body: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 66], pub namespace: u64, pub reserved_command: [u8; 120], @@ -156,6 +154,8 @@ impl ConsensusHeader for GenericHeader { } } +const _: () = assert!(std::mem::size_of::<GenericHeader>() == 256); + #[repr(C)] #[derive(Debug, Clone, Copy)] @@ -164,44 +164,42 @@ pub struct RequestHeader { pub checksum_body: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 66], pub client: u128, pub request_checksum: u128, pub timestamp: u64, pub request: u64, pub operation: Operation, + pub operation_padding: [u8; 7], pub namespace: u64, - pub reserved: [u8; 87], + pub reserved: [u8; 64], } impl Default for RequestHeader { fn default() -> Self { Self { - reserved: [0; 87], checksum: 0, checksum_body: 0, cluster: 0, size: 0, - epoch: 0, view: 0, release: 0, - protocol: 0, command: Default::default(), replica: 0, - reserved_frame: [0; 12], + reserved_frame: [0; 66], client: 0, request_checksum: 0, timestamp: 0, request: 0, operation: Default::default(), + operation_padding: [0; 7], namespace: 0, + reserved: [0; 64], } } } @@ -229,32 +227,29 @@ impl ConsensusHeader for RequestHeader { // TODO: Manually impl default (and use a const for the `release`) #[repr(C)] -#[derive(Default, Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy)] pub struct PrepareHeader { pub checksum: u128, pub checksum_body: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 66], pub client: u128, pub parent: u128, - pub parent_padding: u128, pub request_checksum: u128, - pub request_checksum_padding: u128, pub op: u64, pub commit: u64, pub timestamp: u64, pub request: u64, pub operation: Operation, + pub operation_padding: [u8; 7], pub namespace: u64, - pub reserved: [u8; 11], + pub reserved: [u8; 32], } unsafe impl Pod for PrepareHeader {} @@ -270,12 +265,6 @@ impl ConsensusHeader for PrepareHeader { found: self.command, }); } - if self.parent_padding != 0 { - return Err(ConsensusError::PrepareParentPaddingNonZero); - } - if self.request_checksum_padding != 0 { - return Err(ConsensusError::PrepareRequestChecksumPaddingNonZero); - } Ok(()) } @@ -284,33 +273,57 @@ impl ConsensusHeader for PrepareHeader { } } +impl Default for PrepareHeader { + fn default() -> Self { + Self { + checksum: 0, + checksum_body: 0, + cluster: 0, + size: 0, + view: 0, + release: 0, + command: Default::default(), + replica: 0, + reserved_frame: [0; 66], + client: 0, + parent: 0, + request_checksum: 0, + op: 0, + commit: 0, + timestamp: 0, + request: 0, + operation: Default::default(), + operation_padding: [0; 7], + namespace: 0, + reserved: [0; 32], + } + } +} + // TODO: Manually impl default (and use a const for the `release`) #[repr(C)] -#[derive(Default, Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy)] pub struct PrepareOkHeader { pub checksum: u128, pub checksum_body: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 66], pub parent: u128, - pub parent_padding: u128, pub prepare_checksum: u128, - pub prepare_checksum_padding: u128, pub op: u64, pub commit: u64, pub timestamp: u64, pub request: u64, pub operation: Operation, + pub operation_padding: [u8; 7], pub namespace: u64, - pub reserved: [u8; 11], + pub reserved: [u8; 48], } unsafe impl Pod for PrepareOkHeader {} @@ -326,12 +339,6 @@ impl ConsensusHeader for PrepareOkHeader { found: self.command, }); } - if self.parent_padding != 0 { - return Err(ConsensusError::PrepareParentPaddingNonZero); - } - if self.prepare_checksum_padding != 0 { - return Err(ConsensusError::PrepareRequestChecksumPaddingNonZero); - } Ok(()) } @@ -340,6 +347,32 @@ impl ConsensusHeader for PrepareOkHeader { } } +impl Default for PrepareOkHeader { + fn default() -> Self { + Self { + checksum: 0, + checksum_body: 0, + cluster: 0, + size: 0, + view: 0, + release: 0, + command: Default::default(), + replica: 0, + reserved_frame: [0; 66], + parent: 0, + prepare_checksum: 0, + op: 0, + commit: 0, + timestamp: 0, + request: 0, + operation: Default::default(), + operation_padding: [0; 7], + namespace: 0, + reserved: [0; 48], + } + } +} + #[repr(C)] #[derive(Debug, Clone, Copy)] pub struct CommitHeader { @@ -347,20 +380,18 @@ pub struct CommitHeader { pub checksum_body: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 66], pub commit_checksum: u128, pub timestamp_monotonic: u64, pub commit: u64, pub checkpoint_op: u64, pub namespace: u64, - pub reserved: [u8; 88], + pub reserved: [u8; 80], } unsafe impl Pod for CommitHeader {} @@ -385,30 +416,28 @@ impl ConsensusHeader for CommitHeader { } #[repr(C)] -#[derive(Default, Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy)] pub struct ReplyHeader { pub checksum: u128, pub checksum_body: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 66], pub request_checksum: u128, - pub request_checksum_padding: u128, pub context: u128, - pub context_padding: u128, pub op: u64, pub commit: u64, pub timestamp: u64, pub request: u64, pub operation: Operation, - pub reserved: [u8; 19], + pub operation_padding: [u8; 7], + pub namespace: u64, + pub reserved: [u8; 41], } unsafe impl Pod for ReplyHeader {} @@ -421,12 +450,6 @@ impl ConsensusHeader for ReplyHeader { if self.command != Command2::Reply { return Err(ConsensusError::ReplyInvalidCommand2); } - if self.request_checksum_padding != 0 { - return Err(ConsensusError::ReplyRequestChecksumPaddingNonZero); - } - if self.context_padding != 0 { - return Err(ConsensusError::ReplyContextPaddingNonZero); - } Ok(()) } @@ -435,6 +458,32 @@ impl ConsensusHeader for ReplyHeader { } } +impl Default for ReplyHeader { + fn default() -> Self { + Self { + checksum: 0, + checksum_body: 0, + cluster: 0, + size: 0, + view: 0, + release: 0, + command: Default::default(), + replica: 0, + reserved_frame: [0; 66], + request_checksum: 0, + context: 0, + op: 0, + commit: 0, + timestamp: 0, + request: 0, + operation: Default::default(), + operation_padding: [0; 7], + namespace: 0, + reserved: [0; 41], + } + } +} + /// StartViewChange message header. /// /// Sent by a replica when it suspects the primary has failed. @@ -448,13 +497,11 @@ pub struct StartViewChangeHeader { pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 42], pub reserved: [u8; 128], } @@ -497,27 +544,22 @@ pub struct DoViewChangeHeader { pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 42], /// The highest op-number in this replica's log. /// Used to select the most complete log when log_view values are equal. pub op: u64, - /// The replica's commit number (highest committed op). /// The new primary sets its commit to max(commit) across all DVCs. pub commit: u64, - /// The view number when this replica's status was last normal. /// This is the key field for log selection: the replica with the /// highest log_view has the most authoritative log. pub log_view: u32, - pub reserved: [u8; 108], } @@ -575,23 +617,19 @@ pub struct StartViewHeader { pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, - pub epoch: u32, pub view: u32, pub release: u32, - pub protocol: u16, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 12], + pub reserved_frame: [u8; 42], /// The op-number of the highest entry in the new primary's log. /// Backups set their op to this value. pub op: u64, - /// The commit number. /// This is max(commit) from all DVCs received by the primary. /// Backups set their commit to this value. pub commit: u64, - pub reserved: [u8; 112], } diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index f66fdb8d2..288c40472 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -1095,7 +1095,6 @@ impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for Message *new = PrepareHeader { cluster: consensus.cluster, size: old.size, - epoch: 0, view: consensus.view.get(), release: old.release, command: Command2::Prepare, @@ -1125,7 +1124,6 @@ impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for Messa request: old.request, cluster: consensus.cluster, replica: consensus.replica, - epoch: 0, // TODO: consensus.epoch // It's important to use the view of the replica, not the received prepare! view: consensus.view.get(), op: old.op, diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 30bf7775b..e5e4b01a2 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -204,17 +204,13 @@ where checksum_body: 0, cluster: consensus.cluster(), size: std::mem::size_of::<ReplyHeader>() as u32, - epoch: prepare_header.epoch, view: consensus.view(), release: 0, - protocol: 0, command: Command2::Reply, replica: consensus.replica(), - reserved_frame: [0; 12], + reserved_frame: [0; 66], request_checksum: prepare_header.request_checksum, - request_checksum_padding: 0, context: 0, - context_padding: 0, op: prepare_header.op, commit: consensus.commit(), timestamp: prepare_header.timestamp, @@ -412,7 +408,6 @@ where cluster: consensus.cluster(), replica: consensus.replica(), view: consensus.view(), - epoch: header.epoch, op: header.op, commit: consensus.commit(), timestamp: header.timestamp, diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 998e30dd9..d49ad3407 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -513,17 +513,13 @@ where checksum_body: 0, cluster: consensus.cluster(), size: std::mem::size_of::<ReplyHeader>() as u32, - epoch: prepare_header.epoch, view: consensus.view(), release: 0, - protocol: 0, command: Command2::Reply, replica: consensus.replica(), - reserved_frame: [0; 12], + reserved_frame: [0; 66], request_checksum: prepare_header.request_checksum, - request_checksum_padding: 0, context: 0, - context_padding: 0, op: prepare_header.op, commit: consensus.commit(), timestamp: prepare_header.timestamp, @@ -1020,7 +1016,6 @@ where cluster: consensus.cluster(), replica: consensus.replica(), view: consensus.view(), - epoch: header.epoch, op: header.op, commit: consensus.commit(), timestamp: header.timestamp, diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index a8e5ccbb7..eca8ff9b1 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -111,12 +111,10 @@ impl SimClient { cluster: 0, checksum: 0, checksum_body: 0, - epoch: 0, view: 0, release: 0, - protocol: 0, replica: 0, - reserved_frame: [0; 12], + reserved_frame: [0; 66], client: self.client_id, request_checksum: 0, timestamp: 0, @@ -147,12 +145,10 @@ impl SimClient { cluster: 0, // TODO: Get from config checksum: 0, checksum_body: 0, - epoch: 0, view: 0, release: 0, - protocol: 0, replica: 0, - reserved_frame: [0; 12], + reserved_frame: [0; 66], client: self.client_id, request_checksum: 0, timestamp: 0, // TODO: Use actual timestamp
