hubcio commented on code in PR #2613: URL: https://github.com/apache/iggy/pull/2613#discussion_r2729002688
########## examples/rust/src/message-headers/message-compression/README.md: ########## @@ -0,0 +1,276 @@ +# An Example on Message Compression in Transit + +This example illustrates how (with some minor additional implementation) the Iggy SDK can be used to compress and decompress messages in transit. + +## Running the Example + +Details on how to run the examples for the Rust Iggy SDK can be found in the parent folder [README.md](https://github.com/apache/iggy/tree/master/examples/rust#readme). + +Run the following commands + +1. Set root credentials in your environment + + ```bash + export IGGY_ROOT_USERNAME=iggy + export IGGY_ROOT_PASSWORD=iggy + ``` + +1. Start the server + + ```bash + cargo run --bin iggy-server -- --with-default-root-credentials + ``` + + **NOTE**: In case the server was running before, make sure to run `rm -rf local_data/` to delete server state data from prior runs. + +1. Run the producer to write compressed messages to the server Review Comment: all points are starting with `1.` ########## examples/rust/src/message-headers/message-compression/README.md: ########## @@ -0,0 +1,276 @@ +# An Example on Message Compression in Transit + +This example illustrates how (with some minor additional implementation) the Iggy SDK can be used to compress and decompress messages in transit. + +## Running the Example + +Details on how to run the examples for the Rust Iggy SDK can be found in the parent folder [README.md](https://github.com/apache/iggy/tree/master/examples/rust#readme). + +Run the following commands + +1. 
Set root credentials in your environment + + ```bash + export IGGY_ROOT_USERNAME=iggy + export IGGY_ROOT_PASSWORD=iggy + ``` Review Comment: you can skip these if you start server with `--with-default-root-credentials` (which you do) ########## examples/rust/src/message-headers/message-compression/producer/main.rs: ########## @@ -0,0 +1,103 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use bytes::Bytes; +use iggy::prelude::*; +use std::collections::HashMap; +// The compression and decompression utilities are shared between the producer and consumer compression examples. +// Hence, we import them here. +use iggy_examples::shared::codec::{Codec, NUM_MESSAGES, STREAM_NAME, TOPIC_NAME}; + +#[tokio::main] +async fn main() -> Result<(), IggyError> { + // Setup a client to connect to the iggy-server via TCP. + let client = IggyClientBuilder::new().with_tcp().build()?; + client.connect().await?; + + // Login using default credentials. + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await?; + + // Create a Stream. + client + .create_stream(STREAM_NAME) + .await + .expect("Stream was NOT created! 
Start a fresh server to run this example."); Review Comment: you can mention `--fresh` argument to `iggy-server` binary ########## examples/rust/src/message-headers/message-compression/README.md: ########## @@ -0,0 +1,276 @@ +# An Example on Message Compression in Transit + +This example illustrates how (with some minor additional implementation) the Iggy SDK can be used to compress and decompress messages in transit. + +## Running the Example + +Details on how to run the examples for the Rust Iggy SDK can be found in the parent folder [README.md](https://github.com/apache/iggy/tree/master/examples/rust#readme). + +Run the following commands + +1. Set root credentials in your environment + + ```bash + export IGGY_ROOT_USERNAME=iggy + export IGGY_ROOT_PASSWORD=iggy + ``` + +1. Start the server + + ```bash + cargo run --bin iggy-server -- --with-default-root-credentials + ``` + + **NOTE**: In case the server was running before, make sure to run `rm -rf local_data/` to delete server state data from prior runs. + +1. Run the producer to write compressed messages to the server + + ```bash + cargo run --example message-headers-compression-producer + ``` + +1. Run the consumer to read and decompress messages from the server + + ```bash + cargo run --example message-headers-compression-consumer + ``` + +## The Codec + +The **co**mpression and **dec**compression utilities are implemented in `examples/rust/src/shared/codec.rs` and used when sending messages to the server and reading them from the server. + +First, define a stream and a topic name. +The producer will first initiate the stream and the topic on that stream and then write the example messages to that topic wihtin that stream. Review Comment: typo: `within` ########## examples/rust/src/shared/codec.rs: ########## @@ -0,0 +1,107 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy::prelude::*; +use lz4_flex::frame::{FrameDecoder, FrameEncoder}; +use std::fmt::{Display, Formatter}; +use std::io::{Read, Write}; +use std::str::FromStr; + +pub const STREAM_NAME: &str = "compression-stream"; +pub const TOPIC_NAME: &str = "compression-topic"; +pub const COMPRESSION_HEADER_KEY: &str = "iggy-compression"; +pub const NUM_MESSAGES: u32 = 1000; + +// Codec that defines available compression algorithms. +pub enum Codec { + None, + Lz4, +} + +impl Display for Codec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Codec::None => write!(f, "none"), + Codec::Lz4 => write!(f, "lz4"), + } + } +} + +impl FromStr for Codec { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "lz4" => Ok(Codec::Lz4), + "none" => Ok(Codec::None), + _ => Err(format!("Unknown compression type: {s}")), + } + } +} + +impl Codec { + /// Returns the key to indicate compressed messages as HeaderKey. + pub fn header_key() -> HeaderKey { + HeaderKey::new(COMPRESSION_HEADER_KEY) + .expect("COMPRESSION_HEADER_KEY is an InvalidHeaderKey.") + } + + /// Returns the compression algorithm type as HeaderValue. 
+ pub fn to_header_value(&self) -> HeaderValue { + HeaderValue::from_str(&self.to_string()).expect("failed generating HeaderValue.") + } + + /// Returns a Codec from a HeaderValue. Used when reading messages from the server. + pub fn from_header_value(value: &HeaderValue) -> Self { + let name = value + .as_str() + .expect("could not convert HeaderValue into str."); + Self::from_str(name).expect("compression algorithm not available.") + } + + /// Takes a message payload and compresses it using the algorithm from Codec. + pub fn compress(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> { + match self { + Codec::None => Ok(data.to_vec()), + Codec::Lz4 => { + let mut compressed_data = Vec::new(); + let mut encoder = FrameEncoder::new(&mut compressed_data); + encoder + .write_all(data) + .expect("Cannot write into buffer using Lz4 compression."); + encoder.finish().expect("Cannot finish Lz4 compression."); + Ok(compressed_data) + } + } + } Review Comment: this function returns `Result`, but it never actually returns `Err` because `expect()` is used. Either simplify it to return `Vec<u8>` (which is totally fine for an example) or propagate the errors to the caller.
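To illustrate the two options from the last comment, here is a rough sketch. It uses only the standard library so it compiles on its own: `PassthroughEncoder` is a hypothetical stand-in for `lz4_flex::frame::FrameEncoder` that copies bytes through unchanged, and the names `compress_infallible` and `compress_fallible` are made up for this sketch. The fallible variant returns `std::io::Result` as an assumption; mapping the error into `IggyError` is left open because it depends on which variant fits.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for `lz4_flex::frame::FrameEncoder`.
/// It does no compression; it only forwards bytes to the inner writer.
struct PassthroughEncoder<W: Write> {
    inner: W,
}

impl<W: Write> PassthroughEncoder<W> {
    fn new(inner: W) -> Self {
        Self { inner }
    }

    /// Flushes the inner writer and hands it back, like a frame encoder's `finish`.
    fn finish(mut self) -> io::Result<W> {
        self.inner.flush()?;
        Ok(self.inner)
    }
}

impl<W: Write> Write for PassthroughEncoder<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

/// Option 1: keep `expect()` and make the signature honest by returning `Vec<u8>`.
/// Any failure panics, which is acceptable for example code.
pub fn compress_infallible(data: &[u8]) -> Vec<u8> {
    let mut encoder = PassthroughEncoder::new(Vec::new());
    encoder
        .write_all(data)
        .expect("Cannot write into buffer.");
    encoder.finish().expect("Cannot finish compression.")
}

/// Option 2: keep the `Result` and propagate failures with `?` instead of panicking.
pub fn compress_fallible(data: &[u8]) -> io::Result<Vec<u8>> {
    let mut encoder = PassthroughEncoder::new(Vec::new());
    encoder.write_all(data)?;
    encoder.finish()
}
```

With option 1 the call site needs no `?` or `unwrap()`. With option 2 the caller decides how to handle a failed write, so `expect()` should not appear inside the function at all.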
