This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch connectors-http-config
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 0edb033bf1c7a1006c3db4aed7f295222fa6a22e
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Wed Nov 19 15:46:24 2025 +0100

    feat(connectors): implement http configuration provider
---
 Cargo.lock                                         |   1 +
 core/connectors/runtime/Cargo.toml                 |   1 +
 core/connectors/runtime/example_config/config.toml |   7 +-
 core/connectors/runtime/src/configs/connectors.rs  |  14 +
 .../src/configs/connectors/http_provider.rs        | 533 +++++++++++++++++++++
 .../src/configs/connectors/response_extractor.rs   | 320 +++++++++++++
 .../runtime/src/configs/connectors/url_builder.rs  | 257 ++++++++++
 core/connectors/runtime/src/configs/runtime.rs     |  33 +-
 core/connectors/runtime/src/error.rs               |   3 +
 9 files changed, 1165 insertions(+), 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ca82d5574..e191a35a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4738,6 +4738,7 @@ dependencies = [
  "mimalloc",
  "once_cell",
  "postcard",
+ "reqwest",
  "serde",
  "serde_json",
  "serde_yaml_ng",
diff --git a/core/connectors/runtime/Cargo.toml 
b/core/connectors/runtime/Cargo.toml
index ceee9bb6e..01af8c146 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -46,6 +46,7 @@ iggy_connector_sdk = { workspace = true }
 mimalloc = { workspace = true }
 once_cell = { workspace = true }
 postcard = { workspace = true }
+reqwest = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 serde_yaml_ng = { workspace = true }
diff --git a/core/connectors/runtime/example_config/config.toml 
b/core/connectors/runtime/example_config/config.toml
index 6bcbc9b22..55cd8dedf 100644
--- a/core/connectors/runtime/example_config/config.toml
+++ b/core/connectors/runtime/example_config/config.toml
@@ -48,5 +48,8 @@ ca_file = "core/certs/iggy_ca_cert.pem"
 path = "local_state"
 
 [connectors]
-config_type = "local"
-config_dir = "core/connectors/runtime/example_config/connectors"
+config_type = "http"
+endpoint = ""
+
+[connectors.custom_headers]
+Cookie = "session=test"
diff --git a/core/connectors/runtime/src/configs/connectors.rs 
b/core/connectors/runtime/src/configs/connectors.rs
index 7d2502cd3..4adee182a 100644
--- a/core/connectors/runtime/src/configs/connectors.rs
+++ b/core/connectors/runtime/src/configs/connectors.rs
@@ -17,8 +17,12 @@
  * under the License.
  */
 
+pub mod http_provider;
 mod local_provider;
+mod response_extractor;
+mod url_builder;
 
+use crate::configs::connectors::http_provider::HttpConnectorsConfigProvider;
 use crate::configs::connectors::local_provider::LocalConnectorsConfigProvider;
 use crate::configs::runtime::ConnectorsConfig as RuntimeConnectorsConfig;
 use crate::error::RuntimeError;
@@ -233,6 +237,16 @@ pub async fn create_connectors_config_provider(
             let provider = provider.init().await?;
             Ok(Box::new(provider))
         }
+        RuntimeConnectorsConfig::Http(config) => {
+            let provider = HttpConnectorsConfigProvider::new(
+                &config.base_url,
+                &config.custom_headers,
+                config.timeout.get_duration(),
+                config.url_templates.clone(),
+                config.response.clone(),
+            )?;
+            Ok(Box::new(provider))
+        }
     }
 }
 
diff --git a/core/connectors/runtime/src/configs/connectors/http_provider.rs 
b/core/connectors/runtime/src/configs/connectors/http_provider.rs
new file mode 100644
index 000000000..263e3985a
--- /dev/null
+++ b/core/connectors/runtime/src/configs/connectors/http_provider.rs
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::configs::connectors::response_extractor::ResponseExtractor;
+use crate::configs::connectors::url_builder::{
+    TEMPLATE_CREATE_SINK, TEMPLATE_CREATE_SOURCE, TEMPLATE_DELETE_SINK_CONFIG,
+    TEMPLATE_DELETE_SOURCE_CONFIG, TEMPLATE_GET_ACTIVE_CONFIGS, 
TEMPLATE_GET_ACTIVE_SINK_CONFIG,
+    TEMPLATE_GET_ACTIVE_SOURCE_CONFIG, TEMPLATE_GET_ACTIVE_VERSIONS, 
TEMPLATE_GET_SINK_CONFIG,
+    TEMPLATE_GET_SINK_CONFIGS, TEMPLATE_GET_SOURCE_CONFIG, 
TEMPLATE_GET_SOURCE_CONFIGS,
+    TEMPLATE_SET_ACTIVE_SINK, TEMPLATE_SET_ACTIVE_SOURCE, UrlBuilder,
+};
+use crate::configs::connectors::{
+    ConnectorConfigVersions, ConnectorsConfig, ConnectorsConfigProvider, 
CreateSinkConfig,
+    CreateSourceConfig, SinkConfig, SourceConfig,
+};
+use crate::configs::runtime::ResponseConfig;
+use crate::error::RuntimeError;
+use async_trait::async_trait;
+use reqwest;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::time::Duration;
+
+#[derive(Debug, Serialize, Deserialize)]
+struct SetActiveVersionRequest { // JSON body sent with the PUT requests issued by set_active_sink_version / set_active_source_version
+    version: u64, // config version the caller wants to become active
+}
+
+pub struct HttpConnectorsConfigProvider { // ConnectorsConfigProvider implementation backed by a remote HTTP API
+    url_builder: UrlBuilder, // turns a template name plus variables into the request URL
+    response_extractor: ResponseExtractor, // converts JSON response bodies into the requested config type
+    client: reqwest::Client, // built once in `new` with the default headers and timeout
+}
+
+impl HttpConnectorsConfigProvider {
+    /// Builds a provider that talks to the configuration API rooted at `base_url`.
+    ///
+    /// * `custom_headers` - sent with every request as client default headers.
+    /// * `timeout` - applied to the underlying `reqwest::Client`.
+    /// * `url_templates` - optional overrides handed to `UrlBuilder::new`.
+    /// * `response_config` - optional settings handed to `ResponseExtractor::new`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `RuntimeError::InvalidConfiguration` when a header name or value
+    /// is not valid, or when the HTTP client cannot be built.
+    pub fn new(
+        base_url: &str,
+        custom_headers: &HashMap<String, String>,
+        timeout: Duration,
+        url_templates: Option<HashMap<String, String>>,
+        response_config: Option<ResponseConfig>,
+    ) -> Result<Self, RuntimeError> {
+        use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
+
+        // Validate every configured header up front so a typo fails at startup
+        // instead of on the first request.
+        let mut default_headers = HeaderMap::new();
+        for (name, raw_value) in custom_headers {
+            let parsed_name = HeaderName::from_bytes(name.as_bytes()).map_err(|cause| {
+                RuntimeError::InvalidConfiguration(format!(
+                    "Invalid header name '{}': {}",
+                    name, cause
+                ))
+            })?;
+            let parsed_value = HeaderValue::from_str(raw_value).map_err(|cause| {
+                RuntimeError::InvalidConfiguration(format!(
+                    "Invalid header value for '{}': {}",
+                    name, cause
+                ))
+            })?;
+            default_headers.insert(parsed_name, parsed_value);
+        }
+
+        let builder = reqwest::Client::builder()
+            .default_headers(default_headers)
+            .timeout(timeout);
+        let client = builder.build().map_err(|cause| {
+            RuntimeError::InvalidConfiguration(format!("Failed to build HTTP client: {}", cause))
+        })?;
+
+        Ok(Self {
+            // Custom templates / response settings fall back to defaults inside these constructors.
+            url_builder: UrlBuilder::new(base_url.to_owned(), url_templates),
+            response_extractor: ResponseExtractor::new(response_config),
+            client,
+        })
+    }
+}
+
+/// Private request plumbing shared by every `ConnectorsConfigProvider` method.
+impl HttpConnectorsConfigProvider {
+    /// Sends `request` and maps transport-level failures to
+    /// `RuntimeError::HttpRequestFailed`. The HTTP status is not inspected.
+    async fn dispatch(request: reqwest::RequestBuilder) -> Result<reqwest::Response, RuntimeError> {
+        request
+            .send()
+            .await
+            .map_err(|e| RuntimeError::HttpRequestFailed(e.to_string()))
+    }
+
+    /// Passes a response with a success status through unchanged; otherwise
+    /// turns it into `RuntimeError::HttpRequestFailed("HTTP <status> - <body>")`.
+    async fn ensure_success(response: reqwest::Response) -> Result<reqwest::Response, RuntimeError> {
+        let status = response.status();
+        if status.is_success() {
+            return Ok(response);
+        }
+
+        // The body is only used for diagnostics, so a failure to read it must
+        // not hide the status code.
+        let error_text = response
+            .text()
+            .await
+            .unwrap_or_else(|_| "Unknown error".to_string());
+        Err(RuntimeError::HttpRequestFailed(format!(
+            "HTTP {} - {}",
+            status, error_text
+        )))
+    }
+
+    /// Sends `request` and requires a success status.
+    async fn send_checked(request: reqwest::RequestBuilder) -> Result<reqwest::Response, RuntimeError> {
+        let response = Self::dispatch(request).await?;
+        Self::ensure_success(response).await
+    }
+
+    /// Sends `request`, requires a success status and ignores the response body.
+    async fn send_expecting_no_payload(request: reqwest::RequestBuilder) -> Result<(), RuntimeError> {
+        Self::send_checked(request).await?;
+        Ok(())
+    }
+
+    /// Reads the response body as text and runs it through the response extractor.
+    async fn read_payload<T: serde::de::DeserializeOwned>(
+        &self,
+        response: reqwest::Response,
+    ) -> Result<T, RuntimeError> {
+        let response_text = response.text().await.map_err(|e| {
+            RuntimeError::HttpRequestFailed(format!("Failed to read response: {}", e))
+        })?;
+
+        self.response_extractor.extract(&response_text)
+    }
+
+    /// Sends `request`, requires a success status and extracts a `T` from the body.
+    async fn fetch_payload<T: serde::de::DeserializeOwned>(
+        &self,
+        request: reqwest::RequestBuilder,
+    ) -> Result<T, RuntimeError> {
+        let response = Self::send_checked(request).await?;
+        self.read_payload(response).await
+    }
+
+    /// Like `fetch_payload`, except that a `404 Not Found` status yields `Ok(None)`
+    /// instead of an error.
+    async fn fetch_optional_payload<T: serde::de::DeserializeOwned>(
+        &self,
+        request: reqwest::RequestBuilder,
+    ) -> Result<Option<T>, RuntimeError> {
+        let response = Self::dispatch(request).await?;
+        if response.status() == reqwest::StatusCode::NOT_FOUND {
+            return Ok(None);
+        }
+
+        let response = Self::ensure_success(response).await?;
+        self.read_payload(response).await.map(Some)
+    }
+}
+
+#[async_trait]
+impl ConnectorsConfigProvider for HttpConnectorsConfigProvider {
+    /// POSTs `config` as JSON to the create-sink URL for `key` and returns the
+    /// sink config extracted from the response.
+    async fn create_sink_config(
+        &self,
+        key: &str,
+        config: CreateSinkConfig,
+    ) -> Result<SinkConfig, RuntimeError> {
+        let vars = HashMap::from([("key", key)]);
+        let url = self.url_builder.build(TEMPLATE_CREATE_SINK, &vars);
+
+        self.fetch_payload(self.client.post(&url).json(&config)).await
+    }
+
+    /// POSTs `config` as JSON to the create-source URL for `key` and returns the
+    /// source config extracted from the response.
+    async fn create_source_config(
+        &self,
+        key: &str,
+        config: CreateSourceConfig,
+    ) -> Result<SourceConfig, RuntimeError> {
+        let vars = HashMap::from([("key", key)]);
+        let url = self.url_builder.build(TEMPLATE_CREATE_SOURCE, &vars);
+
+        self.fetch_payload(self.client.post(&url).json(&config)).await
+    }
+
+    /// GETs the active-configs URL and returns the extracted `ConnectorsConfig`.
+    async fn get_active_configs(&self) -> Result<ConnectorsConfig, RuntimeError> {
+        let vars = HashMap::new();
+        let url = self.url_builder.build(TEMPLATE_GET_ACTIVE_CONFIGS, &vars);
+
+        self.fetch_payload(self.client.get(&url)).await
+    }
+
+    /// GETs the active-versions URL and returns the extracted `ConnectorConfigVersions`.
+    async fn get_active_configs_versions(&self) -> Result<ConnectorConfigVersions, RuntimeError> {
+        let vars = HashMap::new();
+        let url = self.url_builder.build(TEMPLATE_GET_ACTIVE_VERSIONS, &vars);
+
+        self.fetch_payload(self.client.get(&url)).await
+    }
+
+    /// PUTs `{"version": <version>}` to the set-active-sink URL for `key`.
+    async fn set_active_sink_version(&self, key: &str, version: u64) -> Result<(), RuntimeError> {
+        let vars = HashMap::from([("key", key)]);
+        let url = self.url_builder.build(TEMPLATE_SET_ACTIVE_SINK, &vars);
+
+        let request_body = SetActiveVersionRequest { version };
+        Self::send_expecting_no_payload(self.client.put(&url).json(&request_body)).await
+    }
+
+    /// PUTs `{"version": <version>}` to the set-active-source URL for `key`.
+    async fn set_active_source_version(&self, key: &str, version: u64) -> Result<(), RuntimeError> {
+        let vars = HashMap::from([("key", key)]);
+        let url = self.url_builder.build(TEMPLATE_SET_ACTIVE_SOURCE, &vars);
+
+        let request_body = SetActiveVersionRequest { version };
+        Self::send_expecting_no_payload(self.client.put(&url).json(&request_body)).await
+    }
+
+    /// GETs the sink-configs URL for `key` and returns the extracted list.
+    async fn get_sink_configs(&self, key: &str) -> Result<Vec<SinkConfig>, RuntimeError> {
+        let vars = HashMap::from([("key", key)]);
+        let url = self.url_builder.build(TEMPLATE_GET_SINK_CONFIGS, &vars);
+
+        self.fetch_payload(self.client.get(&url)).await
+    }
+
+    /// GETs one sink config for `key`: the given `version`, or the active one
+    /// when `version` is `None`. A 404 response yields `Ok(None)`.
+    async fn get_sink_config(
+        &self,
+        key: &str,
+        version: Option<u64>,
+    ) -> Result<Option<SinkConfig>, RuntimeError> {
+        // Rendered before `vars` so the map can borrow it for the whole function.
+        let version_text = version.map(|v| v.to_string());
+        let mut vars = HashMap::from([("key", key)]);
+
+        let url = match version_text.as_deref() {
+            Some(v) => {
+                vars.insert("version", v);
+                self.url_builder.build(TEMPLATE_GET_SINK_CONFIG, &vars)
+            }
+            None => self
+                .url_builder
+                .build(TEMPLATE_GET_ACTIVE_SINK_CONFIG, &vars),
+        };
+
+        self.fetch_optional_payload(self.client.get(&url)).await
+    }
+
+    /// GETs the source-configs URL for `key` and returns the extracted list.
+    async fn get_source_configs(&self, key: &str) -> Result<Vec<SourceConfig>, RuntimeError> {
+        let vars = HashMap::from([("key", key)]);
+        let url = self.url_builder.build(TEMPLATE_GET_SOURCE_CONFIGS, &vars);
+
+        self.fetch_payload(self.client.get(&url)).await
+    }
+
+    /// GETs one source config for `key`: the given `version`, or the active one
+    /// when `version` is `None`. A 404 response yields `Ok(None)`.
+    async fn get_source_config(
+        &self,
+        key: &str,
+        version: Option<u64>,
+    ) -> Result<Option<SourceConfig>, RuntimeError> {
+        // Rendered before `vars` so the map can borrow it for the whole function.
+        let version_text = version.map(|v| v.to_string());
+        let mut vars = HashMap::from([("key", key)]);
+
+        let url = match version_text.as_deref() {
+            Some(v) => {
+                vars.insert("version", v);
+                self.url_builder.build(TEMPLATE_GET_SOURCE_CONFIG, &vars)
+            }
+            None => self
+                .url_builder
+                .build(TEMPLATE_GET_ACTIVE_SOURCE_CONFIG, &vars),
+        };
+
+        self.fetch_optional_payload(self.client.get(&url)).await
+    }
+
+    /// Sends DELETE to the delete-sink URL for `key`; when `version` is given it
+    /// is passed as the `version` query parameter.
+    async fn delete_sink_config(
+        &self,
+        key: &str,
+        version: Option<u64>,
+    ) -> Result<(), RuntimeError> {
+        let vars = HashMap::from([("key", key)]);
+
+        let url = if let Some(v) = version {
+            let version_str = v.to_string();
+            let query_params = HashMap::from([("version", version_str.as_str())]);
+            self.url_builder
+                .build_with_query(TEMPLATE_DELETE_SINK_CONFIG, &vars, &query_params)
+        } else {
+            self.url_builder.build(TEMPLATE_DELETE_SINK_CONFIG, &vars)
+        };
+
+        Self::send_expecting_no_payload(self.client.delete(&url)).await
+    }
+
+    /// Sends DELETE to the delete-source URL for `key`; when `version` is given
+    /// it is passed as the `version` query parameter.
+    async fn delete_source_config(
+        &self,
+        key: &str,
+        version: Option<u64>,
+    ) -> Result<(), RuntimeError> {
+        let vars = HashMap::from([("key", key)]);
+
+        let url = if let Some(v) = version {
+            let version_str = v.to_string();
+            let query_params = HashMap::from([("version", version_str.as_str())]);
+            self.url_builder
+                .build_with_query(TEMPLATE_DELETE_SOURCE_CONFIG, &vars, &query_params)
+        } else {
+            self.url_builder.build(TEMPLATE_DELETE_SOURCE_CONFIG, &vars)
+        };
+
+        Self::send_expecting_no_payload(self.client.delete(&url)).await
+    }
+}
diff --git 
a/core/connectors/runtime/src/configs/connectors/response_extractor.rs 
b/core/connectors/runtime/src/configs/connectors/response_extractor.rs
new file mode 100644
index 000000000..fa9b55849
--- /dev/null
+++ b/core/connectors/runtime/src/configs/connectors/response_extractor.rs
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::configs::runtime::ResponseConfig;
+use crate::error::RuntimeError;
+use serde::de::DeserializeOwned;
+use serde_json::Value;
+
+/// Extracts data from JSON responses using configured paths
+pub struct ResponseExtractor {
+    data_path: Option<String>, // dot-separated path to the payload; None means the whole body is the payload
+    error_path: Option<String>, // dot-separated path to an error field; a non-null value there makes extract() fail
+}
+
+impl ResponseExtractor {
+    /// Creates a new ResponseExtractor with optional configuration.
+    ///
+    /// With `None`, no data or error path is set and responses are
+    /// deserialized as-is.
+    pub fn new(config: Option<ResponseConfig>) -> Self {
+        let (data_path, error_path) = config
+            .map(|cfg| (cfg.data_path, cfg.error_path))
+            .unwrap_or_default();
+
+        Self {
+            data_path,
+            error_path,
+        }
+    }
+
+    /// Extracts data from a JSON response body.
+    ///
+    /// If `error_path` is configured and resolves to a non-null value, that
+    /// value is reported as an error. If `data_path` is configured, navigates
+    /// to that path and deserializes; otherwise deserializes the entire
+    /// response body directly.
+    ///
+    /// # Errors
+    ///
+    /// Returns `RuntimeError::HttpRequestFailed` when the body is not valid
+    /// JSON, when the error path holds a non-null value, when the data path
+    /// cannot be resolved, or when the selected value does not deserialize
+    /// into `T`.
+    pub fn extract<T: DeserializeOwned>(&self, response_body: &str) -> Result<T, RuntimeError> {
+        // Parse the response as JSON
+        let json_value: Value = serde_json::from_str(response_body).map_err(|e| {
+            RuntimeError::HttpRequestFailed(format!("Failed to parse JSON response: {}", e))
+        })?;
+
+        // Check for errors first: a missing or null error field means "no error".
+        let reported_error = self
+            .error_path
+            .as_deref()
+            .and_then(|error_path| self.navigate_path(&json_value, error_path))
+            .filter(|error_value| !error_value.is_null());
+        if let Some(error_value) = reported_error {
+            // Plain strings are shown without the surrounding JSON quotes.
+            let error_msg = match error_value {
+                Value::String(s) => s.clone(),
+                other => other.to_string(),
+            };
+            return Err(RuntimeError::HttpRequestFailed(format!(
+                "API returned error: {}",
+                error_msg
+            )));
+        }
+
+        // Extract data from the specified path or use the entire response
+        let target_value = match self.data_path.as_deref() {
+            Some(data_path) => self.navigate_path(&json_value, data_path).ok_or_else(|| {
+                RuntimeError::HttpRequestFailed(format!(
+                    "Data path '{}' not found in response. Response structure: {}",
+                    data_path,
+                    Self::summarize_structure(&json_value)
+                ))
+            })?,
+            None => &json_value,
+        };
+
+        // `&Value` is itself a serde deserializer, so the selected subtree is
+        // deserialized in place instead of being deep-cloned first.
+        <T as serde::Deserialize>::deserialize(target_value).map_err(|e| {
+            RuntimeError::HttpRequestFailed(format!(
+                "Failed to deserialize data: {}. Value: {}",
+                e, target_value
+            ))
+        })
+    }
+
+    /// Navigates through a JSON structure using dot-notation path
+    ///
+    /// Example: "data.config" navigates to json["data"]["config"].
+    /// Numeric segments index into arrays, e.g. "items.0.name".
+    /// Returns `None` as soon as a segment cannot be resolved.
+    fn navigate_path<'a>(&self, json: &'a Value, path: &str) -> Option<&'a Value> {
+        path.split('.').try_fold(json, |current, part| match current {
+            Value::Object(map) => map.get(part),
+            Value::Array(arr) => part
+                .parse::<usize>()
+                .ok()
+                .and_then(|index| arr.get(index)),
+            // Scalars and null have no children.
+            _ => None,
+        })
+    }
+
+    /// Creates a summary of JSON structure for error messages
+    ///
+    /// Objects list at most their first five keys; other values are described
+    /// by kind only, so payload contents never end up in the summary.
+    fn summarize_structure(json: &Value) -> String {
+        const MAX_LISTED_KEYS: usize = 5;
+
+        match json {
+            Value::Object(map) => {
+                let listed = map
+                    .keys()
+                    .take(MAX_LISTED_KEYS)
+                    .map(String::as_str)
+                    .collect::<Vec<_>>()
+                    .join(", ");
+                let hidden = map.len().saturating_sub(MAX_LISTED_KEYS);
+                if hidden == 0 {
+                    format!("{{ {} }}", listed)
+                } else {
+                    format!("{{ {} ... and {} more fields }}", listed, hidden)
+                }
+            }
+            Value::Array(arr) => format!("[array of {} items]", arr.len()),
+            Value::String(_) => "string".to_string(),
+            Value::Number(_) => "number".to_string(),
+            Value::Bool(_) => "bool".to_string(),
+            Value::Null => "null".to_string(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests { // unit tests for ResponseExtractor::extract
+    use super::*;
+    use serde::{Deserialize, Serialize};
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq)]
+    struct TestData { // minimal payload type used as the deserialization target
+        name: String,
+        value: i32,
+    }
+
+    #[test]
+    fn test_direct_extraction_no_path() { // no config: the whole body is the payload
+        let extractor = ResponseExtractor::new(None);
+        let response = r#"{"name": "test", "value": 42}"#;
+
+        let result: Result<TestData, _> = extractor.extract(response);
+        assert!(result.is_ok());
+        assert_eq!(
+            result.unwrap(),
+            TestData {
+                name: "test".to_string(),
+                value: 42
+            }
+        );
+    }
+
+    #[test]
+    fn test_nested_extraction_with_path() { // two-segment data_path selects a nested object
+        let config = ResponseConfig {
+            data_path: Some("data.config".to_string()),
+            error_path: None,
+        };
+        let extractor = ResponseExtractor::new(Some(config));
+        let response = r#"{"status": "ok", "data": {"config": {"name": "test", 
"value": 42}}}"#;
+
+        let result: Result<TestData, _> = extractor.extract(response);
+        assert!(result.is_ok());
+        assert_eq!(
+            result.unwrap(),
+            TestData {
+                name: "test".to_string(),
+                value: 42
+            }
+        );
+    }
+
+    #[test]
+    fn test_wrapped_response() { // single-segment data_path unwraps an envelope
+        let config = ResponseConfig {
+            data_path: Some("result".to_string()),
+            error_path: None,
+        };
+        let extractor = ResponseExtractor::new(Some(config));
+        let response = r#"{"result": {"name": "wrapped", "value": 99}}"#;
+
+        let result: Result<TestData, _> = extractor.extract(response);
+        assert!(result.is_ok());
+        assert_eq!(
+            result.unwrap(),
+            TestData {
+                name: "wrapped".to_string(),
+                value: 99
+            }
+        );
+    }
+
+    #[test]
+    fn test_missing_path_error() { // unresolved data_path is reported by name
+        let config = ResponseConfig {
+            data_path: Some("missing.path".to_string()),
+            error_path: None,
+        };
+        let extractor = ResponseExtractor::new(Some(config));
+        let response = r#"{"status": "ok", "data": {"config": {}}}"#;
+
+        let result: Result<TestData, _> = extractor.extract(response);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Data path 'missing.path' not found")
+        );
+    }
+
+    #[test]
+    fn test_error_path_detection() { // non-null value at error_path wins over data_path
+        let config = ResponseConfig {
+            data_path: Some("data".to_string()),
+            error_path: Some("error.message".to_string()),
+        };
+        let extractor = ResponseExtractor::new(Some(config));
+        let response = r#"{"error": {"message": "Something went wrong"}, 
"data": null}"#;
+
+        let result: Result<TestData, _> = extractor.extract(response);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("API returned error: Something went wrong")
+        );
+    }
+
+    #[test]
+    fn test_error_path_null_is_ok() { // a null error field does not count as an error
+        let config = ResponseConfig {
+            data_path: Some("data".to_string()),
+            error_path: Some("error".to_string()),
+        };
+        let extractor = ResponseExtractor::new(Some(config));
+        let response = r#"{"error": null, "data": {"name": "success", "value": 
1}}"#;
+
+        let result: Result<TestData, _> = extractor.extract(response);
+        assert!(result.is_ok());
+        assert_eq!(
+            result.unwrap(),
+            TestData {
+                name: "success".to_string(),
+                value: 1
+            }
+        );
+    }
+
+    #[test]
+    fn test_deep_nested_path() { // four-segment data_path
+        let config = ResponseConfig {
+            data_path: Some("response.body.result.data".to_string()),
+            error_path: None,
+        };
+        let extractor = ResponseExtractor::new(Some(config));
+        let response =
+            r#"{"response": {"body": {"result": {"data": {"name": "deep", 
"value": 123}}}}}"#;
+
+        let result: Result<TestData, _> = extractor.extract(response);
+        assert!(result.is_ok());
+        assert_eq!(
+            result.unwrap(),
+            TestData {
+                name: "deep".to_string(),
+                value: 123
+            }
+        );
+    }
+
+    #[test]
+    fn test_array_index_access() { // numeric path segment indexes into an array
+        let config = ResponseConfig {
+            data_path: Some("items.0".to_string()),
+            error_path: None,
+        };
+        let extractor = ResponseExtractor::new(Some(config));
+        let response =
+            r#"{"items": [{"name": "first", "value": 10}, {"name": "second", 
"value": 20}]}"#;
+
+        let result: Result<TestData, _> = extractor.extract(response);
+        assert!(result.is_ok());
+        assert_eq!(
+            result.unwrap(),
+            TestData {
+                name: "first".to_string(),
+                value: 10
+            }
+        );
+    }
+}
diff --git a/core/connectors/runtime/src/configs/connectors/url_builder.rs 
b/core/connectors/runtime/src/configs/connectors/url_builder.rs
new file mode 100644
index 000000000..a627898fe
--- /dev/null
+++ b/core/connectors/runtime/src/configs/connectors/url_builder.rs
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashMap;
+
+/// Default RESTful URL templates for connector operations
+pub const DEFAULT_CREATE_SINK: &str = "/sinks/{key}/configs";
+pub const DEFAULT_CREATE_SOURCE: &str = "/sources/{key}/configs";
+pub const DEFAULT_GET_ACTIVE_CONFIGS: &str = "/configs/active";
+pub const DEFAULT_GET_ACTIVE_VERSIONS: &str = "/configs/active/versions";
+pub const DEFAULT_SET_ACTIVE_SINK: &str = "/sinks/{key}/configs/active";
+pub const DEFAULT_SET_ACTIVE_SOURCE: &str = "/sources/{key}/configs/active";
+pub const DEFAULT_GET_SINK_CONFIGS: &str = "/sinks/{key}/configs";
+pub const DEFAULT_GET_SINK_CONFIG_BY_VERSION: &str = "/sinks/{key}/configs/{version}";
+pub const DEFAULT_GET_ACTIVE_SINK_CONFIG: &str = "/sinks/{key}/configs/active";
+pub const DEFAULT_GET_SOURCE_CONFIGS: &str = "/sources/{key}/configs";
+pub const DEFAULT_GET_SOURCE_CONFIG_BY_VERSION: &str = "/sources/{key}/configs/{version}";
+pub const DEFAULT_GET_ACTIVE_SOURCE_CONFIG: &str = "/sources/{key}/configs/active";
+pub const DEFAULT_DELETE_SINK_CONFIG: &str = "/sinks/{key}/configs";
+pub const DEFAULT_DELETE_SOURCE_CONFIG: &str = "/sources/{key}/configs";
+
+/// Template operation names used as keys in the url_templates configuration
+pub const TEMPLATE_CREATE_SINK: &str = "create_sink";
+pub const TEMPLATE_CREATE_SOURCE: &str = "create_source";
+pub const TEMPLATE_GET_ACTIVE_CONFIGS: &str = "get_active_configs";
+pub const TEMPLATE_GET_ACTIVE_VERSIONS: &str = "get_active_versions";
+pub const TEMPLATE_SET_ACTIVE_SINK: &str = "set_active_sink";
+pub const TEMPLATE_SET_ACTIVE_SOURCE: &str = "set_active_source";
+pub const TEMPLATE_GET_SINK_CONFIGS: &str = "get_sink_configs";
+pub const TEMPLATE_GET_SINK_CONFIG: &str = "get_sink_config";
+pub const TEMPLATE_GET_ACTIVE_SINK_CONFIG: &str = "get_active_sink_config";
+pub const TEMPLATE_GET_SOURCE_CONFIGS: &str = "get_source_configs";
+pub const TEMPLATE_GET_SOURCE_CONFIG: &str = "get_source_config";
+pub const TEMPLATE_GET_ACTIVE_SOURCE_CONFIG: &str = "get_active_source_config";
+pub const TEMPLATE_DELETE_SINK_CONFIG: &str = "delete_sink_config";
+pub const TEMPLATE_DELETE_SOURCE_CONFIG: &str = "delete_source_config";
+
+/// URL builder for constructing HTTP endpoints from templates
+pub struct UrlBuilder {
+    base_url: String,
+    templates: HashMap<String, String>,
+}
+
+impl UrlBuilder {
+    /// Creates a new UrlBuilder with optional custom templates
+    ///
+    /// If templates are not provided, uses RESTful defaults
+    pub fn new(base_url: String, custom_templates: Option<HashMap<String, String>>) -> Self {
+        let mut templates = HashMap::new();
+
+        // Initialize with defaults
+        templates.insert(
+            TEMPLATE_CREATE_SINK.to_string(),
+            DEFAULT_CREATE_SINK.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_CREATE_SOURCE.to_string(),
+            DEFAULT_CREATE_SOURCE.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_GET_ACTIVE_CONFIGS.to_string(),
+            DEFAULT_GET_ACTIVE_CONFIGS.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_GET_ACTIVE_VERSIONS.to_string(),
+            DEFAULT_GET_ACTIVE_VERSIONS.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_SET_ACTIVE_SINK.to_string(),
+            DEFAULT_SET_ACTIVE_SINK.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_SET_ACTIVE_SOURCE.to_string(),
+            DEFAULT_SET_ACTIVE_SOURCE.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_GET_SINK_CONFIGS.to_string(),
+            DEFAULT_GET_SINK_CONFIGS.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_GET_SINK_CONFIG.to_string(),
+            DEFAULT_GET_SINK_CONFIG_BY_VERSION.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_GET_ACTIVE_SINK_CONFIG.to_string(),
+            DEFAULT_GET_ACTIVE_SINK_CONFIG.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_GET_SOURCE_CONFIGS.to_string(),
+            DEFAULT_GET_SOURCE_CONFIGS.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_GET_SOURCE_CONFIG.to_string(),
+            DEFAULT_GET_SOURCE_CONFIG_BY_VERSION.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_GET_ACTIVE_SOURCE_CONFIG.to_string(),
+            DEFAULT_GET_ACTIVE_SOURCE_CONFIG.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_DELETE_SINK_CONFIG.to_string(),
+            DEFAULT_DELETE_SINK_CONFIG.to_string(),
+        );
+        templates.insert(
+            TEMPLATE_DELETE_SOURCE_CONFIG.to_string(),
+            DEFAULT_DELETE_SOURCE_CONFIG.to_string(),
+        );
+
+        // Override with custom templates if provided
+        if let Some(custom) = custom_templates {
+            for (key, value) in custom {
+                templates.insert(key, value);
+            }
+        }
+
+        Self {
+            base_url,
+            templates,
+        }
+    }
+
+    /// Builds a URL for the specified operation with variable substitution
+    pub fn build(&self, operation: &str, vars: &HashMap<&str, &str>) -> String {
+        let template = self
+            .templates
+            .get(operation)
+            .map(|s| s.as_str())
+            .unwrap_or("");
+
+        let mut url = template.to_string();
+
+        // Replace template variables: {key}, {version}, {type}
+        for (var_name, var_value) in vars {
+            let placeholder = format!("{{{}}}", var_name);
+            url = url.replace(&placeholder, var_value);
+        }
+
+        format!("{}{}", self.base_url, url)
+    }
+
+    /// Builds a URL with query parameters appended
+    pub fn build_with_query(
+        &self,
+        operation: &str,
+        vars: &HashMap<&str, &str>,
+        query_params: &HashMap<&str, &str>,
+    ) -> String {
+        let base = self.build(operation, vars);
+
+        if query_params.is_empty() {
+            return base;
+        }
+
+        let query_string: Vec<String> = query_params
+            .iter()
+            .map(|(k, v)| format!("{}={}", k, v))
+            .collect();
+
+        format!("{}?{}", base, query_string.join("&"))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_default_templates() {
+        let builder = UrlBuilder::new("http://localhost:8080".to_string(), None);
+
+        let mut vars = HashMap::new();
+        vars.insert("key", "my-sink");
+
+        let url = builder.build(TEMPLATE_CREATE_SINK, &vars);
+        assert_eq!(url, "http://localhost:8080/sinks/my-sink/configs");
+    }
+
+    #[test]
+    fn test_custom_template() {
+        let mut custom = HashMap::new();
+        custom.insert(
+            TEMPLATE_CREATE_SINK.to_string(),
+            "/api/v2/sinks/{key}/config".to_string(),
+        );
+
+        let builder = UrlBuilder::new("http://localhost:8080".to_string(), Some(custom));
+
+        let mut vars = HashMap::new();
+        vars.insert("key", "my-sink");
+
+        let url = builder.build(TEMPLATE_CREATE_SINK, &vars);
+        assert_eq!(url, "http://localhost:8080/api/v2/sinks/my-sink/config");
+    }
+
+    #[test]
+    fn test_version_substitution() {
+        let builder = UrlBuilder::new("http://localhost:8080".to_string(), None);
+
+        let mut vars = HashMap::new();
+        vars.insert("key", "my-sink");
+        vars.insert("version", "42");
+
+        let url = builder.build(TEMPLATE_GET_SINK_CONFIG, &vars);
+        assert_eq!(url, "http://localhost:8080/sinks/my-sink/configs/42");
+    }
+
+    #[test]
+    fn test_query_parameters() {
+        let builder = UrlBuilder::new("http://localhost:8080".to_string(), None);
+
+        let mut vars = HashMap::new();
+        vars.insert("key", "my-sink");
+
+        let mut query_params = HashMap::new();
+        query_params.insert("version", "5");
+
+        let url = builder.build_with_query(TEMPLATE_DELETE_SINK_CONFIG, &vars, &query_params);
+        assert_eq!(url, "http://localhost:8080/sinks/my-sink/configs?version=5");
+    }
+
+    #[test]
+    fn test_query_based_custom_template() {
+        let mut custom = HashMap::new();
+        custom.insert(
+            TEMPLATE_GET_SINK_CONFIG.to_string(),
+            "/api?action=get&key={key}&version={version}".to_string(),
+        );
+
+        let builder = UrlBuilder::new("http://localhost:8080".to_string(), Some(custom));
+
+        let mut vars = HashMap::new();
+        vars.insert("key", "my-sink");
+        vars.insert("version", "10");
+
+        let url = builder.build(TEMPLATE_GET_SINK_CONFIG, &vars);
+        assert_eq!(
+            url,
+            "http://localhost:8080/api?action=get&key=my-sink&version=10"
+        );
+    }
+}
diff --git a/core/connectors/runtime/src/configs/runtime.rs b/core/connectors/runtime/src/configs/runtime.rs
index 1375a3a55..182f643d5 100644
--- a/core/connectors/runtime/src/configs/runtime.rs
+++ b/core/connectors/runtime/src/configs/runtime.rs
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 use crate::api::config::HttpConfig;
 use figment::providers::{Format, Toml};
 use figment::value::Dict;
 use figment::{Metadata, Profile, Provider};
 use iggy_common::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
-use iggy_common::{CustomEnvProvider, FileConfigProvider};
+use iggy_common::{CustomEnvProvider, FileConfigProvider, IggyDuration};
 use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
 use std::fmt::Formatter;
 
 #[derive(Debug, Default, Clone, Deserialize, Serialize)]
@@ -57,10 +57,33 @@ pub struct LocalConnectorsConfig {
     pub config_dir: String,
 }
 
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct HttpConnectorsConfig {
+    pub base_url: String,
+    pub custom_headers: HashMap<String, String>,
+    #[serde(default = "default_http_timeout")]
+    pub timeout: IggyDuration,
+    pub url_templates: Option<HashMap<String, String>>,
+    pub response: Option<ResponseConfig>,
+}
+
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct ResponseConfig {
+    pub data_path: Option<String>,
+    pub error_path: Option<String>,
+}
+
+fn default_http_timeout() -> IggyDuration {
+    IggyDuration::new_from_secs(10)
+}
+
 #[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(tag = "config_type", rename_all = "lowercase")]
 pub enum ConnectorsConfig {
     Local(LocalConnectorsConfig),
+    Http(HttpConnectorsConfig),
 }
 
 impl Default for ConnectorsConfig {
@@ -126,6 +149,12 @@ impl std::fmt::Display for ConnectorsConfig {
                 "{{ type: \"file\", config_dir: {:?} }}",
                 config.config_dir
             ),
+            ConnectorsConfig::Http(config) => write!(
+                f,
+                "{{ type: \"http\", endpoint: {:?}, custom_headers: {:?} }}",
+                config.base_url,
+                config.custom_headers.keys()
+            ),
         }
     }
 }
diff --git a/core/connectors/runtime/src/error.rs b/core/connectors/runtime/src/error.rs
index 8378e702b..b392b60b6 100644
--- a/core/connectors/runtime/src/error.rs
+++ b/core/connectors/runtime/src/error.rs
@@ -55,6 +55,8 @@ pub enum RuntimeError {
     CannotConvertConfiguration,
     #[error("IO operation failed with error: {0:?}")]
     IoError(#[from] std::io::Error),
+    #[error("HTTP request failed: {0}")]
+    HttpRequestFailed(String),
 }
 
 impl RuntimeError {
@@ -66,6 +68,7 @@ impl RuntimeError {
             RuntimeError::SourceConfigNotFound(_, _) => "source_config_not_found",
             RuntimeError::MissingIggyCredentials => "invalid_configuration",
             RuntimeError::InvalidConfiguration(_) => "invalid_configuration",
+            RuntimeError::HttpRequestFailed(_) => "http_request_failed",
             _ => "error",
         }
     }


Reply via email to