This is an automated email from the ASF dual-hosted git repository.

mssun pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git


The following commit(s) were added to refs/heads/develop by this push:
     new f7eaf16  [agent] Introduce file agent (#237)
f7eaf16 is described below

commit f7eaf160322db45ae70a71e371ffdbf27b2111c1
Author: Zhaofeng Chen <[email protected]>
AuthorDate: Tue Mar 10 10:17:53 2020 -0700

    [agent] Introduce file agent (#237)
---
 cmake/scripts/test.sh               |  19 ++-
 file_agent/Cargo.toml               |  32 +++++
 file_agent/src/agent.rs             | 259 ++++++++++++++++++++++++++++++++++++
 file_agent/src/lib.rs               |  23 ++++
 tests/scripts/simple_http_server.py |  17 +++
 types/src/worker.rs                 |  26 +---
 6 files changed, 353 insertions(+), 23 deletions(-)

diff --git a/cmake/scripts/test.sh b/cmake/scripts/test.sh
index 0b215e5..6993e42 100755
--- a/cmake/scripts/test.sh
+++ b/cmake/scripts/test.sh
@@ -31,7 +31,13 @@ run_unit_tests() {
   popd
 }
 
+cleanup() {
+  # kill all background services
+  [[ -z "$(jobs -p)" ]] || kill -s SIGTERM $(jobs -p)
+}
+
 run_integration_tests() {
+  trap cleanup ERR
   pushd ${TEACLAVE_TEST_INSTALL_DIR}
 
   echo_title "integration tests"
@@ -41,13 +47,18 @@ run_integration_tests() {
   cargo test --manifest-path ${TEACLAVE_PROJECT_ROOT}/common/protected_fs_rs/Cargo.toml \
         --target-dir ${TEACLAVE_TARGET_DIR}/untrusted
 
+  echo_title "file_agent tests (untrusted)"
+  cd ${TEACLAVE_TEST_INSTALL_DIR}/
+  python ${TEACLAVE_PROJECT_ROOT}/tests/scripts/simple_http_server.py 6789 &
+  cargo test --manifest-path ${TEACLAVE_PROJECT_ROOT}/file_agent/Cargo.toml \
+        --target-dir ${TEACLAVE_TARGET_DIR}/untrusted
+
   popd
+  
+  cleanup 
 }
 
 run_functional_tests() {
-  cleanup() {
-        [[ -z "$(jobs -p)" ]] || kill -s SIGTERM $(jobs -p)
-  }
   trap cleanup ERR
 
   pushd ${TEACLAVE_TEST_INSTALL_DIR}
@@ -72,7 +83,7 @@ run_functional_tests() {
   popd
 
   # kill all background services
-  [[ -z "$(jobs -p)" ]] || kill -s SIGTERM $(jobs -p)
+  cleanup
 }
 
 case "$1" in
diff --git a/file_agent/Cargo.toml b/file_agent/Cargo.toml
new file mode 100644
index 0000000..3a750e7
--- /dev/null
+++ b/file_agent/Cargo.toml
@@ -0,0 +1,32 @@
+[package]
+name = "teaclave_file_agent"
+version = "0.1.0"
+authors = ["Teaclave Contributors <[email protected]>"]
+description = "Teaclave file agent for worker"
+license = "Apache-2.0"
+edition = "2018"
+
+[lib]
+name = "teaclave_file_agent"
+crate-type = ["staticlib", "rlib"]
+
+[features]
+default = []
+
+[dependencies]
+log           = { version = "0.4.6" }
+anyhow        = { version = "1.0.26" }
+serde_json    = { version = "1.0.39" }
+serde         = { version = "1.0.92", features = ["derive"] }
+thiserror     = { version = "1.0.9" }
+itertools     = { version = "0.8.0", default-features = false }
+teaclave_types = { path = "../types" }
+teaclave_test_utils = { path = "../tests/utils", optional = true }
+
+url             = { version = "2.1.1", features = ["serde"]}
+tokio           = { version = "0.2", features = ["rt-core", "rt-threaded", "fs"] }
+tokio-util      = { version = "0.3", features = ["codec"] }
+futures         = { version = "0.3" }
+futures-util    = { version = "0.3.0", default-features = false }
+reqwest         = { version = "0.10", features = ["json", "stream"] }
+http            = { version = "0.2" }
\ No newline at end of file
diff --git a/file_agent/src/agent.rs b/file_agent/src/agent.rs
new file mode 100644
index 0000000..7ba78fe
--- /dev/null
+++ b/file_agent/src/agent.rs
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use futures::future::join_all;
+use futures::TryFutureExt;
+use reqwest;
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
+use tokio::io::AsyncWriteExt;
+use tokio_util::codec;
+use url::Url;
+
+async fn download_remote_input_to_file(
+    presigned_url: Url,
+    dest: impl AsRef<std::path::Path>,
+) -> anyhow::Result<()> {
+    let mut download = reqwest::get(presigned_url.as_str())
+        .await?
+        .error_for_status()?;
+
+    let mut outfile = tokio::fs::File::create(dest).await?;
+
+    while let Some(chunk) = download.chunk().await? {
+        outfile.write(&chunk).await?;
+    }
+
+    // Must flush tokio::io::BufWriter manually.
+    // It will *not* flush itself automatically when dropped.
+    outfile.flush().await?;
+
+    Ok(())
+}
+
+async fn copy_file(
+    src: impl AsRef<std::path::Path>,
+    dst: impl AsRef<std::path::Path>,
+) -> anyhow::Result<()> {
+    tokio::fs::copy(src, dst).await?;
+    Ok(())
+}
+
+async fn upload_output_file_to_remote(
+    src: impl AsRef<std::path::Path>,
+    presigned_url: Url,
+) -> anyhow::Result<()> {
+    let metadata = std::fs::metadata(&src)?;
+    let file_len = metadata.len();
+
+    let stream = tokio::fs::File::open(src.as_ref().to_path_buf())
+        .map_ok(|file| codec::FramedRead::new(file, codec::BytesCodec::new()))
+        .try_flatten_stream();
+
+    let body = reqwest::Body::wrap_stream(stream);
+
+    let client = reqwest::Client::new();
+    let res = client
+        .put(presigned_url.as_str())
+        .header(reqwest::header::CONTENT_TYPE, "application/x-binary")
+        .header(reqwest::header::CONTENT_LENGTH, file_len.to_string())
+        .body(body)
+        .send()
+        .await?;
+    match res.status() {
+        http::StatusCode::OK => Ok(()),
+        status => anyhow::bail!("{}", status),
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct HandleFileInfo {
+    local: PathBuf,
+    remote: url::Url,
+}
+impl HandleFileInfo {
+    pub fn new(local: PathBuf, remote: url::Url) -> Self {
+        HandleFileInfo { local, remote }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum HandleFileCommand {
+    Download,
+    Upload,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FileAgentRequest {
+    pub cmd: HandleFileCommand,
+    pub info: Vec<HandleFileInfo>,
+}
+
+impl FileAgentRequest {
+    pub fn new(cmd: HandleFileCommand, info: Vec<HandleFileInfo>) -> Self {
+        FileAgentRequest { cmd, info }
+    }
+}
+
+async fn handle_download(info: HandleFileInfo) -> anyhow::Result<()> {
+    anyhow::ensure!(
+        info.local.exists() == false,
+        "[Download] Dest local file: {:?} already exists.",
+        info.local
+    );
+    let dst = info.local;
+
+    match info.remote.scheme() {
+        "https" | "http" => {
+            download_remote_input_to_file(info.remote, dst).await?;
+        }
+        "file" => {
+            let src = PathBuf::from(info.remote.path());
+            anyhow::ensure!(
+                src.exists(),
+                "[Download] Src local file: {:?} doesn't exist.",
+                src
+            );
+            copy_file(src, dst).await?;
+        }
+        _ => anyhow::bail!("Scheme not supported"),
+    }
+    Ok(())
+}
+
+async fn handle_upload(info: HandleFileInfo) -> anyhow::Result<()> {
+    anyhow::ensure!(
+        info.local.exists(),
+        "[Upload] Src local file: {:?} doesn't exist.",
+        info.local
+    );
+    let src = info.local;
+
+    match info.remote.scheme() {
+        "https" | "http" => {
+            upload_output_file_to_remote(src, info.remote).await?;
+        }
+        "file" => {
+            let dst = PathBuf::from(info.remote.path());
+            anyhow::ensure!(
+                dst.exists(),
+                "[Download] Dest local file: {:?} doesn't exist.",
+                dst
+            );
+            copy_file(src, dst).await?;
+        }
+        _ => anyhow::bail!("Scheme not supported"),
+    }
+    Ok(())
+}
+
+fn handle_file_request(bytes: &[u8]) -> anyhow::Result<()> {
+    let req: FileAgentRequest = serde_json::from_slice(bytes)?;
+    let results = tokio::runtime::Builder::new()
+        .threaded_scheduler()
+        .enable_all()
+        .build()?
+        .block_on(async {
+            match req.cmd {
+                HandleFileCommand::Download => {
+                    let futures: Vec<_> = req
+                        .info
+                        .into_iter()
+                        .map(|info| tokio::spawn(async { handle_download(info).await }))
+                        .collect();
+                    join_all(futures).await
+                }
+                HandleFileCommand::Upload => {
+                    let futures: Vec<_> = req
+                        .info
+                        .into_iter()
+                        .map(|info| tokio::spawn(async { handle_upload(info).await }))
+                        .collect();
+                    join_all(futures).await
+                }
+            }
+        });
+
+    let (task_results, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
+    if errs.len() > 0 {
+        anyhow::bail!("Spawned task join error!");
+    }
+    debug!("{:?}", task_results);
+    anyhow::ensure!(
+        task_results.into_iter().all(|x| x.unwrap().is_ok()),
+        "Some handle file task failed"
+    );
+    Ok(())
+}
+
+#[no_mangle]
+pub extern "C" fn ocall_handle_file_request(in_buf: *const u8, in_len: usize) -> u32 {
+    let input_buf: &[u8] = unsafe { std::slice::from_raw_parts(in_buf, in_len) };
+    match handle_file_request(input_buf) {
+        Ok(_) => 0,
+        Err(_) => 1,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Write;
+    use url::Url;
+
+    #[test]
+    fn test_file_url() {
+        let s = "file:///tmp/abc.txt";
+        let url = Url::parse(s).unwrap();
+        assert_eq!(url.scheme(), "file");
+        assert_eq!(url.host(), None);
+        assert_eq!(url.path(), "/tmp/abc.txt");
+    }
+
+    #[test]
+    fn test_get_single_file() {
+        let s = "http://localhost:6789/fixtures/functions/mesapy/input.txt";
+        let url = Url::parse(s).unwrap();
+        let dest = PathBuf::from("/tmp/input_test.txt");
+
+        let info = HandleFileInfo::new(dest.clone(), url);
+        let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info]);
+
+        let bytes = serde_json::to_vec(&req).unwrap();
+        handle_file_request(&bytes).unwrap();
+
+        std::fs::remove_file(dest).unwrap();
+    }
+
+    #[test]
+    fn test_put_single_file() {
+        let src = PathBuf::from("/tmp/output_test.txt");
+        {
+            let mut file = std::fs::File::create(&src).unwrap();
+            file.write_all(b"Hello Teaclave Results!").unwrap();
+        }
+
+        let s = "http://localhost:6789/fixtures/functions/mesapy/result.txt";
+        let url = Url::parse(s).unwrap();
+
+        let info = HandleFileInfo::new(src.clone(), url);
+        let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info]);
+
+        let bytes = serde_json::to_vec(&req).unwrap();
+        handle_file_request(&bytes).unwrap();
+    }
+}
diff --git a/file_agent/src/lib.rs b/file_agent/src/lib.rs
new file mode 100644
index 0000000..9ff8d60
--- /dev/null
+++ b/file_agent/src/lib.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate log;
+
+mod agent;
+pub use agent::ocall_handle_file_request;
+pub use agent::{FileAgentRequest, HandleFileCommand, HandleFileInfo};
diff --git a/tests/scripts/simple_http_server.py b/tests/scripts/simple_http_server.py
new file mode 100644
index 0000000..c6f0e86
--- /dev/null
+++ b/tests/scripts/simple_http_server.py
@@ -0,0 +1,17 @@
+import SimpleHTTPServer
+import BaseHTTPServer
+
+class HTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
+    def do_PUT(self):
+        length = int(self.headers["Content-Length"])
+
+        path = self.translate_path(self.path)
+        with open(path, "wb") as dst:
+            dst.write(self.rfile.read(length))
+
+        self.send_response(200)
+        self.end_headers()
+
+
+if __name__ == '__main__':
+    SimpleHTTPServer.test(HandlerClass=HTTPRequestHandler)
\ No newline at end of file
diff --git a/types/src/worker.rs b/types/src/worker.rs
index 351ff09..dc60c02 100644
--- a/types/src/worker.rs
+++ b/types/src/worker.rs
@@ -54,19 +54,6 @@ impl std::fmt::Display for TeaclaveExecutorSelector {
     }
 }
 
-#[derive(Debug)]
-pub struct WorkerInputData {
-    pub path: std::path::PathBuf,
-    pub hash: String,
-    pub crypto_info: TeaclaveFileCryptoInfo,
-}
-#[derive(Debug)]
-pub struct WorkerOutputData {
-    pub path: std::path::PathBuf,
-    pub hash: String,
-    pub crypto_info: TeaclaveFileCryptoInfo,
-}
-
 #[derive(Clone, Debug)]
 pub struct TeaclaveWorkerInputFileInfo {
     pub path: std::path::PathBuf,
@@ -157,11 +144,11 @@ pub fn read_all_bytes(path: impl AsRef<std::path::Path>) -> anyhow::Result<Vec<u
 }
 
 pub fn convert_encrypted_input_file(
-    src: WorkerInputData,
-    dst: &str,
+    path: impl AsRef<std::path::Path>,
+    crypto_info: TeaclaveFileCryptoInfo,
+    dst: impl AsRef<std::path::Path>,
 ) -> anyhow::Result<TeaclaveWorkerInputFileInfo> {
-    let path = src.path;
-    let plain_text = match src.crypto_info {
+    let plain_text = match crypto_info {
         TeaclaveFileCryptoInfo::AesGcm128(crypto) => {
             let mut bytes = read_all_bytes(path)?;
             crypto.decrypt(&mut bytes)?;
@@ -173,10 +160,11 @@ pub fn convert_encrypted_input_file(
             bytes
         }
         TeaclaveFileCryptoInfo::TeaclaveFileRootKey128(crypto) => {
-            return Ok(TeaclaveWorkerInputFileInfo::new(path, crypto))
+            let path = path.as_ref().to_owned();
+            return Ok(TeaclaveWorkerInputFileInfo::new(path, crypto));
         }
     };
-    TeaclaveWorkerInputFileInfo::create_with_bytes(dst, &plain_text)
+    TeaclaveWorkerInputFileInfo::create_with_bytes(dst.as_ref(), &plain_text)
 }
 
 #[derive(Debug)]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to