This is an automated email from the ASF dual-hosted git repository.
ianmcook pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-experiments.git
The following commit(s) were added to refs/heads/main by this push:
new bbbfaf6 Add Rust server and client for `get_simple` example (#8)
bbbfaf6 is described below
commit bbbfaf69d01d423758280e88cd69c7b4f9e03abc
Author: Matthijs Brobbel <[email protected]>
AuthorDate: Mon Mar 11 20:16:14 2024 +0100
Add Rust server and client for `get_simple` example (#8)
* Add Rust server and client for `get_simple` example
* Remove and ignore `Cargo.lock`
* Fix get request in client
* Wrap span around the full example
* Re-use the `BufReader`
* Handle chunked transfer encoding in client
* Apply suggestions from code review
Co-authored-by: Ian Cook <[email protected]>
---------
Co-authored-by: Ian Cook <[email protected]>
---
http/get_simple/rs/.gitignore | 19 +++++
http/get_simple/rs/Cargo.toml | 27 +++++++
http/get_simple/rs/client/Cargo.toml | 26 +++++++
http/get_simple/rs/client/README.md | 34 +++++++++
http/get_simple/rs/client/src/main.rs | 106 ++++++++++++++++++++++++++
http/get_simple/rs/server/Cargo.toml | 31 ++++++++
http/get_simple/rs/server/README.md | 34 +++++++++
http/get_simple/rs/server/src/main.rs | 135 ++++++++++++++++++++++++++++++++++
8 files changed, 412 insertions(+)
diff --git a/http/get_simple/rs/.gitignore b/http/get_simple/rs/.gitignore
new file mode 100644
index 0000000..95a3a86
--- /dev/null
+++ b/http/get_simple/rs/.gitignore
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+/target
+Cargo.lock
diff --git a/http/get_simple/rs/Cargo.toml b/http/get_simple/rs/Cargo.toml
new file mode 100644
index 0000000..f8670a8
--- /dev/null
+++ b/http/get_simple/rs/Cargo.toml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[workspace]
+resolver = "2"
+members = ["client", "server"]
+
+[workspace.dependencies]
+arrow-array = "50.0.0"
+arrow-ipc = "50.0.0"
+arrow-schema = "50.0.0"
+tracing = "0.1.40"
+tracing-subscriber = "0.3.18"
diff --git a/http/get_simple/rs/client/Cargo.toml b/http/get_simple/rs/client/Cargo.toml
new file mode 100644
index 0000000..1eec1bd
--- /dev/null
+++ b/http/get_simple/rs/client/Cargo.toml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "client"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+arrow-ipc.workspace = true
+tracing.workspace = true
+tracing-subscriber.workspace = true
diff --git a/http/get_simple/rs/client/README.md b/http/get_simple/rs/client/README.md
new file mode 100644
index 0000000..caeeab1
--- /dev/null
+++ b/http/get_simple/rs/client/README.md
@@ -0,0 +1,34 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# HTTP GET Arrow Data: Simple Rust Client Example
+
+This directory contains a minimal example of an HTTP client implemented in Rust. The client:
+
+1. Sends an HTTP GET request to a server.
+2. Receives an HTTP 200 response from the server, with the response body containing an Arrow IPC stream of record batches.
+3. Adds the record batches to a list as they are received.
+
+To run this example, first start one of the server examples in the parent directory, then:
+
+```sh
+cargo r --release
+```
+
+> [!NOTE]
+> This client example implements low-level HTTP/1.1 details directly, instead of using an HTTP library. We intend to update the example to use [hyper](https://docs.rs/hyper/latest/hyper/) after [arrow-rs has an async Arrow IPC reader](https://github.com/apache/arrow-rs/issues/1207).
diff --git a/http/get_simple/rs/client/src/main.rs b/http/get_simple/rs/client/src/main.rs
new file mode 100644
index 0000000..7e593b4
--- /dev/null
+++ b/http/get_simple/rs/client/src/main.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_ipc::reader::StreamReader;
+use std::{
+ io::{BufRead, BufReader, Read, Write},
+ net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
+};
+use tracing::{error, info, info_span};
+use tracing_subscriber::fmt::format::FmtSpan;
+
+/// Fetches an Arrow IPC stream from the example server on `localhost:8000`
+/// over a hand-rolled HTTP/1.1 GET and logs how many batches and rows arrived.
+///
+/// Handles both a plain body and a `Transfer-Encoding: chunked` body. I/O
+/// failures still panic (this is an example), but a connection that closes
+/// early, a non-200 status, or an undecodable IPC message is logged instead
+/// of hanging or being silently ignored.
+fn main() {
+    // Configure tracing subscriber.
+    tracing_subscriber::fmt()
+        .with_span_events(FmtSpan::CLOSE)
+        .init();
+
+    info_span!("get_simple").in_scope(|| {
+        // Connect to server.
+        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
+        let mut stream = match TcpStream::connect(addr) {
+            Ok(stream) => stream,
+            Err(error) => {
+                error!(%error, "Connection failed");
+                return;
+            }
+        };
+
+        info_span!("Reading Arrow IPC stream", %addr).in_scope(|| {
+            info!("Connected");
+
+            // Send request.
+            stream
+                .write_all(format!("GET / HTTP/1.1\r\nHost: {addr}\r\n\r\n").as_bytes())
+                .unwrap();
+
+            let mut reader = BufReader::new(&mut stream);
+            // One line buffer, reused for the status line, headers and chunk sizes.
+            let mut line = String::new();
+
+            // Status line, e.g. `HTTP/1.1 200 OK`; anything but 200 is not an
+            // Arrow IPC body.
+            reader.read_line(&mut line).unwrap();
+            if line.split_whitespace().nth(1) != Some("200") {
+                error!(status_line = line.trim_end(), "Unexpected response status");
+                return;
+            }
+
+            // Response headers: only `Transfer-Encoding: chunked` matters here.
+            // The header section ends at the first empty line.
+            let mut chunked = false;
+            loop {
+                line.clear();
+                // `read_line` returns 0 at EOF; without this check a server that
+                // closes mid-header would make this loop spin forever.
+                if reader.read_line(&mut line).unwrap() == 0 {
+                    error!("Connection closed before end of response header");
+                    return;
+                }
+                if line.trim_end().is_empty() {
+                    break;
+                }
+                if let Some((name, value)) = line.split_once(':') {
+                    // Header names and the coding token are case-insensitive.
+                    if name.trim().eq_ignore_ascii_case("transfer-encoding")
+                        && value.trim().eq_ignore_ascii_case("chunked")
+                    {
+                        chunked = true;
+                    }
+                }
+            }
+
+            // Read Arrow IPC stream.
+            let batches: Vec<_> = if chunked {
+                // Reassemble the chunked body in memory, then decode it.
+                let mut body = Vec::new();
+                loop {
+                    // Chunk-size line: hex digits, optionally followed by
+                    // `;name=value` chunk extensions, which are ignored.
+                    line.clear();
+                    reader.read_line(&mut line).unwrap();
+                    let size = line.split_once(';').map_or(line.as_str(), |(size, _)| size);
+                    let chunk_size = u64::from_str_radix(size.trim(), 16).unwrap();
+
+                    if chunk_size == 0 {
+                        // Terminating chunk; any trailer fields are ignored.
+                        break;
+                    }
+
+                    // Append the chunk data, rejecting a truncated chunk.
+                    let received = reader
+                        .by_ref()
+                        .take(chunk_size)
+                        .read_to_end(&mut body)
+                        .unwrap();
+                    assert_eq!(received as u64, chunk_size, "Connection closed mid-chunk");
+
+                    // Consume the CR-LF that terminates the chunk data.
+                    line.clear();
+                    reader.read_line(&mut line).unwrap();
+                }
+                StreamReader::try_new_unbuffered(body.as_slice(), None)
+                    .unwrap()
+                    .filter_map(|batch| {
+                        batch
+                            .map_err(|error| error!(%error, "Failed to read record batch"))
+                            .ok()
+                    })
+                    .collect()
+            } else {
+                // Decode directly from the socket.
+                StreamReader::try_new_unbuffered(reader, None)
+                    .unwrap()
+                    .filter_map(|batch| {
+                        batch
+                            .map_err(|error| error!(%error, "Failed to read record batch"))
+                            .ok()
+                    })
+                    .collect()
+            };
+
+            info!(
+                batches = batches.len(),
+                rows = batches.iter().map(|rb| rb.num_rows()).sum::<usize>()
+            );
+        });
+    })
+}
diff --git a/http/get_simple/rs/server/Cargo.toml b/http/get_simple/rs/server/Cargo.toml
new file mode 100644
index 0000000..f961a4c
--- /dev/null
+++ b/http/get_simple/rs/server/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "server"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+arrow-array.workspace = true
+arrow-ipc.workspace = true
+arrow-schema.workspace = true
+once_cell = "1.19.0"
+rand = "0.8.5"
+rayon = "1.9.0"
+tracing.workspace = true
+tracing-subscriber.workspace = true
diff --git a/http/get_simple/rs/server/README.md b/http/get_simple/rs/server/README.md
new file mode 100644
index 0000000..70ac8f7
--- /dev/null
+++ b/http/get_simple/rs/server/README.md
@@ -0,0 +1,34 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# HTTP GET Arrow Data: Simple Rust Server Example
+
+This directory contains a minimal example of an HTTP server implemented in Rust. The server:
+
+1. Creates a list of record batches and populates it with synthesized data.
+2. Listens for HTTP requests from clients.
+3. Upon receiving a request, sends an HTTP 200 response with the body containing an Arrow IPC stream of record batches.
+
+To run this example:
+
+```sh
+cargo r --release
+```
+
+> [!NOTE]
+> This server example implements low-level HTTP/1.1 details directly, instead of using an HTTP library. We intend to update the example to use [hyper](https://docs.rs/hyper/latest/hyper/) after [arrow-rs has an async Arrow IPC writer](https://github.com/apache/arrow-rs/issues/1207).
diff --git a/http/get_simple/rs/server/src/main.rs b/http/get_simple/rs/server/src/main.rs
new file mode 100644
index 0000000..be3d981
--- /dev/null
+++ b/http/get_simple/rs/server/src/main.rs
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ io::{BufRead, BufReader, Result, Write},
+ net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
+ sync::Arc,
+ thread,
+};
+
+use arrow_array::{Int64Array, RecordBatch};
+use arrow_ipc::writer::StreamWriter;
+use arrow_schema::{DataType, Field, Fields, Schema};
+use once_cell::sync::Lazy;
+use rand::{distributions::Standard, prelude::*};
+use rayon::{iter, prelude::*};
+use tracing::{error, info, info_span};
+use tracing_subscriber::fmt::format::FmtSpan;
+
+/// Rows per generated record batch (the final batch may be shorter).
+const RECORDS_PER_BATCH: usize = 4096;
+
+/// Total rows to generate: a small data set in debug builds
+/// (presumably to keep startup quick).
+#[cfg(debug_assertions)]
+const TOTAL_RECORDS: usize = 100_000;
+/// Total rows to generate: the full data set in release builds.
+#[cfg(not(debug_assertions))]
+const TOTAL_RECORDS: usize = 100_000_000;
+
+/// Schema of the random data: four nullable `Int64` columns named `a`, `b`,
+/// `c` and `d`.
+static SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
+    let fields: Fields = ('a'..='d')
+        .map(|name| Field::new(name, DataType::Int64, true))
+        .collect();
+    Arc::new(Schema::new(fields))
+});
+
+/// Random data: `TOTAL_RECORDS` rows of random `i64` values for every column of
+/// `SCHEMA`, split into batches of `RECORDS_PER_BATCH` rows plus, only when the
+/// total is not an exact multiple, one shorter final batch.
+///
+/// Batches are generated in parallel with rayon.
+///
+/// # Panics
+///
+/// Panics if a generated batch does not match `SCHEMA`, which would be a bug.
+static DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
+    info_span!("data", TOTAL_RECORDS, RECORDS_PER_BATCH).in_scope(|| {
+        info!("Generating random data");
+        let full_batches = TOTAL_RECORDS / RECORDS_PER_BATCH;
+        let remainder = TOTAL_RECORDS % RECORDS_PER_BATCH;
+        // Generate record batches with random data. The remainder batch is only
+        // emitted when non-empty, so an exact multiple does not yield a 0-row batch.
+        iter::repeatn(RECORDS_PER_BATCH, full_batches)
+            .chain((remainder > 0).then_some(remainder))
+            .map_init(rand::thread_rng, |rng, len| {
+                RecordBatch::try_new(
+                    Arc::clone(&SCHEMA),
+                    (0..SCHEMA.fields().len())
+                        .map(|_| {
+                            Arc::new(
+                                rng.sample_iter::<i64, Standard>(Standard)
+                                    .take(len)
+                                    .collect::<Int64Array>(),
+                            ) as _
+                        })
+                        .collect(),
+                )
+                // Every column is built to match SCHEMA; failing here is a bug,
+                // so surface it instead of silently dropping the batch.
+                .expect("generated columns must match SCHEMA")
+            })
+            .collect()
+    })
+});
+
+/// Serves one connection: ignores the request, then streams `DATA` back as an
+/// Arrow IPC stream in the body of an HTTP 200 response.
+///
+/// I/O and encoding failures (for example a client disconnecting mid-stream)
+/// are logged rather than panicking the connection thread.
+fn get_simple(stream: std::net::TcpStream) {
+    info!("Incoming connection");
+
+    if let Err(error) = write_arrow_response(stream) {
+        error!(%error, "Failed to send response");
+    }
+}
+
+/// Drains the request head, then writes the response header and the IPC
+/// stream through a buffered writer and shuts the socket down.
+fn write_arrow_response(
+    mut stream: std::net::TcpStream,
+) -> std::result::Result<(), Box<dyn std::error::Error>> {
+    // Ignore incoming request: skip lines up to the blank line that ends the
+    // request head (or until EOF / a read error).
+    for _ in BufReader::new(&mut stream)
+        .lines()
+        .take_while(|line| line.as_ref().is_ok_and(|line| !line.is_empty()))
+    {}
+
+    // Buffer the socket; the IPC writer may issue several small writes per batch.
+    let mut writer = std::io::BufWriter::new(stream);
+
+    // Write response header. The body has neither a Content-Length nor chunked
+    // framing, so under HTTP/1.1 its end is marked by closing the connection;
+    // announce that explicitly.
+    writer.write_all(
+        "HTTP/1.1 200 OK\r\ncontent-type: application/vnd.apache.arrow.stream\r\nconnection: close\r\n\r\n"
+            .as_bytes(),
+    )?;
+
+    // Stream the body.
+    let mut ipc_writer = StreamWriter::try_new(writer, &SCHEMA)?;
+    for batch in DATA.iter() {
+        ipc_writer.write(batch)?;
+    }
+    ipc_writer.finish()?;
+
+    // Flush everything down to the socket, then close it.
+    let stream = ipc_writer.into_inner()?.into_inner()?;
+    stream.shutdown(std::net::Shutdown::Both)?;
+    Ok(())
+}
+
+/// Generates the random data, then serves it to every client that connects to
+/// `localhost:8000`, one thread per connection.
+///
+/// Only returns (with an error) if binding the listening socket fails.
+fn main() -> Result<()> {
+    // Install a formatting `tracing` subscriber that also reports span closes.
+    tracing_subscriber::fmt()
+        .with_span_events(FmtSpan::CLOSE)
+        .init();
+
+    // Build the random batches now rather than on the first request.
+    let _ = Lazy::force(&DATA);
+
+    let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
+    let listener = TcpListener::bind(bind_addr)?;
+    info!(%bind_addr, "Listening");
+
+    // Accept forever; a failed accept is logged and the loop carries on.
+    loop {
+        let (stream, remote_peer) = match listener.accept() {
+            Ok(accepted) => accepted,
+            Err(error) => {
+                error!(%error, "Connection failed");
+                continue;
+            }
+        };
+        thread::spawn(move || {
+            info_span!("Writing Arrow IPC stream", %remote_peer).in_scope(|| get_simple(stream))
+        });
+    }
+}