This is an automated email from the ASF dual-hosted git repository. hsun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
commit 388f8ffbe00e68c12c47f81e79b4bb91cc7551ab Author: sunhe05 <[email protected]> AuthorDate: Mon Jun 5 12:14:18 2023 +0000 [frontend] Send audit information to the management service --- cmake/tomls/Cargo.sgx_trusted_lib.lock | 1 - examples/rust/builtin_echo/Cargo.lock | 2 + .../rust/builtin_ordered_set_intersect/Cargo.lock | 2 + .../rust/builtin_ordered_set_intersect/src/main.rs | 17 +++ sdk/c/teaclave_client_sdk.h | 50 +++++++++ sdk/python/teaclave.py | 20 ++++ sdk/rust/Cargo.lock | 2 + sdk/rust/src/bindings.rs | 102 +++++++++++++++-- sdk/rust/src/lib.rs | 30 ++++- services/frontend/enclave/src/audit.rs | 61 +++++++++++ services/frontend/enclave/src/lib.rs | 43 ++++++-- services/frontend/enclave/src/service.rs | 121 +++++++++++++++------ services/management/enclave/src/audit/auditor.rs | 26 +++-- .../management/enclave/src/audit/db_directory.rs | 11 +- services/management/enclave/src/error.rs | 1 + services/management/enclave/src/lib.rs | 4 +- services/management/enclave/src/service.rs | 54 +++++++-- .../src/proto/teaclave_frontend_service.proto | 10 ++ .../src/proto/teaclave_management_service.proto | 1 + services/proto/src/teaclave_common.rs | 8 +- services/proto/src/teaclave_frontend_service.rs | 22 +++- services/proto/src/teaclave_management_service.rs | 14 +++ tests/functional/enclave/src/frontend_service.rs | 60 ++++++++++ types/src/audit.rs | 16 +-- types/src/user.rs | 6 + 25 files changed, 586 insertions(+), 98 deletions(-) diff --git a/cmake/tomls/Cargo.sgx_trusted_lib.lock b/cmake/tomls/Cargo.sgx_trusted_lib.lock index 5ad9c349..485ce7aa 100644 --- a/cmake/tomls/Cargo.sgx_trusted_lib.lock +++ b/cmake/tomls/Cargo.sgx_trusted_lib.lock @@ -2567,7 +2567,6 @@ dependencies = [ "cfg-if 0.1.10", "chrono", "log", - "once_cell", "rand", "ring", "serde", diff --git a/examples/rust/builtin_echo/Cargo.lock b/examples/rust/builtin_echo/Cargo.lock index 38b4db0a..8c3230f7 100644 --- a/examples/rust/builtin_echo/Cargo.lock +++ b/examples/rust/builtin_echo/Cargo.lock @@ -1042,6 +1042,7 @@ name = "teaclave_proto" version = "0.5.1" dependencies = [ "anyhow", + "chrono", "prost", "serde", "serde_json", @@ -1072,6 +1073,7 @@ name = "teaclave_types" version = "0.5.1" dependencies = [ "anyhow", + "chrono", "hex", "log", "rand", diff --git a/examples/rust/builtin_ordered_set_intersect/Cargo.lock b/examples/rust/builtin_ordered_set_intersect/Cargo.lock index 14447ddd..8ee7759f 100644 --- a/examples/rust/builtin_ordered_set_intersect/Cargo.lock +++ b/examples/rust/builtin_ordered_set_intersect/Cargo.lock @@ -1042,6 +1042,7 @@ name = "teaclave_proto" version = "0.5.1" dependencies = [ "anyhow", + "chrono", "prost", "serde", "serde_json", @@ -1072,6 +1073,7 @@ name = "teaclave_types" version = "0.5.1" dependencies = [ "anyhow", + "chrono", "hex", "log", "rand", diff --git a/examples/rust/builtin_ordered_set_intersect/src/main.rs b/examples/rust/builtin_ordered_set_intersect/src/main.rs index 55d0e024..b09d0563 100644 --- a/examples/rust/builtin_ordered_set_intersect/src/main.rs +++ b/examples/rust/builtin_ordered_set_intersect/src/main.rs @@ -202,6 +202,14 @@ impl Client { let response = self.client.get_task_result(&task_id)?; Ok(response) } + + fn query_audit_logs( + &mut self, + message: &str, + limit: usize, + ) -> Result<Vec<teaclave_client_sdk::Entry>> { + self.client.query_audit_logs(message.to_owned(), limit) + } } fn main() -> Result<()> { @@ -278,6 +286,15 @@ fn main() -> Result<()> { result_user1.1 ); + // Wait for the logs to be sent to the auditor + std::thread::sleep(std::time::Duration::from_secs(35)); + + println!("[+] getting audit logs"); + let logs = user1.query_audit_logs("*", 1000)?; + for log in logs { + println!("{:?}", log); + } + println!("[+] done"); Ok(()) } diff --git a/sdk/c/teaclave_client_sdk.h b/sdk/c/teaclave_client_sdk.h index cc54549d..4720193a 100644 --- a/sdk/c/teaclave_client_sdk.h +++ b/sdk/c/teaclave_client_sdk.h @@ -34,6 +34,14 @@ typedef struct AuthenticationClient AuthenticationClient; typedef struct FrontendClient FrontendClient; +typedef struct c_entry { + int64_t microsecond; + uint8_t ip[16]; + void *user; + void *message; + bool result; +} c_entry; + /** * Connect to Teaclave Authentication Service. * @@ -203,6 +211,22 @@ int teaclave_get_task_result(struct FrontendClient *client, char *task_result, size_t *task_result_len); +/** + * Query audit logs according to `query`. `query` is the query statement for tantivy. The result + * will be saved in the `log_buffer` buffer with the corresponding `log_len` argument set. + * Remember to free the user and message inside c_entry to avoid memory leak. + * + * The function returns 0 for success. On error, the function returns 1. + * + * # Safety + * + * Inconsistent length of allocated buffer may caused overflow. + */ +int teaclave_query_audit_logs(struct FrontendClient *client, + const char *query, + struct c_entry *log_buffer, + size_t *log_len); + /** * Send JSON serialized request to the service with the `client` and * get the serialized response. @@ -540,3 +564,29 @@ int teaclave_get_task_serialized(struct FrontendClient *client, const char *serialized_request, char *serialized_response, size_t *serialized_response_len); + +/** + * Send JSON serialized request to the service with the `client` and + * get the serialized response. + * + * # Arguments + * + * * `client`: service client. + * * `serialized_request`; JSON serialized request + * * `serialized_response`: buffer to store the JSON serialized response. + * * `serialized_response_len`: length of the allocated + * `serialized_response`, will be set as the length of + * `serialized_response` when return successfully. + * + * # Return + * + * The function returns 0 for success. On error, the function returns 1. + * + * # Safety + * + * Inconsistent length of allocated buffer may caused overflow. + */ +int teaclave_query_audit_logs_serialized(struct FrontendClient *client, + const char *serialized_request, + char *serialized_response, + size_t *serialized_response_len); diff --git a/sdk/python/teaclave.py b/sdk/python/teaclave.py index 43032641..d1ca3841 100644 --- a/sdk/python/teaclave.py +++ b/sdk/python/teaclave.py @@ -769,6 +769,13 @@ class GetTaskRequest(Request): self.message = fe.GetTaskRequest(task_id=task_id) +class QueryAuditLogsRequest(Request): + + def __init__(self, metadata: Metadata, message: str, limit: int): + super().__init__("QueryAuditLogs", fe.QueryAuditLogsResponse, metadata) + self.message = fe.QueryAuditLogsReqeust(message=message, limit=limit) + + class FrontendService(TeaclaveService): """Establish trusted channel with the frontend service and provide clients to send request through RPC. @@ -1044,3 +1051,16 @@ class FrontendService(TeaclaveService): preserving_proto_field_name=True, use_integers_for_enums=True) return base64.b64decode(response["result"]["Ok"]["tags_map"][tag]) + + def query_audit_logs(self, message: str, limit: int): + self.check_metadata() + self.check_channel() + request = QueryAuditLogsRequest(self.metadata, message, limit) + try: + response = self.call_method(request) + return MessageToDict(response, + preserving_proto_field_name=True, + use_integers_for_enums=True) + except Exception as e: + reason = str(e) + raise TeaclaveException(f"Failed to get audit logs ({reason})") diff --git a/sdk/rust/Cargo.lock b/sdk/rust/Cargo.lock index be4d6899..56491508 100644 --- a/sdk/rust/Cargo.lock +++ b/sdk/rust/Cargo.lock @@ -1033,6 +1033,7 @@ name = "teaclave_proto" version = "0.5.1" dependencies = [ "anyhow", + "chrono", "prost", "serde", "serde_json", @@ -1063,6 +1064,7 @@ name = "teaclave_types" version = "0.5.1" dependencies = [ "anyhow", + "chrono", "hex", "log", "rand", diff --git a/sdk/rust/src/bindings.rs b/sdk/rust/src/bindings.rs index f48b8c71..1e3664c8 100644 --- a/sdk/rust/src/bindings.rs +++ b/sdk/rust/src/bindings.rs @@ -15,16 +15,14 @@ // specific language governing permissions and limitations // under the License. -use libc::size_t; -use std::ffi::CStr; -use std::ffi::CString; -use std::fs; -use std::os::raw::c_char; -use std::os::raw::c_int; -use std::ptr; +use libc::{c_char, c_int, c_void, malloc, size_t}; + +use std::ffi::{CStr, CString}; +use std::{fs, ptr}; use crate::{ - AuthenticationClient, AuthenticationService, EnclaveInfo, FrontendClient, FrontendService, + AuthenticationClient, AuthenticationService, EnclaveInfo, Entry, FrontendClient, + FrontendService, }; macro_rules! unwrap_or_return_null { @@ -384,6 +382,89 @@ pub unsafe extern "C" fn teaclave_get_task_result( } } +/// Query audit logs according to `query`. `query` is the query statement for tantivy. The result +/// will be saved in the `log_buffer` buffer with the corresponding `log_len` argument set. +/// Remember to free the user and message inside c_entry to avoid memory leak. +/// +/// The function returns 0 for success. On error, the function returns 1. +/// +/// # Safety +/// +/// Inconsistent length of allocated buffer may caused overflow. +#[no_mangle] +pub unsafe extern "C" fn teaclave_query_audit_logs( + client: &mut FrontendClient, + query: *const c_char, + log_buffer: *mut c_entry, + log_len: *mut size_t, +) -> c_int { + if (client as *mut FrontendClient).is_null() + || query.is_null() + || log_buffer.is_null() + || log_len.is_null() + || *log_len == 0 + { + return 1; + } + + let query = CStr::from_ptr(query).to_string_lossy().into_owned(); + match client.query_audit_logs(query, *log_len) { + Ok(audit_logs) => { + let c_logs: Vec<_> = audit_logs.into_iter().map(c_entry::from).collect(); + let src_logs = c_logs.as_ptr(); + let len = c_logs.len(); + if len > *log_len { + return 1; + } + + unsafe { + ptr::copy_nonoverlapping(src_logs, log_buffer, len); + } + + *log_len = len; + 0 + } + Err(_) => 1, + } +} + +#[repr(C)] +pub struct c_entry { + microsecond: i64, + ip: [u8; 16], + user: *mut c_void, + message: *mut c_void, + result: bool, +} + +impl From<Entry> for c_entry { + fn from(entry: Entry) -> Self { + let user_bytes = CString::new(entry.user()).unwrap().into_bytes_with_nul(); + let len = user_bytes.len(); + let user = unsafe { + let user = malloc(len); + ptr::copy_nonoverlapping(user_bytes.as_ptr(), user as *mut u8, len); + user + }; + + let message_bytes = CString::new(entry.message()).unwrap().into_bytes_with_nul(); + let len = message_bytes.len(); + let message = unsafe { + let message = malloc(len); + ptr::copy_nonoverlapping(message_bytes.as_ptr(), message as *mut u8, len); + message + }; + + Self { + microsecond: entry.datetime().timestamp_micros(), + ip: entry.ip().octets(), + user, + message, + result: entry.result(), + } + } +} + macro_rules! generate_function_serialized { ( $client_type:ident, $c_function_name:ident, $rust_function_name:ident) => { /// Send JSON serialized request to the service with the `client` and @@ -505,3 +586,8 @@ generate_function_serialized!( teaclave_get_task_serialized, get_task_serialized ); +generate_function_serialized!( + FrontendClient, + teaclave_query_audit_logs_serialized, + query_audit_logs_serialized +); diff --git a/sdk/rust/src/lib.rs b/sdk/rust/src/lib.rs index d38ec933..0eb466cd 100644 --- a/sdk/rust/src/lib.rs +++ b/sdk/rust/src/lib.rs @@ -35,12 +35,12 @@ pub use teaclave_proto::teaclave_frontend_service::{ ApproveTaskRequest, AssignDataRequest, CancelTaskRequest, CreateTaskRequest, CreateTaskResponse, GetFunctionRequest, GetFunctionResponse, GetFunctionUsageStatsRequest, GetFunctionUsageStatsResponse, GetTaskRequest, GetTaskResponse, InvokeTaskRequest, - RegisterFunctionRequest, RegisterFunctionRequestBuilder, RegisterFunctionResponse, - RegisterInputFileRequest, RegisterInputFileResponse, RegisterOutputFileRequest, - RegisterOutputFileResponse, + QueryAuditLogsRequest, QueryAuditLogsResponse, RegisterFunctionRequest, + RegisterFunctionRequestBuilder, RegisterFunctionResponse, RegisterInputFileRequest, + RegisterInputFileResponse, RegisterOutputFileRequest, RegisterOutputFileResponse, }; pub use teaclave_types::{ - EnclaveInfo, Executor, FileCrypto, FunctionArgument, FunctionInput, FunctionOutput, + EnclaveInfo, Entry, Executor, FileCrypto, FunctionArgument, FunctionInput, FunctionOutput, FunctionUsage, TaskResult, }; @@ -550,6 +550,28 @@ impl FrontendClient { let request = CancelTaskRequest::new(task_id.try_into()?); self.cancel_task_with_request(request) } + + pub fn query_audit_logs(&mut self, query: String, limit: usize) -> Result<Vec<Entry>> { + let request = QueryAuditLogsRequest::new(query, limit); + let response = self.query_audit_logs_with_request(request)?; + + response.logs.into_iter().map(Entry::try_from).collect() + } + + pub fn query_audit_logs_serialized(&mut self, serialized_request: &str) -> Result<String> { + let request = serde_json::from_str(serialized_request)?; + let response = self.query_audit_logs_with_request(request)?; + let serialized_response = serde_json::to_string(&response)?; + + Ok(serialized_response) + } + + pub fn query_audit_logs_with_request( + &mut self, + request: QueryAuditLogsRequest, + ) -> Result<QueryAuditLogsResponse> { + do_request_with_credential!(self, query_audit_logs, request) + } } #[cfg(test)] diff --git a/services/frontend/enclave/src/audit.rs b/services/frontend/enclave/src/audit.rs new file mode 100644 index 00000000..28e52d8a --- /dev/null +++ b/services/frontend/enclave/src/audit.rs @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use tokio::sync::Mutex; +use tokio::time::{sleep, Duration}; + +use std::sync::Arc; + +use teaclave_proto::teaclave_management_service::{SaveLogsRequest, TeaclaveManagementClient}; +use teaclave_rpc::transport::Channel; +use teaclave_types::Entry; + +/// Agent to send audit information to the auditor in the management service. +/// To reduce the network activity, buffer and then send the information every 30 seconds. +pub struct AuditAgent { + management_client: Arc<Mutex<TeaclaveManagementClient<Channel>>>, + buffer: Arc<Mutex<Vec<Entry>>>, +} + +impl AuditAgent { + pub fn new( + management_client: Arc<Mutex<TeaclaveManagementClient<Channel>>>, + buffer: Arc<Mutex<Vec<Entry>>>, + ) -> Self { + Self { + management_client, + buffer, + } + } + + pub async fn run(&self) { + loop { + let mut mutex = self.buffer.lock().await; + let logs: Vec<Entry> = mutex.drain(..).collect(); + drop(mutex); + + if !logs.is_empty() { + let request = SaveLogsRequest::new(logs); + + let mut client = self.management_client.lock().await; + let _ = client.save_logs(request).await; + } + + sleep(Duration::from_secs(30)).await; + } + } +} diff --git a/services/frontend/enclave/src/lib.rs b/services/frontend/enclave/src/lib.rs index 67f66806..78a63b7f 100644 --- a/services/frontend/enclave/src/lib.rs +++ b/services/frontend/enclave/src/lib.rs @@ -19,6 +19,9 @@ extern crate log; extern crate sgx_types; use anyhow::{anyhow, Result}; +use tokio::sync::Mutex; + +use std::sync::Arc; use teaclave_attestation::verifier; use teaclave_attestation::{AttestationConfig, RemoteAttestation}; @@ -29,13 +32,16 @@ use teaclave_binder::proto::{ use teaclave_binder::{handle_ecall, register_ecall_handler}; use teaclave_config::build::AS_ROOT_CA_CERT; use teaclave_config::RuntimeConfig; +use teaclave_proto::teaclave_authentication_service::TeaclaveAuthenticationInternalClient; use teaclave_proto::teaclave_frontend_service::TeaclaveFrontendServer; +use teaclave_proto::teaclave_management_service::TeaclaveManagementClient; use teaclave_rpc::{config::SgxTrustedTlsServerConfig, transport::Server}; use teaclave_service_enclave_utils::{ create_trusted_authentication_endpoint, create_trusted_management_endpoint, ServiceEnclave, }; use teaclave_types::{TeeServiceError, TeeServiceResult}; +mod audit; mod error; mod service; @@ -67,7 +73,15 @@ async fn start_service(config: &RuntimeConfig) -> Result<()> { attested_tls_config.clone(), )?; - info!(" Starting FrontEnd: setup authentication endpoint finished ..."); + let authentication_channel = authentication_service_endpoint + .connect() + .await + .map_err(|e| anyhow!("Failed to connect to authentication service, retry {:?}", e))?; + let authentication_client = Arc::new(Mutex::new(TeaclaveAuthenticationInternalClient::new( + authentication_channel, + ))); + + info!(" Starting FrontEnd: setup authentication client finished ..."); let management_service_endpoint = create_trusted_management_endpoint( &config.internal_endpoints.management.advertised_address, @@ -77,13 +91,25 @@ async fn start_service(config: &RuntimeConfig) -> Result<()> { attested_tls_config, )?; - info!(" Starting FrontEnd: setup management endpoint finished ..."); + let management_channel = management_service_endpoint + .connect() + .await + .map_err(|e| anyhow!("Failed to connect to management service, {:?}", e))?; + let management_client = Arc::new(Mutex::new(TeaclaveManagementClient::new( + management_channel, + ))); + + info!(" Starting FrontEnd: setup management client finished ..."); - let service = service::TeaclaveFrontendService::new( - authentication_service_endpoint, - management_service_endpoint, - ) - .await?; + let log_buffer = Arc::new(Mutex::new(Vec::new())); + let audit_agent = audit::AuditAgent::new(management_client.clone(), log_buffer.clone()); + let agent_handle = tokio::spawn(async move { + audit_agent.run().await; + }); + + let service = + service::TeaclaveFrontendService::new(authentication_client, management_client, log_buffer) + .await?; info!(" Starting FrontEnd: start listening ..."); Server::builder() @@ -92,6 +118,9 @@ async fn start_service(config: &RuntimeConfig) -> Result<()> { .add_service(TeaclaveFrontendServer::new(service)) .serve(listen_address) .await?; + + agent_handle.await?; + Ok(()) } diff --git a/services/frontend/enclave/src/service.rs b/services/frontend/enclave/src/service.rs index 31a2d317..288c3982 100644 --- a/services/frontend/enclave/src/service.rs +++ b/services/frontend/enclave/src/service.rs @@ -18,7 +18,8 @@ use crate::error::AuthenticationError; use crate::error::FrontendServiceError; -use anyhow::{anyhow, Result}; +use anyhow::Result; +use std::net::{IpAddr, Ipv6Addr}; use std::sync::Arc; use teaclave_proto::teaclave_authentication_service::{ TeaclaveAuthenticationInternalClient, UserAuthenticateRequest, @@ -30,28 +31,34 @@ use teaclave_proto::teaclave_frontend_service::{ GetFunctionResponse, GetFunctionUsageStatsRequest, GetFunctionUsageStatsResponse, GetInputFileRequest, GetInputFileResponse, GetOutputFileRequest, GetOutputFileResponse, GetTaskRequest, GetTaskResponse, InvokeTaskRequest, ListFunctionsRequest, - ListFunctionsResponse, RegisterFunctionRequest, RegisterFunctionResponse, - RegisterFusionOutputRequest, RegisterFusionOutputResponse, RegisterInputFileRequest, - RegisterInputFileResponse, RegisterInputFromOutputRequest, RegisterInputFromOutputResponse, - RegisterOutputFileRequest, RegisterOutputFileResponse, TeaclaveFrontend, UpdateFunctionRequest, - UpdateFunctionResponse, UpdateInputFileRequest, UpdateInputFileResponse, - UpdateOutputFileRequest, UpdateOutputFileResponse, + ListFunctionsResponse, QueryAuditLogsRequest, QueryAuditLogsResponse, RegisterFunctionRequest, + RegisterFunctionResponse, RegisterFusionOutputRequest, RegisterFusionOutputResponse, + RegisterInputFileRequest, RegisterInputFileResponse, RegisterInputFromOutputRequest, + RegisterInputFromOutputResponse, RegisterOutputFileRequest, RegisterOutputFileResponse, + TeaclaveFrontend, UpdateFunctionRequest, UpdateFunctionResponse, UpdateInputFileRequest, + UpdateInputFileResponse, UpdateOutputFileRequest, UpdateOutputFileResponse, }; use teaclave_proto::teaclave_management_service::TeaclaveManagementClient; -use teaclave_rpc::transport::{channel::Endpoint, Channel}; +use teaclave_rpc::transport::Channel; use teaclave_rpc::Request; use teaclave_service_enclave_utils::bail; -use teaclave_types::{TeaclaveServiceResponseResult, UserAuthClaims, UserRole}; +use teaclave_types::{ + Entry, EntryBuilder, TeaclaveServiceResponseResult, UserAuthClaims, UserRole, +}; use tokio::sync::Mutex; -#[derive(Clone)] -pub(crate) struct TeaclaveFrontendService { - authentication_client: Arc<Mutex<TeaclaveAuthenticationInternalClient<Channel>>>, - management_client: Arc<Mutex<TeaclaveManagementClient<Channel>>>, -} - macro_rules! authentication_and_forward_to_management { ($service: ident, $request: ident, $func: ident, $endpoint: expr) => {{ + let function_name = stringify!($func).to_owned(); + let ip_option = $request.remote_addr().map(|s| s.ip()); + let ip = match ip_option { + Some(IpAddr::V4(ip_v4)) => ip_v4.to_ipv6_compatible(), + Some(IpAddr::V6(ip_v6)) => ip_v6, + None => Ipv6Addr::UNSPECIFIED, + }; + + let builder = EntryBuilder::new().ip(ip); + let claims = match $service.authenticate(&$request).await { Ok(claims) => { if authorize(&claims, $endpoint) { @@ -62,6 +69,13 @@ macro_rules! authentication_and_forward_to_management { stringify!($endpoint), stringify!($func) ); + + let entry = builder + .message(String::from("authenticate to ") + &function_name) + .result(false) + .build(); + $service.push_log(entry).await; + bail!(FrontendServiceError::PermissionDenied); } } @@ -71,10 +85,22 @@ macro_rules! authentication_and_forward_to_management { stringify!($endpoint), stringify!($func) ); + + let entry = builder + .message( + String::from("authenticate to ") + &function_name + ": " + &e.to_string(), + ) + .result(false) + .build(); + $service.push_log(entry).await; + bail!(e); } }; + let user = claims.to_string(); + let builder = builder.user(user); + let client = $service.management_client.clone(); let mut client = client.lock().await; let meta = $request.metadata().clone(); @@ -85,11 +111,26 @@ macro_rules! authentication_and_forward_to_management { *metadata = meta; metadata.insert("role", claims.role.parse().unwrap()); - let response = client.$func(request).await?; + let response = match client.$func(request).await { + Err(e) => { + let entry = builder + .clone() + .message(function_name.clone() + ":" + &e.to_string()) + .result(false) + .build(); + $service.push_log(entry).await; + return Err(e); + } + Ok(r) => r, + }; + + let entry = builder.message(function_name).result(true).build(); + $service.push_log(entry).await; Ok(response) }}; } +// TODO: remove this structure as it is the same with RPC interface enum Endpoints { RegisterInputFile, RegisterOutputFile, @@ -112,6 +153,7 @@ enum Endpoints { ApproveTask, InvokeTask, CancelTask, + QueryAuditLogs, } fn authorize(claims: &UserAuthClaims, request: Endpoints) -> bool { @@ -146,35 +188,34 @@ fn authorize(claims: &UserAuthClaims, request: Endpoints) -> bool { Endpoints::GetFunction | Endpoints::ListFunctions | Endpoints::GetFunctionUsageStats => { role.is_function_owner() || role.is_data_owner() } + Endpoints::QueryAuditLogs => false, } } +#[derive(Clone)] +pub(crate) struct TeaclaveFrontendService { + authentication_client: Arc<Mutex<TeaclaveAuthenticationInternalClient<Channel>>>, + management_client: Arc<Mutex<TeaclaveManagementClient<Channel>>>, + audit_log_buffer: Arc<Mutex<Vec<Entry>>>, +} + impl TeaclaveFrontendService { pub(crate) async fn new( - authentication_service_endpoint: Endpoint, - management_service_endpoint: Endpoint, + authentication_client: Arc<Mutex<TeaclaveAuthenticationInternalClient<Channel>>>, + management_client: Arc<Mutex<TeaclaveManagementClient<Channel>>>, + audit_log_buffer: Arc<Mutex<Vec<Entry>>>, ) -> Result<Self> { - let authentication_channel = authentication_service_endpoint - .connect() - .await - .map_err(|e| anyhow!("Failed to connect to authentication service, retry {:?}", e))?; - let authentication_client = Arc::new(Mutex::new( - TeaclaveAuthenticationInternalClient::new(authentication_channel), - )); - - let management_channel = management_service_endpoint - .connect() - .await - .map_err(|e| anyhow!("Failed to connect to management service, {:?}", e))?; - let management_client = Arc::new(Mutex::new(TeaclaveManagementClient::new( - management_channel, - ))); - Ok(Self { authentication_client, management_client, + audit_log_buffer, }) } + + pub async fn push_log(&self, entry: Entry) { + let mut buffer_lock = self.audit_log_buffer.lock().await; + buffer_lock.push(entry); + } } #[teaclave_rpc::async_trait] @@ -405,6 +446,18 @@ impl TeaclaveFrontend for TeaclaveFrontendService { ) -> TeaclaveServiceResponseResult<()> { authentication_and_forward_to_management!(self, request, cancel_task, Endpoints::CancelTask) } + + async fn query_audit_logs( + &self, + request: Request<QueryAuditLogsRequest>, + ) -> TeaclaveServiceResponseResult<QueryAuditLogsResponse> { + authentication_and_forward_to_management!( + self, + request, + query_audit_logs, + Endpoints::QueryAuditLogs + ) + } } impl TeaclaveFrontendService { diff --git a/services/management/enclave/src/audit/auditor.rs b/services/management/enclave/src/audit/auditor.rs index f8ac6790..46cfab35 100644 --- a/services/management/enclave/src/audit/auditor.rs +++ b/services/management/enclave/src/audit/auditor.rs @@ -21,8 +21,7 @@ use teaclave_proto::teaclave_storage_service::TeaclaveStorageClient; use teaclave_rpc::transport::Channel; use teaclave_types::{Entry, EntryBuilder}; -use std::sync::Arc; -use tokio::sync::Mutex; +use std::sync::{Arc, Mutex}; use anyhow::{anyhow, Result}; use tantivy::{ @@ -38,7 +37,9 @@ pub struct Auditor { } impl Auditor { - pub fn try_new(storage: Arc<Mutex<TeaclaveStorageClient<Channel>>>) -> Result<Self> { + pub fn try_new( + storage: Arc<tokio::sync::Mutex<TeaclaveStorageClient<Channel>>>, + ) -> Result<Self> { let directory = db_directory::DbDirectory::new(storage); let schema = Self::log_schema(); @@ -74,8 +75,8 @@ impl Auditor { }) } - pub async fn add_logs(&self, logs: Vec<Entry>) -> Result<()> { - let mut writer = self.writer.lock().await; + pub fn add_logs(&self, logs: Vec<Entry>) -> Result<()> { + let mut writer = self.writer.lock().unwrap(); for log in logs { let document = Self::convert_to_doc(log); @@ -89,12 +90,13 @@ impl Auditor { /// query: the query for tantivy /// limit: maximum number of the returned logs - pub async fn query_logs(&self, query: &str, limit: usize) -> Result<Vec<Entry>> { - let index = self.index.lock().await; - let schema = Self::log_schema(); - - let reader = self.reader.lock().await; + pub fn query_logs(&self, query: &str, limit: usize) -> Result<Vec<Entry>> { + let reader = self.reader.lock().unwrap(); let searcher = reader.searcher(); + drop(reader); + + let index = self.index.lock().unwrap(); + let schema = Self::log_schema(); let message = schema.get_field("message").unwrap(); let date = schema.get_field("date").unwrap(); @@ -151,7 +153,7 @@ impl Auditor { let entry = EntryBuilder::new() .microsecond(microsecond) - .ip(ip.to_ipv4().unwrap()) + .ip(ip) .user(user.to_owned()) .message(message.to_owned()) .result(result) @@ -172,7 +174,7 @@ impl Auditor { let mut doc = Document::default(); doc.add_date(date, date_v); - doc.add_ip_addr(ip, entry.ip().to_ipv6_compatible()); + doc.add_ip_addr(ip, entry.ip()); doc.add_text(user, &entry.user()); doc.add_text(message, &entry.message()); doc.add_bool(result, entry.result()); diff --git a/services/management/enclave/src/audit/db_directory.rs b/services/management/enclave/src/audit/db_directory.rs index 0562608d..6e0f3c2c 100644 --- a/services/management/enclave/src/audit/db_directory.rs +++ b/services/management/enclave/src/audit/db_directory.rs @@ -46,19 +46,15 @@ struct Cache { shared_directory: DbDirectory, data: Cursor<Vec<u8>>, is_flushed: bool, - rt: Runtime, } impl Cache { fn new(path_buf: PathBuf, shared_directory: DbDirectory) -> Self { - let rt = Builder::new_current_thread().enable_all().build().unwrap(); - Cache { path: path_buf, data: Cursor::new(Vec::new()), shared_directory, is_flushed: true, - rt, } } } @@ -85,13 +81,10 @@ impl Write for Cache { } fn flush(&mut self) -> io::Result<()> { + self.shared_directory + .write(&self.path, self.data.get_ref())?; self.is_flushed = true; - let key = DB_PREFIX.clone() + &self.path.to_string_lossy(); - let request = PutRequest::new(key.as_bytes(), self.data.get_ref().clone()); - self.rt - .block_on(self.shared_directory.db.blocking_lock().put(request)) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; Ok(()) } } diff --git a/services/management/enclave/src/error.rs b/services/management/enclave/src/error.rs index a8d7f88a..9ae56c08 100644 --- a/services/management/enclave/src/error.rs +++ b/services/management/enclave/src/error.rs @@ -17,6 +17,7 @@ use teaclave_rpc::{Code, Status}; use thiserror::Error; + #[derive(Error, Debug)] pub(crate) enum ManagementServiceError { #[error("service internal error")] diff --git a/services/management/enclave/src/lib.rs b/services/management/enclave/src/lib.rs index 5d6f5bd6..4f5e75ad 100644 --- a/services/management/enclave/src/lib.rs +++ b/services/management/enclave/src/lib.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#![feature(result_flattening)] + #[macro_use] extern crate log; extern crate sgx_types; @@ -37,7 +39,7 @@ mod error; mod service; // Sets the number of worker threads the Runtime will use. -const N_WORKERS: usize = 8; +const N_WORKERS: usize = 16; async fn start_service(config: &RuntimeConfig) -> Result<()> { info!("Starting Management..."); diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs index 0c2649a5..7d182a32 100644 --- a/services/management/enclave/src/service.rs +++ b/services/management/enclave/src/service.rs @@ -33,11 +33,11 @@ use teaclave_proto::teaclave_frontend_service::{ GetFunctionResponse, GetFunctionUsageStatsRequest, GetFunctionUsageStatsResponse, GetInputFileRequest, GetInputFileResponse, GetOutputFileRequest, GetOutputFileResponse, GetTaskRequest, GetTaskResponse, InvokeTaskRequest, ListFunctionsRequest, - ListFunctionsResponse, RegisterFunctionRequest, RegisterFunctionResponse, - RegisterFusionOutputRequest, RegisterFusionOutputResponse, RegisterInputFileRequest, - RegisterInputFileResponse, RegisterInputFromOutputRequest, RegisterInputFromOutputResponse, - RegisterOutputFileRequest, RegisterOutputFileResponse, UpdateFunctionRequest, - UpdateFunctionResponse, UpdateInputFileRequest, UpdateInputFileResponse, + ListFunctionsResponse, QueryAuditLogsRequest, QueryAuditLogsResponse, RegisterFunctionRequest, + RegisterFunctionResponse, RegisterFusionOutputRequest, RegisterFusionOutputResponse, + RegisterInputFileRequest, RegisterInputFileResponse, RegisterInputFromOutputRequest, + RegisterInputFromOutputResponse, RegisterOutputFileRequest, RegisterOutputFileResponse, + UpdateFunctionRequest, UpdateFunctionResponse, UpdateInputFileRequest, UpdateInputFileResponse, UpdateOutputFileRequest, UpdateOutputFileResponse, }; use teaclave_proto::teaclave_management_service::{SaveLogsRequest, TeaclaveManagement}; @@ -50,6 +50,7 @@ use teaclave_rpc::{Request, Response}; use teaclave_service_enclave_utils::ensure; use teaclave_types::*; use tokio::sync::Mutex; +use tokio::task; use url::Url; use uuid::Uuid; @@ -933,12 +934,46 @@ impl TeaclaveManagement for TeaclaveManagementService { ManagementServiceError::AuditError(err_msg) })?; - self.auditor.add_logs(logs).await.map_err(|e| { - let err_msg = format!("failed to save logs {:?}", e); + let auditor = self.auditor.clone(); + task::spawn_blocking(move || auditor.add_logs(logs)) + .await + .map_err(|e| anyhow!("{}", e.to_string())) + .flatten() + .map_err(|e| { + let err_msg = format!("failed to save logs {:?}", e); + ManagementServiceError::AuditError(err_msg) + })?; + + Ok(Response::new(())) + } + + // access_control: only + // user_role == admin + async fn query_audit_logs( + &self, + request: Request<QueryAuditLogsRequest>, + ) -> TeaclaveServiceResponseResult<QueryAuditLogsResponse> { + let role = get_request_role(&request)?; + ensure!( + role == UserRole::PlatformAdmin, + ManagementServiceError::PermissionDenied + ); + + let request = request.into_inner(); + let auditor = self.auditor.clone(); + let logs = task::spawn_blocking(move || { + auditor.query_logs(&request.query, request.limit as usize) + }) + .await + .map_err(|e| anyhow!("{}", e.to_string())) + .flatten() + .map_err(|e| { + let err_msg = format!("failed to query logs {:?}", e); ManagementServiceError::AuditError(err_msg) })?; - Ok(Response::new(())) + let response = QueryAuditLogsResponse::new(logs); + Ok(Response::new(response)) } } @@ -949,7 +984,8 @@ impl TeaclaveManagementService { .await .map_err(|e| anyhow!("Failed to connect to storage service, {:?}", e))?; let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new(channel))); - let auditor = Auditor::try_new(storage_client.clone())?; + let client_clone = storage_client.clone(); + let auditor = task::spawn_blocking(move || Auditor::try_new(client_clone)).await??; let service = Self { storage_client, auditor, diff --git a/services/proto/src/proto/teaclave_frontend_service.proto b/services/proto/src/proto/teaclave_frontend_service.proto index 2a94a3a0..db89d706 100644 --- a/services/proto/src/proto/teaclave_frontend_service.proto +++ b/services/proto/src/proto/teaclave_frontend_service.proto @@ -252,6 +252,15 @@ message CancelTaskRequest { string task_id = 1; } +message QueryAuditLogsRequest { + string query = 1; + uint64 limit = 2; +} + +message QueryAuditLogsResponse { + repeated teaclave_common_proto.Entry logs = 1; +} + service TeaclaveFrontend { rpc RegisterInputFile (RegisterInputFileRequest) returns (RegisterInputFileResponse); rpc RegisterOutputFile (RegisterOutputFileRequest) returns (RegisterOutputFileResponse); @@ -274,4 +283,5 @@ service TeaclaveFrontend { rpc ApproveTask (ApproveTaskRequest) returns (google.protobuf.Empty); rpc InvokeTask (InvokeTaskRequest) returns (google.protobuf.Empty); rpc CancelTask (CancelTaskRequest) returns (google.protobuf.Empty); + rpc QueryAuditLogs (QueryAuditLogsRequest) returns (QueryAuditLogsResponse); } diff --git a/services/proto/src/proto/teaclave_management_service.proto b/services/proto/src/proto/teaclave_management_service.proto index 25158d50..d528326c 100644 --- a/services/proto/src/proto/teaclave_management_service.proto +++ b/services/proto/src/proto/teaclave_management_service.proto @@ -53,4 +53,5 @@ service TeaclaveManagement { rpc InvokeTask (teaclave_frontend_service_proto.InvokeTaskRequest) returns (google.protobuf.Empty); rpc CancelTask (teaclave_frontend_service_proto.CancelTaskRequest) returns (google.protobuf.Empty); rpc SaveLogs (SaveLogsRequest) returns (google.protobuf.Empty); + rpc QueryAuditLogs (teaclave_frontend_service_proto.QueryAuditLogsRequest) returns (teaclave_frontend_service_proto.QueryAuditLogsResponse); } diff --git a/services/proto/src/teaclave_common.rs b/services/proto/src/teaclave_common.rs index 762e34cc..a3c5b92c 100644 --- a/services/proto/src/teaclave_common.rs +++ b/services/proto/src/teaclave_common.rs @@ -24,7 +24,7 @@ use teaclave_types::{ }; use std::convert::TryInto; -use std::net::Ipv4Addr; +use std::net::Ipv6Addr; use anyhow::{bail, ensure, Error, Result}; @@ -282,11 +282,11 @@ impl std::convert::TryFrom<proto::Entry> for Entry { type Error = Error; fn try_from(proto: crate::teaclave_common_proto::Entry) -> Result<Self> { - const IPV4_LENTGH: usize = 4; + const IPV6_LENTGH: usize = 16; let len = proto.ip.len(); - ensure!(len == IPV4_LENTGH, "invalid ip length: {}", len); - let ip = Ipv4Addr::from(<Vec<u8> as TryInto<[u8; 4]>>::try_into(proto.ip).unwrap()); + ensure!(len == IPV6_LENTGH, "invalid ip length: {}", len); + let ip = Ipv6Addr::from(<Vec<u8> as TryInto<[u8; 16]>>::try_into(proto.ip).unwrap()); let builder = EntryBuilder::new(); let entry = builder diff --git a/services/proto/src/teaclave_frontend_service.rs b/services/proto/src/teaclave_frontend_service.rs index 8f502195..6e45e9cb 100644 --- a/services/proto/src/teaclave_frontend_service.rs +++ b/services/proto/src/teaclave_frontend_service.rs @@ -20,7 +20,7 @@ use anyhow::{Error, Result}; use core::convert::TryInto; use std::collections::HashMap; use teaclave_types::{ - Executor, ExecutorType, ExternalID, FileAuthTag, FileCrypto, FunctionArgument, + Entry, Executor, ExecutorType, ExternalID, FileAuthTag, FileCrypto, FunctionArgument, FunctionArguments, FunctionBuilder, FunctionInput, FunctionOutput, OwnerList, TaskFileOwners, }; use url::Url; @@ -652,3 +652,23 @@ pub fn from_proto_file_ids(vector: Vec<proto::DataMap>) -> Result<HashMap<String }) .collect() } + +impl QueryAuditLogsRequest { + pub fn new(query: String, limit: usize) -> Self { + Self { + query, + limit: limit as u64, + } + } +} + +impl QueryAuditLogsResponse { + pub fn new(entries: Vec<Entry>) -> Self { + let logs: Vec<crate::teaclave_common_proto::Entry> = entries + .into_iter() + .map(crate::teaclave_common_proto::Entry::from) + .collect(); + + Self { logs } + } +} diff --git a/services/proto/src/teaclave_management_service.rs b/services/proto/src/teaclave_management_service.rs index 46412438..0d29fdb5 100644 --- a/services/proto/src/teaclave_management_service.rs +++ b/services/proto/src/teaclave_management_service.rs @@ -22,6 +22,8 @@ pub use proto::teaclave_management_client::TeaclaveManagementClient; pub use proto::teaclave_management_server::TeaclaveManagement; pub use proto::teaclave_management_server::TeaclaveManagementServer; +use teaclave_types::Entry; + pub type RegisterInputFileRequest = crate::teaclave_frontend_service::RegisterInputFileRequest; pub type UpdateInputFileRequest = crate::teaclave_frontend_service::UpdateInputFileRequest; pub type RegisterInputFileResponse = crate::teaclave_frontend_service::RegisterInputFileResponse; @@ -68,3 +70,15 @@ pub type AssignDataRequest = crate::teaclave_frontend_service::AssignDataRequest pub type ApproveTaskRequest = crate::teaclave_frontend_service::ApproveTaskRequest; pub type InvokeTaskRequest = crate::teaclave_frontend_service::InvokeTaskRequest; pub type CancelTaskRequest = crate::teaclave_frontend_service::CancelTaskRequest; +pub type QueryAuditLogsRequest = crate::teaclave_frontend_service::QueryAuditLogsRequest; +pub type QueryAuditLogsResponse = crate::teaclave_frontend_service::QueryAuditLogsResponse; + +impl SaveLogsRequest { + pub fn new(entries: Vec<Entry>) -> Self { + let logs: Vec<crate::teaclave_common_proto::Entry> = entries + .into_iter() + .map(crate::teaclave_common_proto::Entry::from) + .collect(); + Self { logs } + } +} diff --git a/tests/functional/enclave/src/frontend_service.rs b/tests/functional/enclave/src/frontend_service.rs index e495d9d9..ddfa95ae 100644 --- a/tests/functional/enclave/src/frontend_service.rs +++ b/tests/functional/enclave/src/frontend_service.rs @@ -209,6 +209,66 @@ async fn test_register_function() { let mut client = unauthorized_client().await; let response = client.register_function(request).await; assert!(response.is_err()); + + // Wait for the logs to be sent to the auditor + std::thread::sleep(std::time::Duration::from_secs(35)); + + let function_name = "register_function"; + + // query by user name + let request = QueryAuditLogsRequest::new("user:".to_string() + USERNAME, 100); + let response = authorized_client() + .await + .query_audit_logs(request) + .await + .unwrap(); + + let logs: Vec<_> = response + .into_inner() + .logs + .into_iter() + .map(|e| Entry::try_from(e).unwrap()) + .filter(|e| e.message() == function_name) + .collect(); + assert_eq!(logs.len(), 1); + assert!(logs[0].result()); + + // query by function name stored in the message + let request = QueryAuditLogsRequest::new("message:".to_string() + function_name, 100); + let response = authorized_client() + .await + .query_audit_logs(request) + .await + .unwrap(); + let logs: Vec<_> = response + .into_inner() + .logs + .into_iter() + .map(|e| Entry::try_from(e).unwrap()) + .collect(); + assert_eq!(logs.len(), 2); + assert!(logs[0].user().contains(USERNAME)); + assert!(logs[0].result()); + + assert!(!logs[1].result()); + + let request = QueryAuditLogsRequest::new("message:".to_string() + "authenticate", 100); + let response = authorized_client() + .await + .query_audit_logs(request) + .await + .unwrap(); + let logs: Vec<_> = response + .into_inner() + .logs + .into_iter() + .map(|e| Entry::try_from(e).unwrap()) + .collect(); + // "authenticate" message will only show in the entry with false result and empty user + for log in logs { + assert_eq!(log.user(), ""); + assert!(!log.result()); + } } #[async_test_case] diff --git a/types/src/audit.rs b/types/src/audit.rs index 49d07ebe..437a7949 100644 --- a/types/src/audit.rs +++ b/types/src/audit.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::net::Ipv4Addr; +use std::net::Ipv6Addr; use std::time::{SystemTime, UNIX_EPOCH}; use chrono::NaiveDateTime; @@ -26,8 +26,8 @@ pub struct Entry { /// Timestamp datetime: NaiveDateTime, /// Where the request is from. - ip: Ipv4Addr, - /// Who conducst the request. + ip: Ipv6Addr, + /// Who conducts the request. user: String, /// What the user wants. message: String, @@ -39,7 +39,7 @@ pub struct Entry { impl Default for Entry { fn default() -> Self { let datetime = NaiveDateTime::from_timestamp_micros(0).unwrap(); - let ip = Ipv4Addr::UNSPECIFIED; + let ip = Ipv6Addr::UNSPECIFIED; let user = String::new(); let message = String::new(); let result = false; @@ -59,7 +59,7 @@ impl Entry { self.datetime } - pub fn ip(&self) -> Ipv4Addr { + pub fn ip(&self) -> Ipv6Addr { self.ip } @@ -80,7 +80,7 @@ impl Entry { pub struct EntryBuilder { /// The microsecond since the UNIX epoch microsecond: Option<i64>, - ip: Option<Ipv4Addr>, + ip: Option<Ipv6Addr>, user: Option<String>, message: Option<String>, result: Option<bool>, @@ -96,7 +96,7 @@ impl EntryBuilder { self } - pub fn ip(mut self, ip: Ipv4Addr) -> Self { + pub fn ip(mut self, ip: Ipv6Addr) -> Self { self.ip = Some(ip); self } @@ -129,7 +129,7 @@ impl EntryBuilder { Entry { datetime, - ip: self.ip.unwrap_or(Ipv4Addr::UNSPECIFIED), + ip: self.ip.unwrap_or(Ipv6Addr::UNSPECIFIED), user: self.user.unwrap_or_default(), message: self.message.unwrap_or_default(), result: self.result.unwrap_or(false), diff --git a/types/src/user.rs b/types/src/user.rs index 950abd1b..8ffe0064 100644 --- a/types/src/user.rs +++ b/types/src/user.rs @@ -103,3 +103,9 @@ impl UserAuthClaims { UserRole::from_str(&self.role) } } + +impl std::fmt::Display for UserAuthClaims { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{{ user: {}, role: {:?} }}", self.sub, self.get_role()) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
