This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch connectors-http-config in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0edb033bf1c7a1006c3db4aed7f295222fa6a22e Author: Maciej Modzelewski <[email protected]> AuthorDate: Wed Nov 19 15:46:24 2025 +0100 feat(connectors): implement http configuration provider --- Cargo.lock | 1 + core/connectors/runtime/Cargo.toml | 1 + core/connectors/runtime/example_config/config.toml | 7 +- core/connectors/runtime/src/configs/connectors.rs | 14 + .../src/configs/connectors/http_provider.rs | 533 +++++++++++++++++++++ .../src/configs/connectors/response_extractor.rs | 320 +++++++++++++ .../runtime/src/configs/connectors/url_builder.rs | 257 ++++++++++ core/connectors/runtime/src/configs/runtime.rs | 33 +- core/connectors/runtime/src/error.rs | 3 + 9 files changed, 1165 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca82d5574..e191a35a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4738,6 +4738,7 @@ dependencies = [ "mimalloc", "once_cell", "postcard", + "reqwest", "serde", "serde_json", "serde_yaml_ng", diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index ceee9bb6e..01af8c146 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -46,6 +46,7 @@ iggy_connector_sdk = { workspace = true } mimalloc = { workspace = true } once_cell = { workspace = true } postcard = { workspace = true } +reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_yaml_ng = { workspace = true } diff --git a/core/connectors/runtime/example_config/config.toml b/core/connectors/runtime/example_config/config.toml index 6bcbc9b22..55cd8dedf 100644 --- a/core/connectors/runtime/example_config/config.toml +++ b/core/connectors/runtime/example_config/config.toml @@ -48,5 +48,8 @@ ca_file = "core/certs/iggy_ca_cert.pem" path = "local_state" [connectors] -config_type = "local" -config_dir = "core/connectors/runtime/example_config/connectors" +config_type = "http" +endpoint = "" + +[connectors.custom_headers] +Cookie = "session=test" diff 
--git a/core/connectors/runtime/src/configs/connectors.rs b/core/connectors/runtime/src/configs/connectors.rs index 7d2502cd3..4adee182a 100644 --- a/core/connectors/runtime/src/configs/connectors.rs +++ b/core/connectors/runtime/src/configs/connectors.rs @@ -17,8 +17,12 @@ * under the License. */ +pub mod http_provider; mod local_provider; +mod response_extractor; +mod url_builder; +use crate::configs::connectors::http_provider::HttpConnectorsConfigProvider; use crate::configs::connectors::local_provider::LocalConnectorsConfigProvider; use crate::configs::runtime::ConnectorsConfig as RuntimeConnectorsConfig; use crate::error::RuntimeError; @@ -233,6 +237,16 @@ pub async fn create_connectors_config_provider( let provider = provider.init().await?; Ok(Box::new(provider)) } + RuntimeConnectorsConfig::Http(config) => { + let provider = HttpConnectorsConfigProvider::new( + &config.base_url, + &config.custom_headers, + config.timeout.get_duration(), + config.url_templates.clone(), + config.response.clone(), + )?; + Ok(Box::new(provider)) + } } } diff --git a/core/connectors/runtime/src/configs/connectors/http_provider.rs b/core/connectors/runtime/src/configs/connectors/http_provider.rs new file mode 100644 index 000000000..263e3985a --- /dev/null +++ b/core/connectors/runtime/src/configs/connectors/http_provider.rs @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::configs::connectors::response_extractor::ResponseExtractor; +use crate::configs::connectors::url_builder::{ + TEMPLATE_CREATE_SINK, TEMPLATE_CREATE_SOURCE, TEMPLATE_DELETE_SINK_CONFIG, + TEMPLATE_DELETE_SOURCE_CONFIG, TEMPLATE_GET_ACTIVE_CONFIGS, TEMPLATE_GET_ACTIVE_SINK_CONFIG, + TEMPLATE_GET_ACTIVE_SOURCE_CONFIG, TEMPLATE_GET_ACTIVE_VERSIONS, TEMPLATE_GET_SINK_CONFIG, + TEMPLATE_GET_SINK_CONFIGS, TEMPLATE_GET_SOURCE_CONFIG, TEMPLATE_GET_SOURCE_CONFIGS, + TEMPLATE_SET_ACTIVE_SINK, TEMPLATE_SET_ACTIVE_SOURCE, UrlBuilder, +}; +use crate::configs::connectors::{ + ConnectorConfigVersions, ConnectorsConfig, ConnectorsConfigProvider, CreateSinkConfig, + CreateSourceConfig, SinkConfig, SourceConfig, +}; +use crate::configs::runtime::ResponseConfig; +use crate::error::RuntimeError; +use async_trait::async_trait; +use reqwest; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::Duration; + +#[derive(Debug, Serialize, Deserialize)] +struct SetActiveVersionRequest { + version: u64, +} + +pub struct HttpConnectorsConfigProvider { + url_builder: UrlBuilder, + response_extractor: ResponseExtractor, + client: reqwest::Client, +} + +impl HttpConnectorsConfigProvider { + pub fn new( + base_url: &str, + custom_headers: &HashMap<String, String>, + timeout: Duration, + url_templates: Option<HashMap<String, String>>, + response_config: Option<ResponseConfig>, + ) -> Result<Self, RuntimeError> { + // Build default headers for the client + let mut headers = reqwest::header::HeaderMap::new(); + for (key, value) in 
custom_headers { + let header_name = + reqwest::header::HeaderName::from_bytes(key.as_bytes()).map_err(|e| { + RuntimeError::InvalidConfiguration(format!( + "Invalid header name '{}': {}", + key, e + )) + })?; + let header_value = reqwest::header::HeaderValue::from_str(value).map_err(|e| { + RuntimeError::InvalidConfiguration(format!( + "Invalid header value for '{}': {}", + key, e + )) + })?; + headers.insert(header_name, header_value); + } + + // Build the HTTP client with default headers and timeout + let client = reqwest::Client::builder() + .default_headers(headers) + .timeout(timeout) + .build() + .map_err(|e| { + RuntimeError::InvalidConfiguration(format!("Failed to build HTTP client: {}", e)) + })?; + + // Initialize URL builder with custom templates or defaults + let url_builder = UrlBuilder::new(base_url.to_owned(), url_templates); + + // Initialize response extractor with custom config or defaults + let response_extractor = ResponseExtractor::new(response_config); + + Ok(Self { + url_builder, + response_extractor, + client, + }) + } +} + +#[async_trait] +impl ConnectorsConfigProvider for HttpConnectorsConfigProvider { + async fn create_sink_config( + &self, + key: &str, + config: CreateSinkConfig, + ) -> Result<SinkConfig, RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + let url = self.url_builder.build(TEMPLATE_CREATE_SINK, &vars); + + let response = self + .client + .post(&url) + .json(&config) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + let response_text = response.text().await.map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e)) + })?; + + 
self.response_extractor.extract(&response_text) + } + + async fn create_source_config( + &self, + key: &str, + config: CreateSourceConfig, + ) -> Result<SourceConfig, RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + let url = self.url_builder.build(TEMPLATE_CREATE_SOURCE, &vars); + + let response = self + .client + .post(&url) + .json(&config) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + let response_text = response.text().await.map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e)) + })?; + + self.response_extractor.extract(&response_text) + } + + async fn get_active_configs(&self) -> Result<ConnectorsConfig, RuntimeError> { + let vars = HashMap::new(); + let url = self.url_builder.build(TEMPLATE_GET_ACTIVE_CONFIGS, &vars); + + let response = self + .client + .get(&url) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + let response_text = response.text().await.map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e)) + })?; + + self.response_extractor.extract(&response_text) + } + + async fn get_active_configs_versions(&self) -> Result<ConnectorConfigVersions, RuntimeError> { + let vars = HashMap::new(); + let url = self.url_builder.build(TEMPLATE_GET_ACTIVE_VERSIONS, &vars); + + let response = self + .client + .get(&url) + .send() + .await + 
.map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + let response_text = response.text().await.map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e)) + })?; + + self.response_extractor.extract(&response_text) + } + + async fn set_active_sink_version(&self, key: &str, version: u64) -> Result<(), RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + let url = self.url_builder.build(TEMPLATE_SET_ACTIVE_SINK, &vars); + + let request_body = SetActiveVersionRequest { version }; + let response = self + .client + .put(&url) + .json(&request_body) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + Ok(()) + } + + async fn set_active_source_version(&self, key: &str, version: u64) -> Result<(), RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + let url = self.url_builder.build(TEMPLATE_SET_ACTIVE_SOURCE, &vars); + + let request_body = SetActiveVersionRequest { version }; + let response = self + .client + .put(&url) + .json(&request_body) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + 
Ok(()) + } + + async fn get_sink_configs(&self, key: &str) -> Result<Vec<SinkConfig>, RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + let url = self.url_builder.build(TEMPLATE_GET_SINK_CONFIGS, &vars); + + let response = self + .client + .get(&url) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + let response_text = response.text().await.map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e)) + })?; + + self.response_extractor.extract(&response_text) + } + + async fn get_sink_config( + &self, + key: &str, + version: Option<u64>, + ) -> Result<Option<SinkConfig>, RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + + let url = match version { + Some(v) => { + let version_str = v.to_string(); + vars.insert("version", &version_str); + self.url_builder.build(TEMPLATE_GET_SINK_CONFIG, &vars) + } + None => self + .url_builder + .build(TEMPLATE_GET_ACTIVE_SINK_CONFIG, &vars), + }; + + let response = self + .client + .get(&url) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if response.status() == reqwest::StatusCode::NOT_FOUND { + return Ok(None); + } + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + let response_text = response.text().await.map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e)) + })?; + + let config: SinkConfig = 
self.response_extractor.extract(&response_text)?; + Ok(Some(config)) + } + + async fn get_source_configs(&self, key: &str) -> Result<Vec<SourceConfig>, RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + let url = self.url_builder.build(TEMPLATE_GET_SOURCE_CONFIGS, &vars); + + let response = self + .client + .get(&url) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + let response_text = response.text().await.map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e)) + })?; + + self.response_extractor.extract(&response_text) + } + + async fn get_source_config( + &self, + key: &str, + version: Option<u64>, + ) -> Result<Option<SourceConfig>, RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + + let url = match version { + Some(v) => { + let version_str = v.to_string(); + vars.insert("version", &version_str); + self.url_builder.build(TEMPLATE_GET_SOURCE_CONFIG, &vars) + } + None => self + .url_builder + .build(TEMPLATE_GET_ACTIVE_SOURCE_CONFIG, &vars), + }; + + let response = self + .client + .get(&url) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if response.status() == reqwest::StatusCode::NOT_FOUND { + return Ok(None); + } + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + let response_text = response.text().await.map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e)) 
+ })?; + + let config: SourceConfig = self.response_extractor.extract(&response_text)?; + Ok(Some(config)) + } + + async fn delete_sink_config( + &self, + key: &str, + version: Option<u64>, + ) -> Result<(), RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + + let url = if let Some(v) = version { + let version_str = v.to_string(); + let mut query_params = HashMap::new(); + query_params.insert("version", version_str.as_str()); + self.url_builder + .build_with_query(TEMPLATE_DELETE_SINK_CONFIG, &vars, &query_params) + } else { + self.url_builder.build(TEMPLATE_DELETE_SINK_CONFIG, &vars) + }; + + let response = self + .client + .delete(&url) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + Ok(()) + } + + async fn delete_source_config( + &self, + key: &str, + version: Option<u64>, + ) -> Result<(), RuntimeError> { + let mut vars = HashMap::new(); + vars.insert("key", key); + + let url = if let Some(v) = version { + let version_str = v.to_string(); + let mut query_params = HashMap::new(); + query_params.insert("version", version_str.as_str()); + self.url_builder + .build_with_query(TEMPLATE_DELETE_SOURCE_CONFIG, &vars, &query_params) + } else { + self.url_builder.build(TEMPLATE_DELETE_SOURCE_CONFIG, &vars) + }; + + let response = self + .client + .delete(&url) + .send() + .await + .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(RuntimeError::HttpRequestFailed(format!( + "HTTP {} - {}", + status, error_text + ))); + } + + Ok(()) + } +} 
diff --git a/core/connectors/runtime/src/configs/connectors/response_extractor.rs b/core/connectors/runtime/src/configs/connectors/response_extractor.rs new file mode 100644 index 000000000..fa9b55849 --- /dev/null +++ b/core/connectors/runtime/src/configs/connectors/response_extractor.rs @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::configs::runtime::ResponseConfig; +use crate::error::RuntimeError; +use serde::de::DeserializeOwned; +use serde_json::Value; + +/// Extracts data from JSON responses using configured paths +pub struct ResponseExtractor { + data_path: Option<String>, + error_path: Option<String>, +} + +impl ResponseExtractor { + /// Creates a new ResponseExtractor with optional configuration + pub fn new(config: Option<ResponseConfig>) -> Self { + match config { + Some(cfg) => Self { + data_path: cfg.data_path, + error_path: cfg.error_path, + }, + None => Self { + data_path: None, + error_path: None, + }, + } + } + + /// Extracts data from a JSON response body + /// + /// If data_path is configured, navigates to that path and deserializes. + /// Otherwise, deserializes the entire response body directly. 
+ pub fn extract<T: DeserializeOwned>(&self, response_body: &str) -> Result<T, RuntimeError> { + // Parse the response as JSON + let json_value: Value = serde_json::from_str(response_body).map_err(|e| { + RuntimeError::HttpRequestFailed(format!("Failed to parse JSON response: {}", e)) + })?; + + // Check for errors first if error_path is configured + if let Some(error_path) = &self.error_path { + if let Some(error_value) = self.navigate_path(&json_value, error_path) { + // If the error field exists and is not null, return an error + if !error_value.is_null() { + let error_msg = match error_value { + Value::String(s) => s.clone(), + _ => error_value.to_string(), + }; + return Err(RuntimeError::HttpRequestFailed(format!( + "API returned error: {}", + error_msg + ))); + } + } + } + + // Extract data from the specified path or use the entire response + let target_value = if let Some(data_path) = &self.data_path { + self.navigate_path(&json_value, data_path).ok_or_else(|| { + RuntimeError::HttpRequestFailed(format!( + "Data path '{}' not found in response. Response structure: {}", + data_path, + Self::summarize_structure(&json_value) + )) + })? + } else { + &json_value + }; + + // Deserialize the target value into the desired type + serde_json::from_value(target_value.clone()).map_err(|e| { + RuntimeError::HttpRequestFailed(format!( + "Failed to deserialize data: {}. Value: {}", + e, target_value + )) + }) + } + + /// Navigates through a JSON structure using dot-notation path + /// + /// Example: "data.config" navigates to json["data"]["config"] + fn navigate_path<'a>(&self, json: &'a Value, path: &str) -> Option<&'a Value> { + let parts: Vec<&str> = path.split('.').collect(); + let mut current = json; + + for part in parts { + current = match current { + Value::Object(map) => map.get(part)?, + Value::Array(arr) => { + // Support array indexing like "items.0.name" + if let Ok(index) = part.parse::<usize>() { + arr.get(index)? 
+ } else { + return None; + } + } + _ => return None, + }; + } + + Some(current) + } + + /// Creates a summary of JSON structure for error messages + fn summarize_structure(json: &Value) -> String { + match json { + Value::Object(map) => { + let keys: Vec<&String> = map.keys().collect(); + if keys.len() <= 5 { + format!( + "{{ {} }}", + keys.iter() + .map(|k| k.as_str()) + .collect::<Vec<_>>() + .join(", ") + ) + } else { + format!( + "{{ {} ... and {} more fields }}", + keys[..5] + .iter() + .map(|k| k.as_str()) + .collect::<Vec<_>>() + .join(", "), + keys.len() - 5 + ) + } + } + Value::Array(arr) => format!("[array of {} items]", arr.len()), + Value::String(_) => "string".to_string(), + Value::Number(_) => "number".to_string(), + Value::Bool(_) => "bool".to_string(), + Value::Null => "null".to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Serialize, Deserialize, PartialEq)] + struct TestData { + name: String, + value: i32, + } + + #[test] + fn test_direct_extraction_no_path() { + let extractor = ResponseExtractor::new(None); + let response = r#"{"name": "test", "value": 42}"#; + + let result: Result<TestData, _> = extractor.extract(response); + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + TestData { + name: "test".to_string(), + value: 42 + } + ); + } + + #[test] + fn test_nested_extraction_with_path() { + let config = ResponseConfig { + data_path: Some("data.config".to_string()), + error_path: None, + }; + let extractor = ResponseExtractor::new(Some(config)); + let response = r#"{"status": "ok", "data": {"config": {"name": "test", "value": 42}}}"#; + + let result: Result<TestData, _> = extractor.extract(response); + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + TestData { + name: "test".to_string(), + value: 42 + } + ); + } + + #[test] + fn test_wrapped_response() { + let config = ResponseConfig { + data_path: Some("result".to_string()), + error_path: 
None, + }; + let extractor = ResponseExtractor::new(Some(config)); + let response = r#"{"result": {"name": "wrapped", "value": 99}}"#; + + let result: Result<TestData, _> = extractor.extract(response); + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + TestData { + name: "wrapped".to_string(), + value: 99 + } + ); + } + + #[test] + fn test_missing_path_error() { + let config = ResponseConfig { + data_path: Some("missing.path".to_string()), + error_path: None, + }; + let extractor = ResponseExtractor::new(Some(config)); + let response = r#"{"status": "ok", "data": {"config": {}}}"#; + + let result: Result<TestData, _> = extractor.extract(response); + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Data path 'missing.path' not found") + ); + } + + #[test] + fn test_error_path_detection() { + let config = ResponseConfig { + data_path: Some("data".to_string()), + error_path: Some("error.message".to_string()), + }; + let extractor = ResponseExtractor::new(Some(config)); + let response = r#"{"error": {"message": "Something went wrong"}, "data": null}"#; + + let result: Result<TestData, _> = extractor.extract(response); + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("API returned error: Something went wrong") + ); + } + + #[test] + fn test_error_path_null_is_ok() { + let config = ResponseConfig { + data_path: Some("data".to_string()), + error_path: Some("error".to_string()), + }; + let extractor = ResponseExtractor::new(Some(config)); + let response = r#"{"error": null, "data": {"name": "success", "value": 1}}"#; + + let result: Result<TestData, _> = extractor.extract(response); + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + TestData { + name: "success".to_string(), + value: 1 + } + ); + } + + #[test] + fn test_deep_nested_path() { + let config = ResponseConfig { + data_path: Some("response.body.result.data".to_string()), + error_path: None, + }; + let extractor 
= ResponseExtractor::new(Some(config)); + let response = + r#"{"response": {"body": {"result": {"data": {"name": "deep", "value": 123}}}}}"#; + + let result: Result<TestData, _> = extractor.extract(response); + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + TestData { + name: "deep".to_string(), + value: 123 + } + ); + } + + #[test] + fn test_array_index_access() { + let config = ResponseConfig { + data_path: Some("items.0".to_string()), + error_path: None, + }; + let extractor = ResponseExtractor::new(Some(config)); + let response = + r#"{"items": [{"name": "first", "value": 10}, {"name": "second", "value": 20}]}"#; + + let result: Result<TestData, _> = extractor.extract(response); + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + TestData { + name: "first".to_string(), + value: 10 + } + ); + } +} diff --git a/core/connectors/runtime/src/configs/connectors/url_builder.rs b/core/connectors/runtime/src/configs/connectors/url_builder.rs new file mode 100644 index 000000000..a627898fe --- /dev/null +++ b/core/connectors/runtime/src/configs/connectors/url_builder.rs @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
use std::collections::HashMap;

/// Default RESTful URL templates for connector operations
pub const DEFAULT_CREATE_SINK: &str = "/sinks/{key}/configs";
pub const DEFAULT_CREATE_SOURCE: &str = "/sources/{key}/configs";
pub const DEFAULT_GET_ACTIVE_CONFIGS: &str = "/configs/active";
pub const DEFAULT_GET_ACTIVE_VERSIONS: &str = "/configs/active/versions";
pub const DEFAULT_SET_ACTIVE_SINK: &str = "/sinks/{key}/configs/active";
pub const DEFAULT_SET_ACTIVE_SOURCE: &str = "/sources/{key}/configs/active";
pub const DEFAULT_GET_SINK_CONFIGS: &str = "/sinks/{key}/configs";
pub const DEFAULT_GET_SINK_CONFIG_BY_VERSION: &str = "/sinks/{key}/configs/{version}";
pub const DEFAULT_GET_ACTIVE_SINK_CONFIG: &str = "/sinks/{key}/configs/active";
pub const DEFAULT_GET_SOURCE_CONFIGS: &str = "/sources/{key}/configs";
pub const DEFAULT_GET_SOURCE_CONFIG_BY_VERSION: &str = "/sources/{key}/configs/{version}";
pub const DEFAULT_GET_ACTIVE_SOURCE_CONFIG: &str = "/sources/{key}/configs/active";
pub const DEFAULT_DELETE_SINK_CONFIG: &str = "/sinks/{key}/configs";
pub const DEFAULT_DELETE_SOURCE_CONFIG: &str = "/sources/{key}/configs";

/// Template operation names used as keys in the url_templates configuration
pub const TEMPLATE_CREATE_SINK: &str = "create_sink";
pub const TEMPLATE_CREATE_SOURCE: &str = "create_source";
pub const TEMPLATE_GET_ACTIVE_CONFIGS: &str = "get_active_configs";
pub const TEMPLATE_GET_ACTIVE_VERSIONS: &str = "get_active_versions";
pub const TEMPLATE_SET_ACTIVE_SINK: &str = "set_active_sink";
pub const TEMPLATE_SET_ACTIVE_SOURCE: &str = "set_active_source";
pub const TEMPLATE_GET_SINK_CONFIGS: &str = "get_sink_configs";
pub const TEMPLATE_GET_SINK_CONFIG: &str = "get_sink_config";
pub const TEMPLATE_GET_ACTIVE_SINK_CONFIG: &str = "get_active_sink_config";
pub const TEMPLATE_GET_SOURCE_CONFIGS: &str = "get_source_configs";
pub const TEMPLATE_GET_SOURCE_CONFIG: &str = "get_source_config";
pub const TEMPLATE_GET_ACTIVE_SOURCE_CONFIG: &str = "get_active_source_config";
pub const TEMPLATE_DELETE_SINK_CONFIG: &str = "delete_sink_config";
pub const TEMPLATE_DELETE_SOURCE_CONFIG: &str = "delete_source_config";

/// Every operation name paired with the template used when no override is configured.
const DEFAULT_TEMPLATES: [(&str, &str); 14] = [
    (TEMPLATE_CREATE_SINK, DEFAULT_CREATE_SINK),
    (TEMPLATE_CREATE_SOURCE, DEFAULT_CREATE_SOURCE),
    (TEMPLATE_GET_ACTIVE_CONFIGS, DEFAULT_GET_ACTIVE_CONFIGS),
    (TEMPLATE_GET_ACTIVE_VERSIONS, DEFAULT_GET_ACTIVE_VERSIONS),
    (TEMPLATE_SET_ACTIVE_SINK, DEFAULT_SET_ACTIVE_SINK),
    (TEMPLATE_SET_ACTIVE_SOURCE, DEFAULT_SET_ACTIVE_SOURCE),
    (TEMPLATE_GET_SINK_CONFIGS, DEFAULT_GET_SINK_CONFIGS),
    (TEMPLATE_GET_SINK_CONFIG, DEFAULT_GET_SINK_CONFIG_BY_VERSION),
    (TEMPLATE_GET_ACTIVE_SINK_CONFIG, DEFAULT_GET_ACTIVE_SINK_CONFIG),
    (TEMPLATE_GET_SOURCE_CONFIGS, DEFAULT_GET_SOURCE_CONFIGS),
    (TEMPLATE_GET_SOURCE_CONFIG, DEFAULT_GET_SOURCE_CONFIG_BY_VERSION),
    (TEMPLATE_GET_ACTIVE_SOURCE_CONFIG, DEFAULT_GET_ACTIVE_SOURCE_CONFIG),
    (TEMPLATE_DELETE_SINK_CONFIG, DEFAULT_DELETE_SINK_CONFIG),
    (TEMPLATE_DELETE_SOURCE_CONFIG, DEFAULT_DELETE_SOURCE_CONFIG),
];

/// Upper-case hexadecimal digits used when percent-encoding.
const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

/// URL builder for constructing HTTP endpoints from templates
///
/// Holds a base URL plus one template per operation name. A template may contain
/// `{name}` placeholders that are filled in by [`UrlBuilder::build`].
#[derive(Debug, Clone)]
pub struct UrlBuilder {
    base_url: String,
    templates: HashMap<String, String>,
}

impl UrlBuilder {
    /// Creates a new UrlBuilder with optional custom templates
    ///
    /// Starts from the RESTful defaults and then applies `custom_templates` on top,
    /// so a custom entry replaces the default registered under the same operation
    /// name, and entries with new names are simply added.
    pub fn new(base_url: String, custom_templates: Option<HashMap<String, String>>) -> Self {
        let mut templates: HashMap<String, String> = DEFAULT_TEMPLATES
            .iter()
            .map(|&(operation, template)| (operation.to_owned(), template.to_owned()))
            .collect();

        if let Some(custom) = custom_templates {
            templates.extend(custom);
        }

        Self {
            base_url,
            templates,
        }
    }

    /// Builds a URL for the specified operation with variable substitution
    ///
    /// Each `{name}` placeholder whose `name` is a key of `vars` is replaced by the
    /// matching value; placeholders without a matching variable are left untouched.
    /// Substitution is a single left-to-right pass, so braces inside a substituted
    /// value are never expanded again and the result does not depend on map order.
    /// Values are inserted verbatim (no percent-encoding), because a template may
    /// place a variable in either the path or the query.
    ///
    /// An operation with no registered template resolves to an empty template, which
    /// yields the base URL alone.
    pub fn build(&self, operation: &str, vars: &HashMap<&str, &str>) -> String {
        let template = self
            .templates
            .get(operation)
            .map(String::as_str)
            .unwrap_or("");
        let path = substitute_vars(template, vars);

        if path.starts_with('/') {
            // Avoid `//` when the configured base URL carries a trailing slash.
            format!("{}{}", self.base_url.trim_end_matches('/'), path)
        } else {
            format!("{}{}", self.base_url, path)
        }
    }

    /// Builds a URL with query parameters appended
    ///
    /// Parameters are percent-encoded and emitted sorted by name so the output is
    /// deterministic. When the resolved template already contains a query string
    /// the parameters are joined to it with `&` instead of a second `?`.
    pub fn build_with_query(
        &self,
        operation: &str,
        vars: &HashMap<&str, &str>,
        query_params: &HashMap<&str, &str>,
    ) -> String {
        let base = self.build(operation, vars);

        if query_params.is_empty() {
            return base;
        }

        let mut pairs: Vec<(&str, &str)> = query_params
            .iter()
            .map(|(name, value)| (*name, *value))
            .collect();
        pairs.sort_unstable();

        let query = pairs
            .into_iter()
            .map(|(name, value)| {
                format!(
                    "{}={}",
                    encode_query_component(name),
                    encode_query_component(value)
                )
            })
            .collect::<Vec<_>>()
            .join("&");

        let separator = if !base.contains('?') {
            "?"
        } else if base.ends_with('?') || base.ends_with('&') {
            ""
        } else {
            "&"
        };

        format!("{}{}{}", base, separator, query)
    }
}

/// Replaces `{name}` placeholders in `template` with values from `vars` in one pass.
///
/// A `{` that does not open a known placeholder is copied through unchanged.
/// Variable names containing `}` cannot be matched.
fn substitute_vars(template: &str, vars: &HashMap<&str, &str>) -> String {
    let mut resolved = String::with_capacity(template.len());
    let mut rest = template;

    while let Some(open) = rest.find('{') {
        resolved.push_str(&rest[..open]);
        let after_open = &rest[open + 1..];

        let hit = after_open
            .find('}')
            .and_then(|close| vars.get(&after_open[..close]).map(|value| (close, *value)));

        match hit {
            Some((close, value)) => {
                resolved.push_str(value);
                rest = &after_open[close + 1..];
            }
            None => {
                // Not a placeholder we know: keep the brace and keep scanning after it.
                resolved.push('{');
                rest = after_open;
            }
        }
    }

    resolved.push_str(rest);
    resolved
}

/// Percent-encodes every byte outside the RFC 3986 "unreserved" set.
fn encode_query_component(raw: &str) -> String {
    let mut encoded = String::with_capacity(raw.len());

    for byte in raw.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                encoded.push(char::from(byte));
            }
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
            }
        }
    }

    encoded
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_templates() {
        let builder = UrlBuilder::new("http://localhost:8080".to_string(), None);

        let mut vars = HashMap::new();
        vars.insert("key", "my-sink");

        let url = builder.build(TEMPLATE_CREATE_SINK, &vars);
        assert_eq!(url, "http://localhost:8080/sinks/my-sink/configs");
    }

    #[test]
    fn test_custom_template() {
        let mut custom = HashMap::new();
        custom.insert(
            TEMPLATE_CREATE_SINK.to_string(),
            "/api/v2/sinks/{key}/config".to_string(),
        );

        let builder = UrlBuilder::new("http://localhost:8080".to_string(), Some(custom));

        let mut vars = HashMap::new();
        vars.insert("key", "my-sink");

        let url = builder.build(TEMPLATE_CREATE_SINK, &vars);
        assert_eq!(url, "http://localhost:8080/api/v2/sinks/my-sink/config");
    }

    #[test]
    fn test_version_substitution() {
        let builder = UrlBuilder::new("http://localhost:8080".to_string(), None);

        let mut vars = HashMap::new();
        vars.insert("key", "my-sink");
        vars.insert("version", "42");

        let url = builder.build(TEMPLATE_GET_SINK_CONFIG, &vars);
        assert_eq!(url, "http://localhost:8080/sinks/my-sink/configs/42");
    }

    #[test]
    fn test_query_parameters() {
        let builder = UrlBuilder::new("http://localhost:8080".to_string(), None);

        let mut vars = HashMap::new();
        vars.insert("key", "my-sink");

        let mut query_params = HashMap::new();
        query_params.insert("version", "5");

        let url = builder.build_with_query(TEMPLATE_DELETE_SINK_CONFIG, &vars, &query_params);
        assert_eq!(url, "http://localhost:8080/sinks/my-sink/configs?version=5");
    }

    #[test]
    fn test_query_based_custom_template() {
        let mut custom = HashMap::new();
        custom.insert(
            TEMPLATE_GET_SINK_CONFIG.to_string(),
            "/api?action=get&key={key}&version={version}".to_string(),
        );

        let builder = UrlBuilder::new("http://localhost:8080".to_string(), Some(custom));

        let mut vars = HashMap::new();
        vars.insert("key", "my-sink");
        vars.insert("version", "10");

        let url = builder.build(TEMPLATE_GET_SINK_CONFIG, &vars);
        assert_eq!(
            url,
            "http://localhost:8080/api?action=get&key=my-sink&version=10"
        );
    }

    #[test]
    fn test_query_appended_to_query_based_template() {
        let mut custom = HashMap::new();
        custom.insert(
            TEMPLATE_DELETE_SINK_CONFIG.to_string(),
            "/api?action=delete&key={key}".to_string(),
        );

        let builder = UrlBuilder::new("http://localhost:8080".to_string(), Some(custom));

        let mut vars = HashMap::new();
        vars.insert("key", "my-sink");

        let mut query_params = HashMap::new();
        query_params.insert("version", "5");

        let url = builder.build_with_query(TEMPLATE_DELETE_SINK_CONFIG, &vars, &query_params);
        assert_eq!(
            url,
            "http://localhost:8080/api?action=delete&key=my-sink&version=5"
        );
    }

    #[test]
    fn test_trailing_slash_in_base_url() {
        let builder = UrlBuilder::new("http://localhost:8080/".to_string(), None);

        let mut vars = HashMap::new();
        vars.insert("key", "my-sink");

        let url = builder.build(TEMPLATE_CREATE_SINK, &vars);
        assert_eq!(url, "http://localhost:8080/sinks/my-sink/configs");
    }
}
*/ - use crate::api::config::HttpConfig; use figment::providers::{Format, Toml}; use figment::value::Dict; use figment::{Metadata, Profile, Provider}; use iggy_common::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME}; -use iggy_common::{CustomEnvProvider, FileConfigProvider}; +use iggy_common::{CustomEnvProvider, FileConfigProvider, IggyDuration}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fmt::Formatter; #[derive(Debug, Default, Clone, Deserialize, Serialize)] @@ -57,10 +57,33 @@ pub struct LocalConnectorsConfig { pub config_dir: String, } +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(default)] +pub struct HttpConnectorsConfig { + pub base_url: String, + pub custom_headers: HashMap<String, String>, + #[serde(default = "default_http_timeout")] + pub timeout: IggyDuration, + pub url_templates: Option<HashMap<String, String>>, + pub response: Option<ResponseConfig>, +} + +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +#[serde(default)] +pub struct ResponseConfig { + pub data_path: Option<String>, + pub error_path: Option<String>, +} + +fn default_http_timeout() -> IggyDuration { + IggyDuration::new_from_secs(10) +} + #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(tag = "config_type", rename_all = "lowercase")] pub enum ConnectorsConfig { Local(LocalConnectorsConfig), + Http(HttpConnectorsConfig), } impl Default for ConnectorsConfig { @@ -126,6 +149,12 @@ impl std::fmt::Display for ConnectorsConfig { "{{ type: \"file\", config_dir: {:?} }}", config.config_dir ), + ConnectorsConfig::Http(config) => write!( + f, + "{{ type: \"http\", endpoint: {:?}, custom_headers: {:?} }}", + config.base_url, + config.custom_headers.keys() + ), } } } diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs index 8378e702b..b392b60b6 100644 --- a/core/connectors/runtime/src/error.rs +++ b/core/connectors/runtime/src/error.rs @@ -55,6 +55,8 @@ pub enum RuntimeError { 
CannotConvertConfiguration, #[error("IO operation failed with error: {0:?}")] IoError(#[from] std::io::Error), + #[error("HTTP request failed: {0}")] + HttpRequestFailed(String), } impl RuntimeError { @@ -66,6 +68,7 @@ impl RuntimeError { RuntimeError::SourceConfigNotFound(_, _) => "source_config_not_found", RuntimeError::MissingIggyCredentials => "invalid_configuration", RuntimeError::InvalidConfiguration(_) => "invalid_configuration", + RuntimeError::HttpRequestFailed(_) => "http_request_failed", _ => "error", } }
