This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
The following commit(s) were added to refs/heads/develop by this push:
new f7eaf16 [agent] Introduce file agent (#237)
f7eaf16 is described below
commit f7eaf160322db45ae70a71e371ffdbf27b2111c1
Author: Zhaofeng Chen <[email protected]>
AuthorDate: Tue Mar 10 10:17:53 2020 -0700
[agent] Introduce file agent (#237)
---
cmake/scripts/test.sh | 19 ++-
file_agent/Cargo.toml | 32 +++++
file_agent/src/agent.rs | 259 ++++++++++++++++++++++++++++++++++++
file_agent/src/lib.rs | 23 ++++
tests/scripts/simple_http_server.py | 17 +++
types/src/worker.rs | 26 +---
6 files changed, 353 insertions(+), 23 deletions(-)
diff --git a/cmake/scripts/test.sh b/cmake/scripts/test.sh
index 0b215e5..6993e42 100755
--- a/cmake/scripts/test.sh
+++ b/cmake/scripts/test.sh
@@ -31,7 +31,13 @@ run_unit_tests() {
popd
}
+cleanup() {
+ # kill all background services
+ [[ -z "$(jobs -p)" ]] || kill -s SIGTERM $(jobs -p)
+}
+
run_integration_tests() {
+ trap cleanup ERR
pushd ${TEACLAVE_TEST_INSTALL_DIR}
echo_title "integration tests"
@@ -41,13 +47,18 @@ run_integration_tests() {
cargo test --manifest-path
${TEACLAVE_PROJECT_ROOT}/common/protected_fs_rs/Cargo.toml \
--target-dir ${TEACLAVE_TARGET_DIR}/untrusted
+ echo_title "file_agent tests (untrusted)"
+ cd ${TEACLAVE_TEST_INSTALL_DIR}/
+ python ${TEACLAVE_PROJECT_ROOT}/tests/scripts/simple_http_server.py 6789 &
+ cargo test --manifest-path ${TEACLAVE_PROJECT_ROOT}/file_agent/Cargo.toml \
+ --target-dir ${TEACLAVE_TARGET_DIR}/untrusted
+
popd
+
+ cleanup
}
run_functional_tests() {
- cleanup() {
- [[ -z "$(jobs -p)" ]] || kill -s SIGTERM $(jobs -p)
- }
trap cleanup ERR
pushd ${TEACLAVE_TEST_INSTALL_DIR}
@@ -72,7 +83,7 @@ run_functional_tests() {
popd
# kill all background services
- [[ -z "$(jobs -p)" ]] || kill -s SIGTERM $(jobs -p)
+ cleanup
}
case "$1" in
diff --git a/file_agent/Cargo.toml b/file_agent/Cargo.toml
new file mode 100644
index 0000000..3a750e7
--- /dev/null
+++ b/file_agent/Cargo.toml
@@ -0,0 +1,32 @@
+[package]
+name = "teaclave_file_agent"
+version = "0.1.0"
+authors = ["Teaclave Contributors <[email protected]>"]
+description = "Teaclave file agent for worker"
+license = "Apache-2.0"
+edition = "2018"
+
+[lib]
+name = "teaclave_file_agent"
+crate-type = ["staticlib", "rlib"]
+
+[features]
+default = []
+
+[dependencies]
+log = { version = "0.4.6" }
+anyhow = { version = "1.0.26" }
+serde_json = { version = "1.0.39" }
+serde = { version = "1.0.92", features = ["derive"] }
+thiserror = { version = "1.0.9" }
+itertools = { version = "0.8.0", default-features = false }
+teaclave_types = { path = "../types" }
+teaclave_test_utils = { path = "../tests/utils", optional = true }
+
+url = { version = "2.1.1", features = ["serde"]}
+tokio = { version = "0.2", features = ["rt-core", "rt-threaded",
"fs"] }
+tokio-util = { version = "0.3", features = ["codec"] }
+futures = { version = "0.3" }
+futures-util = { version = "0.3.0", default-features = false }
+reqwest = { version = "0.10", features = ["json", "stream"] }
+http = { version = "0.2" }
\ No newline at end of file
diff --git a/file_agent/src/agent.rs b/file_agent/src/agent.rs
new file mode 100644
index 0000000..7ba78fe
--- /dev/null
+++ b/file_agent/src/agent.rs
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use futures::future::join_all;
+use futures::TryFutureExt;
+use reqwest;
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
+use tokio::io::AsyncWriteExt;
+use tokio_util::codec;
+use url::Url;
+
+/// Streams the response body of a GET on `presigned_url` into a newly created
+/// file at `dest`.
+///
+/// # Errors
+///
+/// Fails if the request cannot be performed, if the server answers with an
+/// error status, or if the destination file cannot be created or written.
+async fn download_remote_input_to_file(
+    presigned_url: Url,
+    dest: impl AsRef<std::path::Path>,
+) -> anyhow::Result<()> {
+    let mut download = reqwest::get(presigned_url.as_str())
+        .await?
+        .error_for_status()?;
+
+    let mut outfile = tokio::fs::File::create(dest).await?;
+
+    while let Some(chunk) = download.chunk().await? {
+        // `write` may accept only part of the buffer and report how much it
+        // took; `write_all` keeps going until the whole chunk is written.
+        outfile.write_all(&chunk).await?;
+    }
+
+    // Flush explicitly before the file is dropped so that pending writes are
+    // completed and any write error is reported to the caller.
+    outfile.flush().await?;
+
+    Ok(())
+}
+
+/// Copies the file at `src` to `dst`, discarding the value returned by
+/// `tokio::fs::copy` on success.
+async fn copy_file(
+    src: impl AsRef<std::path::Path>,
+    dst: impl AsRef<std::path::Path>,
+) -> anyhow::Result<()> {
+    tokio::fs::copy(src, dst)
+        .await
+        .map(drop)
+        .map_err(anyhow::Error::from)
+}
+
+/// Uploads the file at `src` to `presigned_url` with an HTTP PUT, streaming
+/// the file contents as the request body.
+///
+/// # Errors
+///
+/// Fails if the file metadata cannot be read, the request cannot be sent, or
+/// the server answers with any status other than `200 OK`.
+async fn upload_output_file_to_remote(
+    src: impl AsRef<std::path::Path>,
+    presigned_url: Url,
+) -> anyhow::Result<()> {
+    let src_path = src.as_ref();
+
+    // Use the async metadata call: this function runs on the Tokio runtime,
+    // where a blocking `std::fs` call would stall a worker thread.
+    let file_len = tokio::fs::metadata(src_path).await?.len();
+
+    // The file is opened lazily and framed into byte chunks as the body is sent.
+    let stream = tokio::fs::File::open(src_path.to_path_buf())
+        .map_ok(|file| codec::FramedRead::new(file, codec::BytesCodec::new()))
+        .try_flatten_stream();
+
+    let body = reqwest::Body::wrap_stream(stream);
+
+    let client = reqwest::Client::new();
+    let res = client
+        .put(presigned_url.as_str())
+        .header(reqwest::header::CONTENT_TYPE, "application/x-binary")
+        // The body is a stream, so the length is stated explicitly.
+        .header(reqwest::header::CONTENT_LENGTH, file_len.to_string())
+        .body(body)
+        .send()
+        .await?;
+    match res.status() {
+        http::StatusCode::OK => Ok(()),
+        status => anyhow::bail!("{}", status),
+    }
+}
+
+/// One transfer: a path on the local filesystem paired with a remote URL.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct HandleFileInfo {
+    /// Local side of the transfer.
+    local: PathBuf,
+    /// Remote side of the transfer.
+    remote: url::Url,
+}
+
+impl HandleFileInfo {
+    /// Pairs `local` with `remote`.
+    pub fn new(local: PathBuf, remote: url::Url) -> Self {
+        Self { local, remote }
+    }
+}
+
+/// Direction of the transfers listed in a `FileAgentRequest`.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum HandleFileCommand {
+ /// Each entry is processed by `handle_download`.
+ Download,
+ /// Each entry is processed by `handle_upload`.
+ Upload,
+}
+
+/// A batch of file transfers that all share one command.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FileAgentRequest {
+    /// Command applied to every entry of `info`.
+    pub cmd: HandleFileCommand,
+    /// The transfers to perform.
+    pub info: Vec<HandleFileInfo>,
+}
+
+impl FileAgentRequest {
+    /// Builds a request applying `cmd` to each entry of `info`.
+    pub fn new(cmd: HandleFileCommand, info: Vec<HandleFileInfo>) -> Self {
+        Self { cmd, info }
+    }
+}
+
+/// Fetches `info.remote` into the local path `info.local`.
+///
+/// `http`/`https` remotes are downloaded, `file` remotes are copied from the
+/// path component of the URL, and any other scheme is rejected. The local
+/// destination must not exist yet.
+async fn handle_download(info: HandleFileInfo) -> anyhow::Result<()> {
+    let HandleFileInfo { local, remote } = info;
+
+    anyhow::ensure!(
+        !local.exists(),
+        "[Download] Dest local file: {:?} already exists.",
+        local
+    );
+
+    match remote.scheme() {
+        "https" | "http" => download_remote_input_to_file(remote, local).await?,
+        "file" => {
+            let origin = PathBuf::from(remote.path());
+            anyhow::ensure!(
+                origin.exists(),
+                "[Download] Src local file: {:?} doesn't exist.",
+                origin
+            );
+            copy_file(origin, local).await?;
+        }
+        _ => anyhow::bail!("Scheme not supported"),
+    }
+
+    Ok(())
+}
+
+/// Sends the local file `info.local` to `info.remote`.
+///
+/// `http`/`https` remotes receive the file through an HTTP PUT, `file`
+/// remotes are written with a local copy, and any other scheme is rejected.
+///
+/// # Errors
+///
+/// Fails if the local source is missing, the scheme is unsupported, the
+/// `file` destination does not exist, or the transfer itself fails.
+async fn handle_upload(info: HandleFileInfo) -> anyhow::Result<()> {
+    anyhow::ensure!(
+        info.local.exists(),
+        "[Upload] Src local file: {:?} doesn't exist.",
+        info.local
+    );
+    let src = info.local;
+
+    match info.remote.scheme() {
+        "https" | "http" => {
+            upload_output_file_to_remote(src, info.remote).await?;
+        }
+        "file" => {
+            let dst = PathBuf::from(info.remote.path());
+            // NOTE(review): this requires the destination to exist already,
+            // so an upload can only overwrite a file. Confirm this is the
+            // intended contract (the download path requires the opposite).
+            anyhow::ensure!(
+                dst.exists(),
+                "[Upload] Dest local file: {:?} doesn't exist.",
+                dst
+            );
+            copy_file(src, dst).await?;
+        }
+        _ => anyhow::bail!("Scheme not supported"),
+    }
+    Ok(())
+}
+
+/// Decodes a JSON-encoded `FileAgentRequest` from `bytes` and performs every
+/// transfer it lists.
+///
+/// Each entry is spawned as its own task on a freshly built threaded Tokio
+/// runtime, and the call blocks until all of them have finished. An error is
+/// returned if the request cannot be decoded, the runtime cannot be built,
+/// a spawned task cannot be joined, or any transfer fails.
+fn handle_file_request(bytes: &[u8]) -> anyhow::Result<()> {
+    let request: FileAgentRequest = serde_json::from_slice(bytes)?;
+    let FileAgentRequest { cmd, info } = request;
+
+    let joined = tokio::runtime::Builder::new()
+        .threaded_scheduler()
+        .enable_all()
+        .build()?
+        .block_on(async move {
+            // Both arms yield join handles of the same type, so they can
+            // share one `join_all`.
+            let handles: Vec<_> = match cmd {
+                HandleFileCommand::Download => info
+                    .into_iter()
+                    .map(|item| tokio::spawn(handle_download(item)))
+                    .collect(),
+                HandleFileCommand::Upload => info
+                    .into_iter()
+                    .map(|item| tokio::spawn(handle_upload(item)))
+                    .collect(),
+            };
+            join_all(handles).await
+        });
+
+    // Separate tasks that could be joined from those that could not.
+    let (finished, join_failures): (Vec<_>, Vec<_>) =
+        joined.into_iter().partition(Result::is_ok);
+    if !join_failures.is_empty() {
+        anyhow::bail!("Spawned task join error!");
+    }
+    debug!("{:?}", finished);
+
+    let every_task_ok = finished.into_iter().all(|task| match task {
+        Ok(outcome) => outcome.is_ok(),
+        Err(_) => false,
+    });
+    anyhow::ensure!(every_task_ok, "Some handle file task failed");
+    Ok(())
+}
+
+/// C entry point: handles the JSON-encoded `FileAgentRequest` stored in the
+/// `in_len` bytes starting at `in_buf`.
+///
+/// Returns `0` when the request was handled successfully and `1` otherwise
+/// (including when `in_buf` is null).
+///
+/// # Safety
+///
+/// Although not declared `unsafe`, the caller must guarantee that a non-null
+/// `in_buf` points to at least `in_len` readable bytes for the duration of
+/// the call.
+#[no_mangle]
+pub extern "C" fn ocall_handle_file_request(in_buf: *const u8, in_len: usize) -> u32 {
+    // `slice::from_raw_parts` requires a non-null pointer even for a
+    // zero-length slice, so reject null up front.
+    if in_buf.is_null() {
+        error!("ocall_handle_file_request: null input buffer");
+        return 1;
+    }
+
+    // SAFETY: `in_buf` is non-null (checked above); the caller guarantees it
+    // is valid for reads of `in_len` bytes.
+    let input_buf: &[u8] = unsafe { std::slice::from_raw_parts(in_buf, in_len) };
+    match handle_file_request(input_buf) {
+        Ok(_) => 0,
+        Err(e) => {
+            // Only a status code crosses the C boundary; log the cause here.
+            error!("ocall_handle_file_request failed: {:?}", e);
+            1
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Write;
+    use url::Url;
+
+    // The HTTP tests below talk to a server on localhost:6789 and use fixed
+    // paths under /tmp.
+
+    /// A `file://` URL exposes its scheme and path, and has no host.
+    #[test]
+    fn test_file_url() {
+        let s = "file:///tmp/abc.txt";
+        let url = Url::parse(s).unwrap();
+        assert_eq!(url.scheme(), "file");
+        assert_eq!(url.host(), None);
+        assert_eq!(url.path(), "/tmp/abc.txt");
+    }
+
+    /// Downloads one file over HTTP through `handle_file_request`.
+    #[test]
+    fn test_get_single_file() {
+        let s = "http://localhost:6789/fixtures/functions/mesapy/input.txt";
+        let url = Url::parse(s).unwrap();
+        let dest = PathBuf::from("/tmp/input_test.txt");
+        // A leftover from an earlier aborted run would trip the
+        // "already exists" guard in `handle_download`; ignore "not found".
+        let _ = std::fs::remove_file(&dest);
+
+        let info = HandleFileInfo::new(dest.clone(), url);
+        let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info]);
+
+        let bytes = serde_json::to_vec(&req).unwrap();
+        handle_file_request(&bytes).unwrap();
+
+        std::fs::remove_file(dest).unwrap();
+    }
+
+    /// Uploads one file over HTTP through `handle_file_request`.
+    #[test]
+    fn test_put_single_file() {
+        let src = PathBuf::from("/tmp/output_test.txt");
+        {
+            let mut file = std::fs::File::create(&src).unwrap();
+            file.write_all(b"Hello Teaclave Results!").unwrap();
+        }
+
+        let s = "http://localhost:6789/fixtures/functions/mesapy/result.txt";
+        let url = Url::parse(s).unwrap();
+
+        let info = HandleFileInfo::new(src.clone(), url);
+        let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info]);
+
+        let bytes = serde_json::to_vec(&req).unwrap();
+        handle_file_request(&bytes).unwrap();
+
+        // Do not leave the temporary source file behind.
+        std::fs::remove_file(src).unwrap();
+    }
+}
diff --git a/file_agent/src/lib.rs b/file_agent/src/lib.rs
new file mode 100644
index 0000000..9ff8d60
--- /dev/null
+++ b/file_agent/src/lib.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate log;
+
+mod agent;
+pub use agent::ocall_handle_file_request;
+pub use agent::{FileAgentRequest, HandleFileCommand, HandleFileInfo};
diff --git a/tests/scripts/simple_http_server.py
b/tests/scripts/simple_http_server.py
new file mode 100644
index 0000000..c6f0e86
--- /dev/null
+++ b/tests/scripts/simple_http_server.py
@@ -0,0 +1,17 @@
+import SimpleHTTPServer
+import BaseHTTPServer
+
+class HTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
+ def do_PUT(self):
+ length = int(self.headers["Content-Length"])
+
+ path = self.translate_path(self.path)
+ with open(path, "wb") as dst:
+ dst.write(self.rfile.read(length))
+
+ self.send_response(200)
+ self.end_headers()
+
+
+# Start the server with the PUT-capable handler. cmake/scripts/test.sh
+# launches this script with 6789 as its command-line argument.
+if __name__ == '__main__':
+ SimpleHTTPServer.test(HandlerClass=HTTPRequestHandler)
\ No newline at end of file
diff --git a/types/src/worker.rs b/types/src/worker.rs
index 351ff09..dc60c02 100644
--- a/types/src/worker.rs
+++ b/types/src/worker.rs
@@ -54,19 +54,6 @@ impl std::fmt::Display for TeaclaveExecutorSelector {
}
}
-#[derive(Debug)]
-pub struct WorkerInputData {
- pub path: std::path::PathBuf,
- pub hash: String,
- pub crypto_info: TeaclaveFileCryptoInfo,
-}
-#[derive(Debug)]
-pub struct WorkerOutputData {
- pub path: std::path::PathBuf,
- pub hash: String,
- pub crypto_info: TeaclaveFileCryptoInfo,
-}
-
#[derive(Clone, Debug)]
pub struct TeaclaveWorkerInputFileInfo {
pub path: std::path::PathBuf,
@@ -157,11 +144,11 @@ pub fn read_all_bytes(path: impl AsRef<std::path::Path>)
-> anyhow::Result<Vec<u
}
pub fn convert_encrypted_input_file(
- src: WorkerInputData,
- dst: &str,
+ path: impl AsRef<std::path::Path>,
+ crypto_info: TeaclaveFileCryptoInfo,
+ dst: impl AsRef<std::path::Path>,
) -> anyhow::Result<TeaclaveWorkerInputFileInfo> {
- let path = src.path;
- let plain_text = match src.crypto_info {
+ let plain_text = match crypto_info {
TeaclaveFileCryptoInfo::AesGcm128(crypto) => {
let mut bytes = read_all_bytes(path)?;
crypto.decrypt(&mut bytes)?;
@@ -173,10 +160,11 @@ pub fn convert_encrypted_input_file(
bytes
}
TeaclaveFileCryptoInfo::TeaclaveFileRootKey128(crypto) => {
- return Ok(TeaclaveWorkerInputFileInfo::new(path, crypto))
+ let path = path.as_ref().to_owned();
+ return Ok(TeaclaveWorkerInputFileInfo::new(path, crypto));
}
};
- TeaclaveWorkerInputFileInfo::create_with_bytes(dst, &plain_text)
+ TeaclaveWorkerInputFileInfo::create_with_bytes(dst.as_ref(), &plain_text)
}
#[derive(Debug)]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]