This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch isolate-extensions-http-request
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 5249dc5e8d1467094189415117db0520063b44df
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Mar 11 22:37:02 2026 +0100

    refactor: Isolate core-ext HTTP requests into single interface
---
 .../management/compact/AdapterGenerationSteps.java |   8 +-
 .../management/AdapterMasterManagement.java        |   9 +-
 .../management/AdapterMigrationManager.java        |  10 +-
 .../management/DescriptionManagement.java          |  12 +-
 .../management/management/GuessManagement.java     |  22 +--
 .../management/management/WorkerRestClient.java    | 126 +++++++------
 .../management/AdapterMasterManagementTest.java    |  12 +-
 .../export/resolver/AdapterResolver.java           |   5 +-
 .../health/monitoring/ExtensionHealthCheck.java    |  11 +-
 .../ExtensionInstanceAvailabilityCheck.java        |  17 +-
 .../health/monitoring/ServiceHealthCheck.java      |  12 +-
 .../ExtensionServiceOperationResult.java           |  52 ++++++
 .../extensions/ExtensionServiceRequestManager.java |  70 ++++++++
 .../streampipes/manager/assets/AssetFetcher.java   |  30 +++-
 .../execution/ExtensionServiceExecutions.java      |  75 --------
 .../HttpExtensionServiceRequestManager.java        | 197 +++++++++++++++++++++
 .../manager/execution/http/DetachHttpRequest.java  |  10 +-
 .../manager/execution/http/InvokeHttpRequest.java  |  14 +-
 .../execution/http/PipelineElementHttpRequest.java |  41 +++--
 .../manager/extensions/ExtensionItemInstaller.java |   9 +-
 .../manager/function/FunctionManager.java          |  25 +--
 .../CustomTransformOutputSchemaGenerator.java      |  23 +--
 .../migration/AbstractMigrationManager.java        |  18 +-
 .../migration/PipelineElementMigrationManager.java |   5 +-
 .../remote/ContainerProvidedOptionsHandler.java    |  19 +-
 .../manager/setup/AutoInstallation.java            |  11 +-
 .../manager/setup/ExtensionsInstallationTask.java  |   9 +-
 .../manager/setup/InstallationConfiguration.java   |  11 +-
 .../setup/PipelineElementInstallationStep.java     |   9 +-
 .../apache/streampipes/rest/ResetManagement.java   |  10 +-
 .../rest/impl/ContainerProvidedOptions.java        |   9 +-
 .../streampipes/rest/impl/ResetResource.java       |  13 +-
 .../impl/admin/ExtensionsInstallationResource.java |  11 +-
 .../rest/impl/admin/MigrationResource.java         |  19 +-
 .../rest/impl/connect/AdapterResource.java         |   6 +-
 .../rest/impl/connect/CompactAdapterResource.java  |  18 +-
 .../rest/impl/connect/DescriptionResource.java     |   5 +-
 .../rest/impl/connect/GuessResource.java           |   6 +-
 .../impl/connect/RuntimeResolvableResource.java    |   6 +-
 .../core/ExtensionServiceRequestConfiguration.java |  31 ++--
 .../streampipes/service/core/PostStartupTask.java  |  16 +-
 .../service/core/StreamPipesCoreApplication.java   |  30 +++-
 42 files changed, 736 insertions(+), 316 deletions(-)

diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/AdapterGenerationSteps.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/AdapterGenerationSteps.java
index 0156f390a8..e8aeca17d9 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/AdapterGenerationSteps.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/AdapterGenerationSteps.java
@@ -28,13 +28,19 @@ import java.util.List;
 
 public class AdapterGenerationSteps {
 
+  private final GuessManagement guessManagement;
+
+  public AdapterGenerationSteps(GuessManagement guessManagement) {
+    this.guessManagement = guessManagement;
+  }
+
   public List<AdapterModelGenerator> getGenerators() {
     return List.of(
         new AdapterBasicsGenerator(),
         new AdapterConfigGenerator(),
         new AdapterSchemaGenerator(
             new SchemaMetadataEnricher(),
-            new GuessManagement()
+            guessManagement
         )
     );
   }
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index dfda65d629..bfc294c0c3 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -53,15 +53,18 @@ public class AdapterMasterManagement {
   private final AdapterResourceManager adapterResourceManager;
 
   private final DataStreamResourceManager dataStreamResourceManager;
+  private final WorkerRestClient workerRestClient;
 
   public AdapterMasterManagement(IAdapterStorage adapterInstanceStorage,
                                  AdapterResourceManager adapterResourceManager,
                                  DataStreamResourceManager 
dataStreamResourceManager,
-                                 AdapterMetrics adapterMetrics) {
+                                 AdapterMetrics adapterMetrics,
+                                 WorkerRestClient workerRestClient) {
     this.adapterInstanceStorage = adapterInstanceStorage;
     this.adapterMetrics = adapterMetrics;
     this.adapterResourceManager = adapterResourceManager;
     this.dataStreamResourceManager = dataStreamResourceManager;
+    this.workerRestClient = workerRestClient;
   }
 
   public void addAdapter(AdapterDescription adapterDescription, String 
adapterId,
@@ -142,7 +145,7 @@ public class AdapterMasterManagement {
     AdapterDescription ad = adapterInstanceStorage.getElementById(elementId);
     try {
       try {
-        WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
+        workerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
       } catch (AdapterException e) {
         if (!forceStop) {
           throw new AdapterException("Could not stop adapter", e);
@@ -182,7 +185,7 @@ public class AdapterMasterManagement {
         adapterInstanceStorage.updateElement(ad);
 
         // Invoke adapter instance
-        WorkerRestClient.invokeStreamAdapter(baseUrl, elementId);
+        workerRestClient.invokeStreamAdapter(baseUrl, elementId);
 
         // register the adapter at the metrics manager so that the 
AdapterHealthCheck
         // can send metrics
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
index 753c78f0d3..318d4501a6 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.connect.management.management;
 
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.migration.AbstractMigrationManager;
 import org.apache.streampipes.manager.migration.IMigrationHandler;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
@@ -38,11 +39,16 @@ public class AdapterMigrationManager extends 
AbstractMigrationManager implements
 
   private final IAdapterStorage adapterStorage;
   private final IAdapterStorage adapterDescriptionStorage;
+  private final WorkerRestClient workerRestClient;
 
   public AdapterMigrationManager(IAdapterStorage adapterStorage,
-                                 IAdapterStorage adapterDescriptionStorage) {
+                                 IAdapterStorage adapterDescriptionStorage,
+                                 WorkerRestClient workerRestClient,
+                                 ExtensionServiceRequestManager 
extensionServiceRequestManager) {
+    super(extensionServiceRequestManager);
     this.adapterStorage = adapterStorage;
     this.adapterDescriptionStorage = adapterDescriptionStorage;
+    this.workerRestClient = workerRestClient;
   }
 
   @Override
@@ -96,7 +102,7 @@ public class AdapterMigrationManager extends 
AbstractMigrationManager implements
                 migrationResult.element().getElementId()
             );
             try {
-              
WorkerRestClient.stopStreamAdapter(extensionsServiceConfig.getServiceUrl(), 
adapterDescription);
+              
workerRestClient.stopStreamAdapter(extensionsServiceConfig.getServiceUrl(), 
adapterDescription);
             } catch (AdapterException e) {
               LOG.error("Stopping adapter failed: {}", 
StringUtils.join(e.getStackTrace(), "\n"));
             }
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/DescriptionManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/DescriptionManagement.java
index c52f0e1736..d06730e6a8 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/DescriptionManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/DescriptionManagement.java
@@ -29,6 +29,12 @@ import java.util.Optional;
 
 public class DescriptionManagement {
 
+  private final WorkerRestClient workerRestClient;
+
+  public DescriptionManagement(WorkerRestClient workerRestClient) {
+    this.workerRestClient = workerRestClient;
+  }
+
   public List<AdapterDescription> getAdapters() {
     IAdapterStorage adapterStorage = 
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterDescriptionStorage();
     return adapterStorage.findAll();
@@ -51,15 +57,15 @@ public class DescriptionManagement {
   }
 
   public String getAssets(String baseUrl) throws AdapterException {
-    return WorkerRestClient.getAssets(baseUrl);
+    return workerRestClient.getAssets(baseUrl);
   }
 
   public byte[] getIconAsset(String baseUrl) throws AdapterException {
-    return WorkerRestClient.getIconAsset(baseUrl);
+    return workerRestClient.getIconAsset(baseUrl);
   }
 
   public String getDocumentationAsset(String baseUrl) throws AdapterException {
-    return WorkerRestClient.getDocumentationAsset(baseUrl);
+    return workerRestClient.getDocumentationAsset(baseUrl);
   }
 
   private boolean isAdapterUsed(AdapterDescription adapter) {
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
index 62adce519e..693e6e7281 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
@@ -27,9 +27,8 @@ import 
org.apache.streampipes.connect.transformer.api.TransformationEngines;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
 import 
org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import 
org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator;
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.guess.SampleData;
 import org.apache.streampipes.model.monitoring.SpLogMessage;
@@ -40,7 +39,6 @@ import 
org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.http.HttpStatus;
-import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,11 +50,14 @@ import java.util.Map;
 public class GuessManagement {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GuessManagement.class);
+  private final ExtensionServiceRequestManager extensionRequestManager;
   private final IExtensionsServiceEndpointGenerator endpointGenerator;
   private final ObjectMapper objectMapper;
 
-  public GuessManagement() {
-    this.endpointGenerator = new ExtensionsServiceEndpointGenerator();
+  public GuessManagement(IExtensionsServiceEndpointGenerator endpointGenerator,
+                         ExtensionServiceRequestManager 
extensionRequestManager) {
+    this.endpointGenerator = endpointGenerator;
+    this.extensionRequestManager = extensionRequestManager;
     this.objectMapper = JacksonSerializer.getObjectMapper();
   }
 
@@ -83,15 +84,10 @@ public class GuessManagement {
 
     LOG.debug("Calling get get sample data at: {}", workerUrl);
 
-    var httpResponse = ExtensionServiceExecutions
-        .extServicePostRequest(workerUrl, adapterDescriptionString)
-        .execute()
-        .returnResponse();
+    var response = extensionRequestManager.requestSampleData(workerUrl, 
adapterDescriptionString);
+    var responseString = response.responseBody();
 
-    var responseString = EntityUtils.toString(httpResponse.getEntity());
-
-    if (httpResponse.getStatusLine()
-                    .getStatusCode() == HttpStatus.SC_OK) {
+    if (response.statusCode() == HttpStatus.SC_OK) {
       return objectMapper.readValue(responseString, SampleData.class);
     } else {
       var exception = objectMapper.readValue(responseString, 
SpLogMessage.class);
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java
index 79f204aa6b..5294d5a640 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java
@@ -22,7 +22,8 @@ package org.apache.streampipes.connect.management.management;
 import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.connect.management.util.WorkerPaths;
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
@@ -34,15 +35,11 @@ import 
org.apache.streampipes.storage.couchdb.impl.connect.AdapterInstanceStorag
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
-import org.apache.http.client.fluent.Request;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 /**
@@ -51,9 +48,14 @@ import java.util.List;
 public class WorkerRestClient {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(WorkerRestClient.class);
+  private final ExtensionServiceRequestManager requestManager;
 
-  public static void invokeStreamAdapter(String baseUrl,
-                                         String elementId) throws 
AdapterException {
+  public WorkerRestClient(ExtensionServiceRequestManager requestManager) {
+    this.requestManager = requestManager;
+  }
+
+  public void invokeStreamAdapter(String baseUrl,
+                                  String elementId) throws AdapterException {
     var adapterStreamDescription = getAndDecryptAdapter(elementId);
     var url = baseUrl + WorkerPaths.getStreamInvokePath();
 
@@ -61,8 +63,8 @@ public class WorkerRestClient {
     updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true);
   }
 
-  public static void stopStreamAdapter(String baseUrl,
-                                       AdapterDescription 
adapterStreamDescription) throws AdapterException {
+  public void stopStreamAdapter(String baseUrl,
+                                AdapterDescription adapterStreamDescription) 
throws AdapterException {
     String url = baseUrl + WorkerPaths.getStreamStopPath();
 
     var ad = getAdapterDescriptionById(new AdapterInstanceStorageImpl(), 
adapterStreamDescription.getElementId());
@@ -71,11 +73,9 @@ public class WorkerRestClient {
     updateStreamAdapterStatus(adapterStreamDescription.getElementId(), false);
   }
 
-  public static List<AdapterDescription> 
getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException {
+  public List<AdapterDescription> 
getAllRunningAdapterInstanceDescriptions(String url) throws AdapterException {
     try {
-      var responseString = ExtensionServiceExecutions
-              .extServiceGetRequest(url)
-              .execute().returnContent().asString();
+      var responseString = 
requestManager.requestRunningAdapters(url).responseBody();
 
       return JacksonSerializer.getObjectMapper().readValue(responseString, 
List.class);
     } catch (IOException e) {
@@ -83,30 +83,31 @@ public class WorkerRestClient {
     }
   }
 
-  private static void startAdapter(String url,
-                                   AdapterDescription ad) throws 
AdapterException {
+  private void startAdapter(String url,
+                            AdapterDescription ad) throws AdapterException {
     LOG.debug("Trying to start adapter on endpoint {} ", url);
     triggerAdapterStateChange(ad, url, "started");
   }
 
 
-  private static void stopAdapter(AdapterDescription ad,
-                                  String url) throws AdapterException {
+  private void stopAdapter(AdapterDescription ad,
+                           String url) throws AdapterException {
 
     LOG.debug("Trying to stop adapter on endpoint {} ", url);
     triggerAdapterStateChange(ad, url, "stopped");
   }
 
-  private static void triggerAdapterStateChange(AdapterDescription ad,
-                                                String url,
-                                                String action) throws 
AdapterException {
+  private void triggerAdapterStateChange(AdapterDescription ad,
+                                         String url,
+                                         String action) throws 
AdapterException {
     try {
       String adapterDescription = 
JacksonSerializer.getObjectMapper().writeValueAsString(ad);
 
-      var response = triggerPost(url, 
ad.getCorrespondingDataStreamElementId(), adapterDescription);
-      var responseString = getResponseBody(response);
+      var response =
+          triggerPost(url, ad.getCorrespondingDataStreamElementId(), 
adapterDescription);
+      var responseString = response.responseBody();
 
-      if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+      if (response.statusCode() != HttpStatus.SC_OK) {
         var exception = getSerializer().readValue(responseString, 
AdapterException.class);
         throw new AdapterException(exception.getMessage(), 
exception.getCause());
       }
@@ -116,32 +117,24 @@ public class WorkerRestClient {
     }
   }
 
-  private static String getResponseBody(HttpResponse response) throws 
IOException {
-    return IOUtils.toString(response.getEntity().getContent(), 
StandardCharsets.UTF_8);
+  private ExtensionServiceOperationResult triggerPost(String url,
+                                                      String elementId,
+                                                      String payload) throws 
IOException {
+    return requestManager.requestAdapterStateChange(url, elementId, payload);
   }
 
-  private static HttpResponse triggerPost(String url,
-                                          String elementId,
-                                          String payload) throws IOException {
-    var request = ExtensionServiceExecutions.extServicePostRequest(url, 
elementId, payload);
-    return request.execute().returnResponse();
-  }
-
-  public static RuntimeOptionsResponse getConfiguration(String baseUrl,
-                                                        String appId,
-                                                        RuntimeOptionsRequest 
runtimeOptionsRequest)
-          throws AdapterException, SpConfigurationException {
+  public RuntimeOptionsResponse getConfiguration(String baseUrl,
+                                                 String appId,
+                                                 RuntimeOptionsRequest 
runtimeOptionsRequest)
+      throws AdapterException, SpConfigurationException {
     String url = baseUrl + WorkerPaths.getRuntimeResolvablePath(appId);
 
     try {
       String payload = 
JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest);
-      var response = ExtensionServiceExecutions.extServicePostRequest(url, 
payload)
-              .execute()
-              .returnResponse();
-
-      String responseString = 
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+      var response = requestManager.requestRuntimeOptions(url, payload);
+      String responseString = response.responseBody();
 
-      if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+      if (response.statusCode() == HttpStatus.SC_OK) {
         return getSerializer().readValue(responseString, 
RuntimeOptionsResponse.class);
       } else {
         var exception = getSerializer().readValue(responseString, 
SpConfigurationException.class);
@@ -152,15 +145,17 @@ public class WorkerRestClient {
     }
   }
 
-  public static String getAssets(String workerPath) throws AdapterException {
+  public String getAssets(String workerPath) throws AdapterException {
     String url = workerPath + "/assets";
     LOG.info("Trying to Assets from endpoint: " + url);
 
     try {
-      return Request.Get(url)
-              .connectTimeout(1000)
-              .socketTimeout(100000)
-              .execute().returnContent().asString();
+      var response = requestManager.requestAdapterAssets(url);
+
+      if (!response.isSuccess()) {
+        throw new AdapterException("Could not get assets endpoint: " + url);
+      }
+      return response.responseBody();
     } catch (IOException e) {
       LOG.error(e.getMessage());
       throw new AdapterException("Could not get assets endpoint: " + url);
@@ -168,28 +163,30 @@ public class WorkerRestClient {
 
   }
 
-  public static byte[] getIconAsset(String baseUrl) throws AdapterException {
+  public byte[] getIconAsset(String baseUrl) throws AdapterException {
     String url = baseUrl + "/assets/icon";
 
     try {
-      return Request.Get(url)
-              .connectTimeout(1000)
-              .socketTimeout(100000)
-              .execute().returnContent().asBytes();
+      var response = requestManager.requestAdapterIconAsset(url);
+      if (!response.isSuccess()) {
+        throw new AdapterException("Could not get icon endpoint: " + url);
+      }
+      return response.responseBytes();
     } catch (IOException e) {
       LOG.error(e.getMessage());
       throw new AdapterException("Could not get icon endpoint: " + url);
     }
   }
 
-  public static String getDocumentationAsset(String baseUrl) throws 
AdapterException {
+  public String getDocumentationAsset(String baseUrl) throws AdapterException {
     String url = baseUrl + "/assets/documentation";
 
     try {
-      return Request.Get(url)
-              .connectTimeout(1000)
-              .socketTimeout(100000)
-              .execute().returnContent().asString();
+      var response = requestManager.requestAdapterDocumentationAsset(url);
+      if (!response.isSuccess()) {
+        throw new AdapterException("Could not get documentation endpoint: " + 
url);
+      }
+      return response.responseBody();
     } catch (IOException e) {
       LOG.error(e.getMessage());
       throw new AdapterException("Could not get documentation endpoint: " + 
url);
@@ -197,7 +194,7 @@ public class WorkerRestClient {
   }
 
 
-  private static AdapterDescription 
getAdapterDescriptionById(AdapterInstanceStorageImpl adapterStorage, String id) 
{
+  private AdapterDescription 
getAdapterDescriptionById(AdapterInstanceStorageImpl adapterStorage, String id) 
{
     AdapterDescription adapterDescription = null;
     List<AdapterDescription> allAdapters = adapterStorage.findAll();
     for (AdapterDescription a : allAdapters) {
@@ -209,31 +206,30 @@ public class WorkerRestClient {
     return adapterDescription;
   }
 
-  private static void updateStreamAdapterStatus(String adapterId,
-                                                boolean running) {
+  private void updateStreamAdapterStatus(String adapterId,
+                                         boolean running) {
     var adapter = getAndDecryptAdapter(adapterId);
     adapter.setRunning(running);
     encryptAndUpdateAdapter(adapter);
   }
 
-  private static void encryptAndUpdateAdapter(AdapterDescription adapter) {
+  private void encryptAndUpdateAdapter(AdapterDescription adapter) {
     AdapterDescription encryptedDescription = new 
Cloner().adapterDescription(adapter);
     SecretProvider.getEncryptionService().apply(encryptedDescription);
     getAdapterStorage().updateElement(encryptedDescription);
   }
 
-  private static AdapterDescription getAndDecryptAdapter(String adapterId) {
+  private AdapterDescription getAndDecryptAdapter(String adapterId) {
     AdapterDescription adapter = getAdapterStorage().getElementById(adapterId);
     SecretProvider.getDecryptionService().apply(adapter);
     return adapter;
   }
 
-  private static IAdapterStorage getAdapterStorage() {
+  private IAdapterStorage getAdapterStorage() {
     return 
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage();
   }
 
-  private static ObjectMapper getSerializer() {
+  private ObjectMapper getSerializer() {
     return JacksonSerializer.getObjectMapper();
   }
 }
-
diff --git 
a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/management/AdapterMasterManagementTest.java
 
b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/management/AdapterMasterManagementTest.java
index 2d8b70e426..fb28632b6d 100644
--- 
a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/management/AdapterMasterManagementTest.java
+++ 
b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/management/AdapterMasterManagementTest.java
@@ -39,6 +39,7 @@ public class AdapterMasterManagementTest {
   public void getAdapter_FailNull() {
     var adapterStorage = mock(AdapterInstanceStorageImpl.class);
     var resourceManager = mock(AdapterResourceManager.class);
+    var workerRestClient = mock(WorkerRestClient.class);
     when(adapterStorage.findAll()).thenReturn(null);
 
     var adapterMasterManagement =
@@ -46,7 +47,8 @@ public class AdapterMasterManagementTest {
             adapterStorage,
             resourceManager,
             null,
-            AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+            AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+            workerRestClient
         );
 
     assertThrows(AdapterException.class, () -> 
adapterMasterManagement.getAdapter("id2"));
@@ -57,6 +59,7 @@ public class AdapterMasterManagementTest {
     var adapterDescriptions = List.of(new AdapterDescription());
     var adapterStorage = mock(AdapterInstanceStorageImpl.class);
     var resourceManager = mock(AdapterResourceManager.class);
+    var workerRestClient = mock(WorkerRestClient.class);
     when(adapterStorage.findAll()).thenReturn(adapterDescriptions);
 
     var adapterMasterManagement =
@@ -64,7 +67,8 @@ public class AdapterMasterManagementTest {
             adapterStorage,
             resourceManager,
             null,
-            AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+            AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+            workerRestClient
         );
 
     assertThrows(AdapterException.class, () -> 
adapterMasterManagement.getAdapter("id2"));
@@ -75,6 +79,7 @@ public class AdapterMasterManagementTest {
     var adapterDescriptions = List.of(new AdapterDescription());
     var adapterStorage = mock(AdapterInstanceStorageImpl.class);
     var resourceManager = mock(AdapterResourceManager.class);
+    var workerRestClient = mock(WorkerRestClient.class);
     when(adapterStorage.findAll()).thenReturn(adapterDescriptions);
 
     AdapterMasterManagement adapterMasterManagement =
@@ -82,7 +87,8 @@ public class AdapterMasterManagementTest {
             adapterStorage,
             resourceManager,
             null,
-            AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+            AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+            workerRestClient
         );
 
     List<AdapterDescription> result = 
adapterMasterManagement.getAllAdapterInstances();
diff --git 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
index a8f902635b..6e3d153e84 100644
--- 
a/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
+++ 
b/streampipes-data-export/src/main/java/org/apache/streampipes/export/resolver/AdapterResolver.java
@@ -22,6 +22,8 @@ package org.apache.streampipes.export.resolver;
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
+import 
org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.export.AssetExportConfiguration;
 import org.apache.streampipes.model.export.ExportItem;
@@ -89,7 +91,8 @@ public class AdapterResolver extends 
AbstractResolver<AdapterDescription> {
               getNoSqlStore().getAdapterInstanceStorage(),
               new SpResourceManager().manageAdapters(),
               new SpResourceManager().manageDataStreams(),
-              AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+              AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+              new WorkerRestClient(new HttpExtensionServiceRequestManager())
           ).stopStreamAdapter(resourceId, true);
         } catch (AdapterException e) {
           LOG.warn("Error when stopping adapter with id {} and name {}", 
resourceId, existingAdapter.getName());
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
index 256c1cc57b..744e080809 100644
--- 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionHealthCheck.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.health.monitoring;
 
 import org.apache.streampipes.health.monitoring.model.HealthCheckData;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.model.health.ExtensionInstanceHealth;
 
 import org.slf4j.Logger;
@@ -31,9 +32,12 @@ public class ExtensionHealthCheck implements Runnable {
   private static final Logger LOG = 
LoggerFactory.getLogger(ExtensionHealthCheck.class);
 
   private final ResourceProvider resourceProvider;
+  private final ExtensionServiceRequestManager extensionRequestManager;
 
-  public ExtensionHealthCheck(ResourceProvider resourceProvider) {
+  public ExtensionHealthCheck(ResourceProvider resourceProvider,
+                              ExtensionServiceRequestManager 
extensionRequestManager) {
     this.resourceProvider = resourceProvider;
+    this.extensionRequestManager = extensionRequestManager;
   }
 
   @Override
@@ -44,7 +48,10 @@ public class ExtensionHealthCheck implements Runnable {
 
       var activeExtensionInstances = new HashMap<String, 
ExtensionInstanceHealth>();
       activeCoreInstances.keySet().forEach(k -> {
-        activeExtensionInstances.put(k, new 
ExtensionInstanceAvailabilityCheck(k).checkRunningInstances());
+        activeExtensionInstances.put(
+            k,
+            new ExtensionInstanceAvailabilityCheck(k, 
extensionRequestManager).checkRunningInstances()
+        );
       });
 
       var healthCheckData = new HealthCheckData(resourceProvider, 
activeResources, activeCoreInstances, activeExtensionInstances);
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
index 46236d90da..9a5cf8dc1b 100644
--- 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ExtensionInstanceAvailabilityCheck.java
@@ -17,17 +17,15 @@
  */
 package org.apache.streampipes.health.monitoring;
 
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.model.health.ExtensionInstanceHealth;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.Set;
 
 public class ExtensionInstanceAvailabilityCheck {
@@ -36,20 +34,21 @@ public class ExtensionInstanceAvailabilityCheck {
   private static final String InstancePath = "/health";
 
   private final String serviceBaseUrl;
+  private final ExtensionServiceRequestManager extensionRequestManager;
 
-  public ExtensionInstanceAvailabilityCheck(String serviceBaseUrl) {
+  public ExtensionInstanceAvailabilityCheck(String serviceBaseUrl,
+                                            ExtensionServiceRequestManager 
extensionRequestManager) {
     this.serviceBaseUrl = serviceBaseUrl;
+    this.extensionRequestManager = extensionRequestManager;
   }
 
   public ExtensionInstanceHealth checkRunningInstances() {
     try {
-      var request = 
ExtensionServiceExecutions.extServiceGetRequest(makeRequestUrl());
-      var response = request.execute().returnResponse();
-      if (response.getStatusLine().getStatusCode() != 200) {
+      var response = 
extensionRequestManager.requestExtensionInstanceHealth(makeRequestUrl());
+      if (response.statusCode() != 200) {
         return new ExtensionInstanceHealth(Set.of(), Set.of());
       }
-      String body = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
-      return deserialize(body);
+      return deserialize(response.responseBody());
 
     } catch (IOException e) {
       LOG.error("Extension service {} is unavailable", serviceBaseUrl);
diff --git 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
index 549066f3c8..ecd07082e2 100644
--- 
a/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
+++ 
b/streampipes-health-monitoring/src/main/java/org/apache/streampipes/health/monitoring/ServiceHealthCheck.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.health.monitoring;
 import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.loadbalance.LoadManager;
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus;
 import org.apache.streampipes.storage.api.system.IExtensionsServiceStorage;
@@ -37,13 +37,16 @@ import java.util.List;
 public class ServiceHealthCheck implements Runnable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ServiceHealthCheck.class);
+  private final ExtensionServiceRequestManager extensionRequestManager;
 
   private final ServiceRegistrationManager serviceRegistrationManager;
   private final int maxUnhealthyDurationBeforeRemovalMs;
 
   private final List<SpServiceRegistration> needDeletedServices = new 
ArrayList<>();
 
-  public ServiceHealthCheck(IExtensionsServiceStorage storage) {
+  public ServiceHealthCheck(IExtensionsServiceStorage storage,
+                            ExtensionServiceRequestManager 
extensionRequestManager) {
+    this.extensionRequestManager = extensionRequestManager;
     this.serviceRegistrationManager = new ServiceRegistrationManager(storage);
     this.maxUnhealthyDurationBeforeRemovalMs = Environments.getEnvironment()
         .getUnhealthyTimeBeforeServiceDeletionInMillis().getValueOrDefault();
@@ -71,9 +74,8 @@ public class ServiceHealthCheck implements Runnable {
     String healthCheckUrl = makeHealthCheckUrl(service);
 
     try {
-      var request = 
ExtensionServiceExecutions.extServiceGetRequest(healthCheckUrl);
-      var response = request.execute();
-      if (response.returnResponse().getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
+      var response = 
extensionRequestManager.requestServiceHealth(healthCheckUrl);
+      if (response.statusCode() != HttpStatus.SC_OK) {
         processUnhealthyService(service);
       } else {
         if (service.getStatus() == SpServiceStatus.UNHEALTHY) {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationResult.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationResult.java
new file mode 100644
index 0000000000..1c0308ab9f
--- /dev/null
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationResult.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.api.extensions;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Outcome of a single request to an extension service: a status code plus the
+ * raw response body.
+ */
+public class ExtensionServiceOperationResult {
+
+  private final int statusCode;
+  private final byte[] responseBody;
+
+  /**
+   * Creates a result from a textual body, which is kept as its UTF-8 bytes.
+   *
+   * @param statusCode   status code of the response
+   * @param responseBody body text, may be {@code null}
+   */
+  public ExtensionServiceOperationResult(int statusCode, String responseBody) {
+    this(statusCode, toUtf8Bytes(responseBody));
+  }
+
+  /**
+   * Creates a result from a binary body. The array is stored as given, without copying.
+   *
+   * @param statusCode   status code of the response
+   * @param responseBody raw body bytes, may be {@code null}
+   */
+  public ExtensionServiceOperationResult(int statusCode, byte[] responseBody) {
+    this.statusCode = statusCode;
+    this.responseBody = responseBody;
+  }
+
+  /** Returns the status code supplied at construction. */
+  public int statusCode() {
+    return statusCode;
+  }
+
+  /** Returns the body decoded as UTF-8, or {@code null} when no body is present. */
+  public String responseBody() {
+    if (responseBody == null) {
+      return null;
+    }
+    return new String(responseBody, StandardCharsets.UTF_8);
+  }
+
+  /** Returns the stored body array itself (not a copy), or {@code null} when no body is present. */
+  public byte[] responseBytes() {
+    return responseBody;
+  }
+
+  /** Returns {@code true} when the status code is in the range 200 to 299. */
+  public boolean isSuccess() {
+    return !(statusCode < 200 || statusCode >= 300);
+  }
+
+  /** Encodes the text as UTF-8; {@code null} is passed through unchanged. */
+  private static byte[] toUtf8Bytes(String text) {
+    if (text == null) {
+      return null;
+    }
+    return text.getBytes(StandardCharsets.UTF_8);
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestManager.java
new file mode 100644
index 0000000000..160fe0a4d9
--- /dev/null
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.api.extensions;
+
+import java.io.IOException;
+
+/**
+ * Groups the requests the core sends to extension services behind one
+ * interface. Each operation reports its outcome as an
+ * {@link ExtensionServiceOperationResult} and declares {@link IOException}.
+ */
+public interface ExtensionServiceRequestManager {
+
+  ExtensionServiceOperationResult requestContainerProvidedOptions(String url,
+                                                                  String payload) throws IOException;
+
+  ExtensionServiceOperationResult requestMigration(String url,
+                                                   String payload) throws IOException;
+
+  ExtensionServiceOperationResult requestDescriptionUpdate(String requestUrl) throws IOException;
+
+  ExtensionServiceOperationResult requestExtensionDescription(String descriptionUrl) throws IOException;
+
+  ExtensionServiceOperationResult requestFunctionStop(String endpoint) throws IOException;
+
+  ExtensionServiceOperationResult requestRunningAdapters(String url) throws IOException;
+
+  ExtensionServiceOperationResult requestAdapterStateChange(String url,
+                                                            String elementId,
+                                                            String payload) throws IOException;
+
+  ExtensionServiceOperationResult requestRuntimeOptions(String url,
+                                                        String payload) throws IOException;
+
+  ExtensionServiceOperationResult requestSampleData(String workerUrl,
+                                                    String payload) throws IOException;
+
+  ExtensionServiceOperationResult requestExtensionInstanceHealth(String url) throws IOException;
+
+  ExtensionServiceOperationResult requestServiceHealth(String url) throws IOException;
+
+  ExtensionServiceOperationResult requestPipelineElementInvocation(String url,
+                                                                   String pipelineId,
+                                                                   String payload) throws IOException;
+
+  ExtensionServiceOperationResult requestPipelineElementDetach(String url,
+                                                               String pipelineId) throws IOException;
+
+  ExtensionServiceOperationResult requestPipelineElementAssets(String url) throws IOException;
+
+  ExtensionServiceOperationResult requestAdapterAssets(String url) throws IOException;
+
+  ExtensionServiceOperationResult requestAdapterIconAsset(String url) throws IOException;
+
+  ExtensionServiceOperationResult requestAdapterDocumentationAsset(String url) throws IOException;
+
+  ExtensionServiceOperationResult requestOutputSchema(String url,
+                                                      String payload) throws IOException;
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
index 25c3c9accf..f42047b461 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java
@@ -18,11 +18,12 @@
 package org.apache.streampipes.manager.assets;
 
 import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
+import 
org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager;
 import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
-import org.apache.http.client.fluent.Request;
-
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -30,22 +31,33 @@ public class AssetFetcher {
 
   private static final String ASSET_ENDPOINT_APPENDIX = "/assets";
 
-  private SpServiceUrlProvider spServiceUrlProvider;
-  private String appId;
+  private final SpServiceUrlProvider spServiceUrlProvider;
+  private final String appId;
+  private final ExtensionServiceRequestManager requestManager;
 
   public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider,
                       String appId) {
+    this(spServiceUrlProvider, appId, new 
HttpExtensionServiceRequestManager());
+  }
+
+  public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider,
+                      String appId,
+                      ExtensionServiceRequestManager requestManager) {
     this.spServiceUrlProvider = spServiceUrlProvider;
     this.appId = appId;
+    this.requestManager = requestManager;
   }
 
   public InputStream fetchPipelineElementAssets() throws IOException, 
NoServiceEndpointsAvailableException {
     String endpointUrl = new 
ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(appId, 
spServiceUrlProvider);
-    return Request
-        .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX)
-        .execute()
-        .returnContent()
-        .asStream();
+    var response = requestManager.requestPipelineElementAssets(endpointUrl + 
ASSET_ENDPOINT_APPENDIX);
+
+    if (!response.isSuccess()) {
+      throw new IOException("Could not fetch pipeline element assets from " + 
endpointUrl);
+    }
+
+    var responseBytes = response.responseBytes();
+    return new ByteArrayInputStream(responseBytes == null ? new byte[0] : 
responseBytes);
 
   }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
deleted file mode 100644
index 2164b2776c..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution;
-
-import org.apache.streampipes.manager.util.AuthTokenUtils;
-import org.apache.streampipes.resource.management.SpResourceManager;
-
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-
-public class ExtensionServiceExecutions {
-
-  public static Request extServiceGetRequest(String url) {
-    return Request
-        .Get(url)
-        .addHeader("Authorization", 
AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid()))
-        .addHeader("Accept", "application/json")
-        .connectTimeout(10000)
-        .socketTimeout(10000);
-  }
-
-  public static Request extServicePostRequestAsServiceAdmin(String url) {
-    return Request
-        .Post(url)
-        .addHeader("Authorization", 
AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid()))
-        .addHeader("Accept", "application/json")
-        .connectTimeout(10000)
-        .socketTimeout(10000);
-  }
-
-  private static String getServiceAdminSid() {
-    return new 
SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId();
-  }
-
-  public static Request extServicePostRequest(String url,
-                                              String payload) {
-    return authenticatedPostRequest(url, 
AuthTokenUtils.getAuthTokenForCurrentUser(), payload);
-  }
-
-  public static Request extServicePostRequest(String url,
-                                             String elementId,
-                                             String payload) {
-    return authenticatedPostRequest(
-        url,
-        AuthTokenUtils.getAuthToken(elementId),
-        payload
-    );
-  }
-
-  private static Request authenticatedPostRequest(String url,
-                                                  String token,
-                                                  String payload) {
-    return Request.Post(url)
-        .addHeader("Authorization", token)
-        .bodyString(payload, ContentType.APPLICATION_JSON)
-        .connectTimeout(1000)
-        .socketTimeout(100000);
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/HttpExtensionServiceRequestManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/HttpExtensionServiceRequestManager.java
new file mode 100644
index 0000000000..13deb5d102
--- /dev/null
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/HttpExtensionServiceRequestManager.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.execution;
+
+import org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult;
+import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
+import org.apache.streampipes.manager.util.AuthTokenUtils;
+import org.apache.streampipes.resource.management.SpResourceManager;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+
+/**
+ * {@link ExtensionServiceRequestManager} implemented with the Apache HttpClient fluent API.
+ *
+ * <p>JSON operations send {@code Accept: application/json}. Asset downloads are sent without an
+ * {@code Accept} header so that the extension service can answer with whatever content type the
+ * asset has.
+ */
+public class HttpExtensionServiceRequestManager implements ExtensionServiceRequestManager {
+
+  private static final String ACCEPT_JSON = "application/json";
+  private static final int DEFAULT_TIMEOUT_MS = 10000;
+  private static final int PAYLOAD_CONNECT_TIMEOUT_MS = 1000;
+  private static final int PAYLOAD_SOCKET_TIMEOUT_MS = 100000;
+
+  @Override
+  public ExtensionServiceOperationResult requestContainerProvidedOptions(String url,
+                                                                         String payload) throws IOException {
+    return post(url, currentUserToken(), payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestMigration(String url,
+                                                          String payload) throws IOException {
+    return post(url, currentUserToken(), payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestDescriptionUpdate(String requestUrl) throws IOException {
+    return get(requestUrl, serviceAdminToken());
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestExtensionDescription(String descriptionUrl) throws IOException {
+    return get(descriptionUrl, serviceAdminToken());
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestFunctionStop(String endpoint) throws IOException {
+    return post(endpoint, serviceAdminToken(), null);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestRunningAdapters(String url) throws IOException {
+    return get(url, serviceAdminToken());
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestAdapterStateChange(String url,
+                                                                   String elementId,
+                                                                   String payload) throws IOException {
+    return post(url, AuthTokenUtils.getAuthToken(elementId), payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestRuntimeOptions(String url,
+                                                               String payload) throws IOException {
+    return post(url, currentUserToken(), payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestSampleData(String workerUrl,
+                                                           String payload) throws IOException {
+    return post(workerUrl, currentUserToken(), payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestExtensionInstanceHealth(String url) throws IOException {
+    return get(url, serviceAdminToken());
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestServiceHealth(String url) throws IOException {
+    return get(url, serviceAdminToken());
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestPipelineElementInvocation(String url,
+                                                                          String pipelineId,
+                                                                          String payload) throws IOException {
+    return post(url, AuthTokenUtils.getAuthToken(pipelineId), payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestPipelineElementDetach(String url,
+                                                                      String pipelineId) throws IOException {
+    return delete(url, AuthTokenUtils.getAuthToken(pipelineId));
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestPipelineElementAssets(String url) throws IOException {
+    return getAsset(url);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestAdapterAssets(String url) throws IOException {
+    return getAsset(url);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestAdapterIconAsset(String url) throws IOException {
+    return getAsset(url);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestAdapterDocumentationAsset(String url) throws IOException {
+    return getAsset(url);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult requestOutputSchema(String url,
+                                                             String payload) throws IOException {
+    return post(url, null, payload);
+  }
+
+  /** Sends a GET that asks for a JSON response. */
+  private ExtensionServiceOperationResult get(String url,
+                                              String token) throws IOException {
+    var request = Request
+        .Get(url)
+        .addHeader("Accept", ACCEPT_JSON)
+        .connectTimeout(DEFAULT_TIMEOUT_MS)
+        .socketTimeout(DEFAULT_TIMEOUT_MS);
+
+    return execute(request, token);
+  }
+
+  /**
+   * Sends an anonymous GET for an asset. Unlike {@code get}, no {@code Accept} header is set, so
+   * the service is not asked to produce JSON for a non-JSON payload.
+   */
+  private ExtensionServiceOperationResult getAsset(String url) throws IOException {
+    var request = Request
+        .Get(url)
+        .connectTimeout(DEFAULT_TIMEOUT_MS)
+        .socketTimeout(DEFAULT_TIMEOUT_MS);
+
+    return execute(request, null);
+  }
+
+  /**
+   * Sends a POST that asks for a JSON response. With a payload the body is sent as JSON and the
+   * payload timeouts apply; without one the default timeouts are used.
+   */
+  private ExtensionServiceOperationResult post(String url,
+                                               String token,
+                                               String payload) throws IOException {
+    var hasPayload = payload != null;
+    var request = Request
+        .Post(url)
+        .addHeader("Accept", ACCEPT_JSON)
+        .connectTimeout(hasPayload ? PAYLOAD_CONNECT_TIMEOUT_MS : DEFAULT_TIMEOUT_MS)
+        .socketTimeout(hasPayload ? PAYLOAD_SOCKET_TIMEOUT_MS : DEFAULT_TIMEOUT_MS);
+
+    if (hasPayload) {
+      request = request.bodyString(payload, ContentType.APPLICATION_JSON);
+    }
+
+    return execute(request, token);
+  }
+
+  /** Sends a DELETE that asks for a JSON response. */
+  private ExtensionServiceOperationResult delete(String url,
+                                                 String token) throws IOException {
+    var request = Request
+        .Delete(url)
+        .addHeader("Accept", ACCEPT_JSON)
+        .connectTimeout(DEFAULT_TIMEOUT_MS)
+        .socketTimeout(DEFAULT_TIMEOUT_MS);
+
+    return execute(request, token);
+  }
+
+  /**
+   * Executes the request, adding the {@code Authorization} header when a token is given, and
+   * captures the status code and body bytes ({@code null} if the response has no entity).
+   */
+  private ExtensionServiceOperationResult execute(Request request,
+                                                  String token) throws IOException {
+    var authorizedRequest = token == null ? request : request.addHeader("Authorization", token);
+    var response = authorizedRequest.execute().returnResponse();
+    var entity = response.getEntity();
+
+    return new ExtensionServiceOperationResult(
+        response.getStatusLine().getStatusCode(),
+        entity == null ? null : EntityUtils.toByteArray(entity)
+    );
+  }
+
+  private String currentUserToken() {
+    return AuthTokenUtils.getAuthTokenForCurrentUser();
+  }
+
+  private String serviceAdminToken() {
+    return AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid());
+  }
+
+  private String getServiceAdminSid() {
+    return new SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId();
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
index ac9b73705a..4a87e265e4 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
@@ -18,21 +18,25 @@
 
 package org.apache.streampipes.manager.execution.http;
 
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult;
 import org.apache.streampipes.model.api.EndpointSelectable;
 
-import org.apache.http.client.fluent.Request;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 
 public class DetachHttpRequest extends PipelineElementHttpRequest {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DetachHttpRequest.class);
 
   @Override
-  protected Request initRequest(EndpointSelectable pipelineElement, String 
endpointUrl) {
+  protected ExtensionServiceOperationResult performRequest(EndpointSelectable 
pipelineElement,
+                                                           String endpointUrl,
+                                                           String pipelineId) 
throws IOException {
     LOG.info("Detaching element: " + endpointUrl);
-    return Request.Delete(endpointUrl);
+    return requestManager().requestPipelineElementDetach(endpointUrl, 
pipelineId);
   }
 
   @Override
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java
index 1656e677b0..88e2db8066 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/InvokeHttpRequest.java
@@ -18,26 +18,26 @@
 
 package org.apache.streampipes.manager.execution.http;
 
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult;
 import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class InvokeHttpRequest extends PipelineElementHttpRequest {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InvokeHttpRequest.class);
 
   @Override
-  protected Request initRequest(EndpointSelectable pipelineElement,
-                             String endpointUrl) throws 
JsonProcessingException {
+  protected ExtensionServiceOperationResult performRequest(EndpointSelectable 
pipelineElement,
+                                                           String endpointUrl,
+                                                           String pipelineId) 
throws IOException {
     LOG.info("Invoking element: " + endpointUrl);
-    return Request
-        .Post(endpointUrl)
-        .bodyString(toJson(pipelineElement), ContentType.APPLICATION_JSON);
+    return requestManager().requestPipelineElementInvocation(endpointUrl, 
pipelineId, toJson(pipelineElement));
   }
 
   @Override
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java
index 400626e54d..0e2229a62f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineElementHttpRequest.java
@@ -18,46 +18,57 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.apache.streampipes.manager.util.AuthTokenUtils;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
+import 
org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager;
 import org.apache.streampipes.model.api.EndpointSelectable;
 import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.gson.JsonSyntaxException;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
 
 import java.io.IOException;
 
 public abstract class PipelineElementHttpRequest {
 
+  private final ExtensionServiceRequestManager requestManager;
+
+  public PipelineElementHttpRequest() {
+    this(new HttpExtensionServiceRequestManager());
+  }
+
+  public PipelineElementHttpRequest(ExtensionServiceRequestManager 
requestManager) {
+    this.requestManager = requestManager;
+  }
+
   public PipelineElementStatus execute(EndpointSelectable pipelineElement,
                                        String endpointUrl,
                                        String pipelineId) {
     try {
-      Response httpResp = initRequest(pipelineElement, endpointUrl)
-              .addHeader("Authorization", 
AuthTokenUtils.getAuthToken(pipelineId))
-              .connectTimeout(10000)
-              .execute();
-      return handleResponse(httpResp, pipelineElement, endpointUrl);
+      ExtensionServiceOperationResult response = 
performRequest(pipelineElement, endpointUrl, pipelineId);
+      return handleResponse(response, pipelineElement, endpointUrl);
     } catch (Exception e) {
       logError(endpointUrl, pipelineElement.getName(), e.getMessage());
       return new PipelineElementStatus(endpointUrl, pipelineElement.getName(), 
false, e.getMessage());
     }
   }
 
-  protected abstract Request initRequest(EndpointSelectable pipelineElement,
-                                      String endpointUrl) throws 
JsonProcessingException;
+  protected abstract ExtensionServiceOperationResult 
performRequest(EndpointSelectable pipelineElement,
+                                                                    String 
endpointUrl,
+                                                                    String 
pipelineId) throws IOException;
 
   protected abstract void logError(String endpointUrl,
                                 String pipelineElementName,
                                 String exceptionMessage);
 
-  protected PipelineElementStatus handleResponse(Response httpResp,
+  protected PipelineElementStatus 
handleResponse(ExtensionServiceOperationResult response,
                                                  EndpointSelectable 
pipelineElement,
                                                  String endpointUrl) throws 
JsonSyntaxException, IOException {
-    String resp = httpResp.returnContent().asString();
+    if (!response.isSuccess()) {
+      throw new IOException("Request failed with status code " + 
response.statusCode());
+    }
+
+    String resp = response.responseBody();
     org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer
         .getObjectMapper()
         .readValue(resp, org.apache.streampipes.model.Response.class);
@@ -70,4 +81,8 @@ public abstract class PipelineElementHttpRequest {
     return new PipelineElementStatus(endpointUrl, pipelineElementName, 
response.isSuccess(),
         response.getOptionalMessage());
   }
+
+  protected ExtensionServiceRequestManager requestManager() {
+    return requestManager;
+  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
index 16fc95a029..109529cd31 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
@@ -18,8 +18,8 @@
 package org.apache.streampipes.manager.extensions;
 
 import org.apache.streampipes.commons.exceptions.SepaParseException;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import 
org.apache.streampipes.manager.api.extensions.IExtensionsResourceUrlProvider;
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
 import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import 
org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest;
 import org.apache.streampipes.model.message.Message;
@@ -28,9 +28,12 @@ import java.io.IOException;
 
 public class ExtensionItemInstaller {
 
+  private final ExtensionServiceRequestManager extensionRequestManager;
   private final IExtensionsResourceUrlProvider urlProvider;
 
-  public ExtensionItemInstaller(IExtensionsResourceUrlProvider urlProvider) {
+  public ExtensionItemInstaller(IExtensionsResourceUrlProvider urlProvider,
+                                ExtensionServiceRequestManager 
extensionRequestManager) {
+    this.extensionRequestManager = extensionRequestManager;
     this.urlProvider = urlProvider;
   }
 
@@ -52,6 +55,6 @@ public class ExtensionItemInstaller {
   }
 
   private String fetchDescription(String descriptionUrl) throws IOException {
-    return 
ExtensionServiceExecutions.extServiceGetRequest(descriptionUrl).execute().returnContent().asString();
+    return 
extensionRequestManager.requestExtensionDescription(descriptionUrl).responseBody();
   }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
index 691714faba..5ca4f3227b 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.manager.function;
 
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.function.FunctionState;
 import org.apache.streampipes.model.function.FunctionsShutdownResponse;
@@ -26,19 +26,23 @@ import 
org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
-import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
 public class FunctionManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FunctionManager.class);
   private static final String FUNCTION_STOP_PATH = "/api/v1/functions/stop";
 
-  public static void stopAllFunctionsAndPersistState(IFunctionStateStorage 
functionStateStorage) {
+  private final ExtensionServiceRequestManager requestManager;
+
+  public FunctionManager(ExtensionServiceRequestManager requestManager) {
+    this.requestManager = requestManager;
+  }
+
+  public void stopAllFunctionsAndPersistState(IFunctionStateStorage 
functionStateStorage) {
     var extensions = 
StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage().findAll();
 
     LOG.info("Triggering function stop at {} extension services...", 
extensions.size());
@@ -50,23 +54,22 @@ public class FunctionManager {
     });
   }
 
-  private static FunctionsShutdownResponse 
triggerFunctionStop(SpServiceRegistration service) {
+  private FunctionsShutdownResponse triggerFunctionStop(SpServiceRegistration 
service) {
     var endpoint = service.getServiceUrl() + FUNCTION_STOP_PATH;
 
     try {
       LOG.info("Triggering function stop at {}", endpoint);
-      var response = 
ExtensionServiceExecutions.extServicePostRequestAsServiceAdmin(endpoint).execute();
-      var httpResponse = response.returnResponse();
-      int statusCode = httpResponse.getStatusLine().getStatusCode();
+      var response = requestManager.requestFunctionStop(endpoint);
+      int statusCode = response.statusCode();
 
       if (statusCode >= 200 && statusCode < 300) {
         LOG.debug("Function stop triggered at {} (HTTP {})", 
service.getSvcId(), statusCode);
-        if (httpResponse.getEntity() == null) {
+        if (response.responseBody() == null) {
           return null;
         }
 
         return JacksonSerializer.getObjectMapper().readValue(
-            EntityUtils.toString(httpResponse.getEntity(), 
StandardCharsets.UTF_8),
+            response.responseBody(),
             FunctionsShutdownResponse.class
         );
       } else {
@@ -80,7 +83,7 @@ public class FunctionManager {
     }
   }
 
-  private static void persistReturnedFunctionStates(IFunctionStateStorage 
functionStateStorage,
+  private void persistReturnedFunctionStates(IFunctionStateStorage 
functionStateStorage,
                                                     FunctionsShutdownResponse 
shutdownResponse) {
     if (shutdownResponse == null || shutdownResponse.getFunctions() == null) {
       return;
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
index bf2e38aae6..a0fcf39e16 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.streampipes.manager.matching.output;
 
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
+import 
org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager;
 import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -28,17 +31,14 @@ import 
org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
 import com.google.gson.JsonSyntaxException;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.entity.ContentType;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
 public class CustomTransformOutputSchemaGenerator extends 
OutputSchemaGenerator<CustomTransformOutputStrategy> {
 
   private DataProcessorInvocation dataProcessorInvocation;
   private CustomTransformOutputStrategy outputStrategy;
+  private final ExtensionServiceRequestManager requestManager;
 
   public static CustomTransformOutputSchemaGenerator from(OutputStrategy 
strategy, DataProcessorInvocation invocation) {
     return new 
CustomTransformOutputSchemaGenerator((CustomTransformOutputStrategy) strategy, 
invocation);
@@ -48,6 +48,7 @@ public class CustomTransformOutputSchemaGenerator extends 
OutputSchemaGenerator<
                                               DataProcessorInvocation 
invocation) {
     super(strategy);
     this.dataProcessorInvocation = invocation;
+    this.requestManager = new HttpExtensionServiceRequestManager();
   }
 
 
@@ -69,18 +70,20 @@ public class CustomTransformOutputSchemaGenerator extends 
OutputSchemaGenerator<
           dataProcessorInvocation.getAppId(),
           SpServiceUrlProvider.DATA_PROCESSOR
       );
-      Response httpResp = Request.Post(endpointUrl + 
"/output").bodyString(httpRequestBody,
-          ContentType
-              .APPLICATION_JSON).execute();
-      return handleResponse(httpResp);
+      var response = requestManager.requestOutputSchema(endpointUrl + 
"/output", httpRequestBody);
+      return handleResponse(response);
     } catch (Exception e) {
       e.printStackTrace();
       return new EventSchema();
     }
   }
 
-  private EventSchema handleResponse(Response httpResp) throws 
JsonSyntaxException, IOException {
-    String resp = httpResp.returnContent().asString(StandardCharsets.UTF_8);
+  private EventSchema handleResponse(ExtensionServiceOperationResult response) 
throws JsonSyntaxException, IOException {
+    if (!response.isSuccess()) {
+      throw new IOException("Could not compute output schema, status code: " + 
response.statusCode());
+    }
+
+    String resp = response.responseBody();
 
     return JacksonSerializer
         .getObjectMapper()
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
index 1bc61b0ae2..ff45a709f8 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
@@ -19,7 +19,7 @@
 package org.apache.streampipes.manager.migration;
 
 import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
 import org.apache.streampipes.model.extensions.migration.MigrationRequest;
@@ -45,9 +45,14 @@ import static 
org.apache.streampipes.manager.migration.MigrationUtils.getRequest
 public abstract class AbstractMigrationManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractMigrationManager.class);
+  private final ExtensionServiceRequestManager extensionRequestManager;
 
   protected static final String MIGRATION_ENDPOINT = "api/v1/migrations";
 
+  protected AbstractMigrationManager(ExtensionServiceRequestManager 
extensionRequestManager) {
+    this.extensionRequestManager = extensionRequestManager;
+  }
+
   /**
    * Performs the actual migration of a pipeline element.
    * This includes the communication with the extensions service which runs 
the migration.
@@ -71,15 +76,15 @@ public abstract class AbstractMigrationManager {
 
       String serializedRequest = 
JacksonSerializer.getObjectMapper().writeValueAsString(migrationRequest);
 
-      var migrationResponse = ExtensionServiceExecutions.extServicePostRequest(
+      var migrationResponse = extensionRequestManager.requestMigration(
           url,
           serializedRequest
-      ).execute();
+      );
 
       TypeReference<MigrationResult<T>> typeReference = new TypeReference<>() {
       };
 
-      String migrationResponseString = 
migrationResponse.returnContent().asString();
+      String migrationResponseString = migrationResponse.responseBody();
       return JacksonSerializer
           .getObjectMapper()
           .readValue(migrationResponseString, typeReference);
@@ -138,10 +143,7 @@ public abstract class AbstractMigrationManager {
   protected void performUpdate(String requestUrl) {
 
     try {
-      var entityPayload = 
ExtensionServiceExecutions.extServiceGetRequest(requestUrl)
-          .execute()
-          .returnContent()
-          .asString();
+      var entityPayload = 
extensionRequestManager.requestDescriptionUpdate(requestUrl).responseBody();
       var updateResult = new 
TypeExtractor(entityPayload).getTypeVerifier().verifyAndUpdate();
       if (!updateResult.isSuccess()) {
         LOG.error(
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index ab461575f3..4741de8f84 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.manager.migration;
 
 import org.apache.streampipes.commons.prometheus.pipelines.PipelinesStats;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
@@ -55,7 +56,9 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
 
   public PipelineElementMigrationManager(IPipelineStorage pipelineStorage,
                                          IDataProcessorStorage 
dataProcessorStorage,
-                                         IDataSinkStorage dataSinkStorage) {
+                                         IDataSinkStorage dataSinkStorage,
+                                         ExtensionServiceRequestManager 
extensionServiceRequestManager) {
+    super(extensionServiceRequestManager);
     this.pipelineStorage = pipelineStorage;
     this.dataProcessorStorage = dataProcessorStorage;
     this.dataSinkStorage = dataSinkStorage;
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java
index eb212bb33e..c327931124 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java
@@ -18,7 +18,7 @@
 package org.apache.streampipes.manager.remote;
 
 import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
-import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
@@ -27,31 +27,32 @@ import 
org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;
 
 import com.google.gson.JsonSyntaxException;
-import org.apache.http.client.fluent.Response;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
 public class ContainerProvidedOptionsHandler {
 
+  private final ExtensionServiceRequestManager extensionRequestManager;
+
+  public ContainerProvidedOptionsHandler(ExtensionServiceRequestManager 
extensionRequestManager) {
+    this.extensionRequestManager = extensionRequestManager;
+  }
 
   public RuntimeOptionsResponse fetchRemoteOptions(RuntimeOptionsRequest 
request) {
 
     try {
       var payload = 
JacksonSerializer.getObjectMapper().writeValueAsString(request);
       var url = getEndpointUrl(request.getAppId());
-      var resp = ExtensionServiceExecutions.extServicePostRequest(url, 
payload).execute();
-
-      return handleResponse(resp);
+      var response = 
extensionRequestManager.requestContainerProvidedOptions(url, payload);
+      return handleResponse(response.responseBody());
     } catch (Exception e) {
       e.printStackTrace();
       return new RuntimeOptionsResponse();
     }
   }
 
-  private RuntimeOptionsResponse handleResponse(Response httpResp) throws 
JsonSyntaxException, IOException {
-    String resp = httpResp.returnContent().asString(StandardCharsets.UTF_8);
-    return JacksonSerializer.getObjectMapper().readValue(resp, 
RuntimeOptionsResponse.class);
+  private RuntimeOptionsResponse handleResponse(String responseBody) throws 
JsonSyntaxException, IOException {
+    return JacksonSerializer.getObjectMapper().readValue(responseBody, 
RuntimeOptionsResponse.class);
   }
 
   private String getEndpointUrl(String appId) throws 
NoServiceEndpointsAvailableException {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
index f7e14125a5..0cea95bf8d 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.manager.setup;
 import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.commons.environment.Environments;
 import 
org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.model.client.setup.InitialSettings;
 
 import org.slf4j.Logger;
@@ -35,19 +36,25 @@ public class AutoInstallation implements 
BackgroundTaskNotifier {
   private static final Logger LOG = 
LoggerFactory.getLogger(AutoInstallation.class);
 
   private final Environment env;
+  private final ExtensionServiceRequestManager extensionServiceRequestManager;
   private final AtomicInteger errorCount = new AtomicInteger();
   private int numberOfBackgroundSteps = 0;
   private int executedBackgroundSteps = 0;
 
-  public AutoInstallation() {
+  public AutoInstallation(ExtensionServiceRequestManager 
extensionServiceRequestManager) {
     this.env = Environments.getEnvironment();
+    this.extensionServiceRequestManager = extensionServiceRequestManager;
   }
 
   public void startAutoInstallation() {
     InitialSettings settings = collectInitialSettings();
 
     List<InstallationStep> steps = 
InstallationConfiguration.getInstallationSteps(settings);
-    List<Runnable> backgroundSteps = 
InstallationConfiguration.getBackgroundInstallationSteps(settings, this);
+    List<Runnable> backgroundSteps = 
InstallationConfiguration.getBackgroundInstallationSteps(
+        settings,
+        this,
+        extensionServiceRequestManager
+    );
     numberOfBackgroundSteps = backgroundSteps.size();
 
     steps.forEach(step -> {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java
index 524ee4f32a..2027f4a500 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/ExtensionsInstallationTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.manager.setup;
 
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.extensions.AvailableExtensionsProvider;
 import org.apache.streampipes.model.client.setup.InitialSettings;
 import org.apache.streampipes.model.extensions.ExtensionItemDescription;
@@ -41,13 +42,16 @@ public class ExtensionsInstallationTask implements Runnable 
{
   private final InitialSettings settings;
   private final BackgroundTaskNotifier callback;
   private final INoSqlStorage storage;
+  private final ExtensionServiceRequestManager extensionServiceRequestManager;
 
   public ExtensionsInstallationTask(InitialSettings settings,
                                     INoSqlStorage storage,
-                                    BackgroundTaskNotifier callback) {
+                                    BackgroundTaskNotifier callback,
+                                    ExtensionServiceRequestManager 
extensionServiceRequestManager) {
     this.settings = settings;
     this.storage = storage;
     this.callback = callback;
+    this.extensionServiceRequestManager = extensionServiceRequestManager;
   }
 
   @Override
@@ -76,7 +80,8 @@ public class ExtensionsInstallationTask implements Runnable {
       for (ExtensionItemDescription extensionItem : availableExtensions) {
         steps.add(new PipelineElementInstallationStep(
             extensionItem,
-            settings.getInitialAdminUserSid())
+            settings.getInitialAdminUserSid(),
+            extensionServiceRequestManager)
         );
       }
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
index 294fd7073b..af4baa93c6 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/InstallationConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.manager.setup;
 
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.model.client.setup.InitialSettings;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
@@ -42,7 +43,13 @@ public class InstallationConfiguration {
   }
 
   public static List<Runnable> getBackgroundInstallationSteps(InitialSettings 
settings,
-                                                              
BackgroundTaskNotifier callback) {
-    return List.of(new ExtensionsInstallationTask(settings, 
StorageDispatcher.INSTANCE.getNoSqlStore(), callback));
+                                                              
BackgroundTaskNotifier callback,
+                                                              
ExtensionServiceRequestManager extensionServiceRequestManager) {
+    return List.of(new ExtensionsInstallationTask(
+        settings,
+        StorageDispatcher.INSTANCE.getNoSqlStore(),
+        callback,
+        extensionServiceRequestManager
+    ));
   }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
index 0bbb8f31b1..6a3e55f829 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/PipelineElementInstallationStep.java
@@ -18,6 +18,7 @@
 package org.apache.streampipes.manager.setup;
 
 import org.apache.streampipes.commons.exceptions.SepaParseException;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.extensions.ExtensionItemInstaller;
 import org.apache.streampipes.manager.extensions.ExtensionsResourceUrlProvider;
 import org.apache.streampipes.model.extensions.ExtensionItemDescription;
@@ -30,12 +31,15 @@ public class PipelineElementInstallationStep extends 
InstallationStep {
 
   private final ExtensionItemDescription extensionItem;
   private final String principalSid;
+  private final ExtensionServiceRequestManager extensionServiceRequestManager;
 
 
   public PipelineElementInstallationStep(ExtensionItemDescription 
extensionItem,
-                                         String principalSid) {
+                                         String principalSid,
+                                         ExtensionServiceRequestManager 
extensionServiceRequestManager) {
     this.extensionItem = extensionItem;
     this.principalSid = principalSid;
+    this.extensionServiceRequestManager = extensionServiceRequestManager;
   }
 
   @Override
@@ -43,7 +47,8 @@ public class PipelineElementInstallationStep extends 
InstallationStep {
     var installationReq = 
ExtensionItemInstallationRequest.fromDescription(extensionItem, true);
     var resourceUrlProvider = new 
ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery());
     try {
-      new 
ExtensionItemInstaller(resourceUrlProvider).installExtension(installationReq, 
principalSid);
+      new ExtensionItemInstaller(resourceUrlProvider, 
extensionServiceRequestManager)
+          .installExtension(installationReq, principalSid);
       logSuccess(getTitle());
     } catch (SepaParseException | IOException e) {
       logFailure(getTitle());
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index 462294286c..1c72d3b94a 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -22,6 +22,7 @@ import 
org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
 import org.apache.streampipes.manager.file.FileManager;
 import org.apache.streampipes.manager.pipeline.PipelineCacheManager;
@@ -56,7 +57,7 @@ public class ResetManagement {
    *
    * @param username of the user to delte the resources
    */
-  public static void reset(String username) {
+  public static void reset(String username, WorkerRestClient workerRestClient) 
{
     logger.info("Start resetting the system");
 
     setHideTutorialToFalse(username);
@@ -65,7 +66,7 @@ public class ResetManagement {
 
     stopAndDeleteAllPipelines();
 
-    stopAndDeleteAllAdapters();
+    stopAndDeleteAllAdapters(workerRestClient);
 
     deleteAllFiles();
 
@@ -101,13 +102,14 @@ public class ResetManagement {
     });
   }
 
-  private static void stopAndDeleteAllAdapters() {
+  private static void stopAndDeleteAllAdapters(WorkerRestClient 
workerRestClient) {
     AdapterMasterManagement adapterMasterManagement = new 
AdapterMasterManagement(
         StorageDispatcher.INSTANCE.getNoSqlStore()
                                   .getAdapterInstanceStorage(),
         new SpResourceManager().manageAdapters(),
         new SpResourceManager().manageDataStreams(),
-        AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+        AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+        workerRestClient
     );
 
     List<AdapterDescription> allAdapters = 
adapterMasterManagement.getAllAdapterInstances();
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
index 1d41f47cc4..fa1b1d798b 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.rest.impl;
 
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
@@ -33,11 +34,17 @@ import 
org.springframework.web.bind.annotation.RestController;
 @RequestMapping("/api/v2/pe/options")
 public class ContainerProvidedOptions extends AbstractRestResource {
 
+  private final ContainerProvidedOptionsHandler 
containerProvidedOptionsHandler;
+
+  public ContainerProvidedOptions(ExtensionServiceRequestManager 
extensionServiceRequestManager) {
+    this.containerProvidedOptionsHandler = new 
ContainerProvidedOptionsHandler(extensionServiceRequestManager);
+  }
+
   @PostMapping(
       produces = MediaType.APPLICATION_JSON_VALUE,
       consumes = MediaType.APPLICATION_JSON_VALUE
   )
   public ResponseEntity<RuntimeOptionsResponse> 
fetchRemoteOptions(@RequestBody RuntimeOptionsRequest request) {
-    return ok(new 
ContainerProvidedOptionsHandler().fetchRemoteOptions(request));
+    return ok(containerProvidedOptionsHandler.fetchRemoteOptions(request));
   }
 }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ResetResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ResetResource.java
index bbf7f2aa2c..66eb908b43 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ResetResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ResetResource.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.rest.impl;
 
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import org.apache.streampipes.model.client.user.Principal;
 import org.apache.streampipes.model.client.user.PrincipalType;
 import org.apache.streampipes.model.message.Notifications;
@@ -38,10 +39,16 @@ import java.util.ArrayList;
 @RequestMapping("/api/v2/reset")
 public class ResetResource extends AbstractAuthGuardedRestResource {
 
-  @PostMapping(produces =  MediaType.APPLICATION_JSON_VALUE)
+  private final WorkerRestClient workerRestClient;
+
+  public ResetResource(WorkerRestClient workerRestClient) {
+    this.workerRestClient = workerRestClient;
+  }
+
+  @PostMapping(produces = MediaType.APPLICATION_JSON_VALUE)
   @Operation(summary = "Resets StreamPipes instance")
   public ResponseEntity<SuccessMessage> reset() {
-    ResetManagement.reset(getAuthenticatedUsername());
+    ResetManagement.reset(getAuthenticatedUsername(), workerRestClient);
     var userStorage = getUserStorage();
 
 
@@ -50,7 +57,7 @@ public class ResetResource extends 
AbstractAuthGuardedRestResource {
     for (var user : allUsers) {
       if (user.getPrincipalType() == PrincipalType.USER_ACCOUNT
           && !user.getPrincipalId().equals(getAuthenticatedUserSid())) {
-        ResetManagement.reset(user.getUsername());
+        ResetManagement.reset(user.getUsername(), workerRestClient);
         userStorage.deleteUser(user.getPrincipalId());
       }
     }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java
index 42bb1403e4..24a120bd09 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/ExtensionsInstallationResource.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.rest.impl.admin;
 
 import org.apache.streampipes.commons.exceptions.SepaParseException;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.assets.AssetManager;
 import org.apache.streampipes.manager.extensions.ExtensionItemInstaller;
 import org.apache.streampipes.manager.extensions.ExtensionsResourceUrlProvider;
@@ -49,13 +50,19 @@ import java.io.IOException;
 @PreAuthorize(AuthConstants.IS_ADMIN_ROLE)
 public class ExtensionsInstallationResource extends 
AbstractAuthGuardedRestResource {
 
+  private final ExtensionServiceRequestManager extensionServiceRequestManager;
+
+  public ExtensionsInstallationResource(ExtensionServiceRequestManager 
extensionServiceRequestManager) {
+    this.extensionServiceRequestManager = extensionServiceRequestManager;
+  }
+
   @PostMapping(
       consumes = MediaType.APPLICATION_JSON_VALUE,
       produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<Message> addElement(@RequestBody 
ExtensionItemInstallationRequest installationReq) {
     var descriptionUrlProvider = new 
ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery());
     try {
-      return ok(new ExtensionItemInstaller(descriptionUrlProvider)
+      return ok(new ExtensionItemInstaller(descriptionUrlProvider, 
extensionServiceRequestManager)
           .installExtension(installationReq, getAuthenticatedUserSid()));
     } catch (IOException | SepaParseException e) {
       return constructErrorMessage(new 
Notification(NotificationType.PARSE_ERROR, e.getMessage()));
@@ -68,7 +75,7 @@ public class ExtensionsInstallationResource extends 
AbstractAuthGuardedRestResou
   public ResponseEntity<Message> updateElement(@RequestBody 
ExtensionItemInstallationRequest installationReq) {
     var descriptionUrlProvider = new 
ExtensionsResourceUrlProvider(SpServiceDiscovery.getServiceDiscovery());
     try {
-      return ok(new ExtensionItemInstaller(descriptionUrlProvider)
+      return ok(new ExtensionItemInstaller(descriptionUrlProvider, 
extensionServiceRequestManager)
           .updateExtension(installationReq));
     } catch (IOException | SepaParseException e) {
       return constructErrorMessage(new 
Notification(NotificationType.PARSE_ERROR, e.getMessage()));
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
index 4af17cb901..5fc813d6de 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
@@ -19,7 +19,9 @@
 package org.apache.streampipes.rest.impl.admin;
 
 import 
org.apache.streampipes.connect.management.management.AdapterMigrationManager;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import org.apache.streampipes.health.monitoring.ServiceRegistrationManager;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
 import org.apache.streampipes.manager.health.CoreServiceStatusManager;
 import 
org.apache.streampipes.manager.migration.PipelineElementMigrationManager;
@@ -73,6 +75,14 @@ public class MigrationResource extends 
AbstractAuthGuardedRestResource {
   private final CoreServiceStatusManager coreServiceStatusManager = new 
CoreServiceStatusManager(
       getNoSqlStorage().getSpCoreConfigurationStorage()
   );
+  private final ExtensionServiceRequestManager extensionServiceRequestManager;
+  private final WorkerRestClient workerRestClient;
+
+  public MigrationResource(ExtensionServiceRequestManager 
extensionServiceRequestManager,
+                           WorkerRestClient workerRestClient) {
+    this.extensionServiceRequestManager = extensionServiceRequestManager;
+    this.workerRestClient = workerRestClient;
+  }
 
   @PostMapping(path = "{serviceId}", consumes = 
MediaType.APPLICATION_JSON_VALUE)
   @Operation(
@@ -119,12 +129,17 @@ public class MigrationResource extends 
AbstractAuthGuardedRestResource {
                 List.of(SpServiceTagPrefix.DATA_PROCESSOR, 
SpServiceTagPrefix.DATA_SINK)
             );
 
-            new AdapterMigrationManager(adapterStorage, 
adapterDescriptionStorage)
+            new AdapterMigrationManager(
+                adapterStorage,
+                adapterDescriptionStorage,
+                workerRestClient,
+                extensionServiceRequestManager)
               .handleMigrations(extensionsServiceConfig, adapterMigrations);
             new PipelineElementMigrationManager(
                 pipelineStorage,
                 dataProcessorStorage,
-                dataSinkStorage)
+                dataSinkStorage,
+                extensionServiceRequestManager)
                 .handleMigrations(extensionsServiceConfig, 
pipelineElementMigrations);
           }
         }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
index b9a79917e0..d07cf9c55a 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AdapterResource.java
@@ -23,6 +23,7 @@ import 
org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
 import 
org.apache.streampipes.connect.management.management.AdapterUpdateManagement;
 import 
org.apache.streampipes.connect.management.management.CompactAdapterManagement;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import org.apache.streampipes.manager.pipeline.PipelineManager;
 import org.apache.streampipes.model.client.user.DefaultRole;
 import org.apache.streampipes.model.client.user.Permission;
@@ -69,13 +70,14 @@ public class AdapterResource extends 
AbstractAdapterResource<AdapterMasterManage
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AdapterResource.class);
 
-  public AdapterResource() {
+  public AdapterResource(WorkerRestClient workerRestClient) {
     super(() -> new AdapterMasterManagement(
         StorageDispatcher.INSTANCE.getNoSqlStore()
             .getAdapterInstanceStorage(),
         new SpResourceManager().manageAdapters(),
         new SpResourceManager().manageDataStreams(),
-        AdapterMetricsManager.INSTANCE.getAdapterMetrics()));
+        AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+        workerRestClient));
   }
 
   @PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE)
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java
index 2101cb3fe6..a4211f2a9e 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/CompactAdapterResource.java
@@ -25,6 +25,10 @@ import 
org.apache.streampipes.connect.management.compact.PersistPipelineHandler;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
 import 
org.apache.streampipes.connect.management.management.AdapterUpdateManagement;
 import 
org.apache.streampipes.connect.management.management.CompactAdapterManagement;
+import org.apache.streampipes.connect.management.management.GuessManagement;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
+import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import 
org.apache.streampipes.manager.pipeline.compact.CompactPipelineManagement;
 import org.apache.streampipes.model.connect.adapter.compact.CompactAdapter;
 import org.apache.streampipes.model.message.Notifications;
@@ -54,15 +58,23 @@ public class CompactAdapterResource extends 
AbstractAdapterResource<AdapterMaste
   private final CompactAdapterManagement compactAdapterManagement;
   private final AdapterUpdateManagement adapterUpdateManagement;
 
-  public CompactAdapterResource() {
+  public CompactAdapterResource(WorkerRestClient workerRestClient,
+                                ExtensionServiceRequestManager 
extensionServiceRequestManager) {
     super(() -> new AdapterMasterManagement(
         StorageDispatcher.INSTANCE.getNoSqlStore()
                                   .getAdapterInstanceStorage(),
         new SpResourceManager().manageAdapters(),
         new SpResourceManager().manageDataStreams(),
-        AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+        AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+        workerRestClient
     ));
-    this.compactAdapterManagement = new CompactAdapterManagement(new 
AdapterGenerationSteps().getGenerators());
+    var guessManagement = new GuessManagement(
+        new ExtensionsServiceEndpointGenerator(),
+        extensionServiceRequestManager
+    );
+    this.compactAdapterManagement = new CompactAdapterManagement(
+        new AdapterGenerationSteps(guessManagement).getGenerators()
+    );
     this.adapterUpdateManagement = new 
AdapterUpdateManagement(managementService);
   }
 
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
index 5d22800231..3dfc02014a 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java
@@ -23,6 +23,7 @@ import 
org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.commons.media.ImageMimeTypeDetector;
 import 
org.apache.streampipes.connect.management.management.DescriptionManagement;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import 
org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator;
 import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -49,8 +50,8 @@ public class DescriptionResource extends 
AbstractAdapterResource<DescriptionMana
   private static final Logger LOG = 
LoggerFactory.getLogger(DescriptionResource.class);
   private final IExtensionsServiceEndpointGenerator endpointGenerator;
 
-  public DescriptionResource() {
-    super(DescriptionManagement::new);
+  public DescriptionResource(WorkerRestClient workerRestClient) {
+    super(() -> new DescriptionManagement(workerRestClient));
     endpointGenerator = new ExtensionsServiceEndpointGenerator();
   }
 
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
index 921fb565cd..1b57e21491 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/GuessResource.java
@@ -22,6 +22,8 @@ import 
org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableExce
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.connect.management.management.GuessManagement;
 import 
org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
+import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.monitoring.SpLogMessage;
 import org.apache.streampipes.model.schema.EventSchema;
@@ -44,8 +46,8 @@ public class GuessResource extends 
AbstractAdapterResource<GuessManagement> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GuessResource.class);
 
-  public GuessResource() {
-    super(GuessManagement::new);
+  public GuessResource(ExtensionServiceRequestManager 
extensionServiceRequestManager) {
+    super(() -> new GuessManagement(new ExtensionsServiceEndpointGenerator(), 
extensionServiceRequestManager));
   }
 
 
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
index fddde0f7a9..1d471a1ce2 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java
@@ -48,10 +48,12 @@ public class RuntimeResolvableResource extends 
AbstractAdapterResource<Void> {
   private static final Logger LOG = 
LoggerFactory.getLogger(RuntimeResolvableResource.class);
 
   private final IExtensionsServiceEndpointGenerator endpointGenerator;
+  private final WorkerRestClient workerRestClient;
 
-  public RuntimeResolvableResource() {
+  public RuntimeResolvableResource(WorkerRestClient workerRestClient) {
     super();
     this.endpointGenerator = new ExtensionsServiceEndpointGenerator();
+    this.workerRestClient = workerRestClient;
   }
 
   @PostMapping(
@@ -69,7 +71,7 @@ public class RuntimeResolvableResource extends 
AbstractAdapterResource<Void> {
           
runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags()
       );
       
SecretProvider.getDecryptionService().applyConfig(runtimeOptionsRequest.getStaticProperties());
-      RuntimeOptionsResponse result = 
WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest);
+      RuntimeOptionsResponse result = 
workerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest);
 
       return ok(result);
     } catch (AdapterException | NoServiceEndpointsAvailableException e) {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java
similarity index 50%
copy from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
copy to 
streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java
index ac9b73705a..4e2bd33111 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/DetachHttpRequest.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java
@@ -15,28 +15,25 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.service.core;
 
-package org.apache.streampipes.manager.execution.http;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
+import 
org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager;
 
-import org.apache.streampipes.model.api.EndpointSelectable;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 
-import org.apache.http.client.fluent.Request;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Configuration
+public class ExtensionServiceRequestConfiguration {
 
-
-public class DetachHttpRequest extends PipelineElementHttpRequest {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(DetachHttpRequest.class);
-
-  @Override
-  protected Request initRequest(EndpointSelectable pipelineElement, String 
endpointUrl) {
-    LOG.info("Detaching element: " + endpointUrl);
-    return Request.Delete(endpointUrl);
+  @Bean
+  public ExtensionServiceRequestManager extensionServiceRequestManager() {
+    return new HttpExtensionServiceRequestManager();
   }
 
-  @Override
-  protected void logError(String endpointUrl, String pipelineElementName, 
String exceptionMessage) {
-    LOG.error("Could not stop pipeline element {} at {}: {}", endpointUrl, 
pipelineElementName, exceptionMessage);
+  @Bean
+  public WorkerRestClient workerRestClient(ExtensionServiceRequestManager 
extensionServiceRequestManager) {
+    return new WorkerRestClient(extensionServiceRequestManager);
   }
 }
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
index c08739b1d2..465247cd53 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
@@ -21,10 +21,12 @@ package org.apache.streampipes.service.core;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
 import 
org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import org.apache.streampipes.health.monitoring.ExtensionHealthCheck;
 import org.apache.streampipes.health.monitoring.PostStartupRecovery;
 import org.apache.streampipes.health.monitoring.ResourceProvider;
 import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.pipeline.Pipeline;
@@ -56,11 +58,15 @@ public class PostStartupTask implements Runnable {
   private final ScheduledExecutorService executorService;
   private final WorkerAdministrationManagement workerAdministrationManagement;
   private final PostStartupRecovery postStartupRecovery;
+  private final ExtensionServiceRequestManager extensionServiceRequestManager;
 
   private final INoSqlStorage storage = 
StorageDispatcher.INSTANCE.getNoSqlStore();
 
-  public PostStartupTask(IPipelineStorage pipelineStorage) {
+  public PostStartupTask(IPipelineStorage pipelineStorage,
+                         ExtensionServiceRequestManager 
extensionServiceRequestManager,
+                         WorkerRestClient workerRestClient) {
     this.pipelineStorage = pipelineStorage;
+    this.extensionServiceRequestManager = extensionServiceRequestManager;
     this.executorService = Executors.newSingleThreadScheduledExecutor();
     var resourceManager = new SpResourceManager();
     this.workerAdministrationManagement = new WorkerAdministrationManagement(
@@ -77,16 +83,18 @@ public class PostStartupTask implements Runnable {
                     
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
                     new SpResourceManager().manageAdapters(),
                     new SpResourceManager().manageDataStreams(),
-                    AdapterMetricsManager.INSTANCE.getAdapterMetrics()
+                    AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+                    workerRestClient
                 )
-            )
+            ),
+            extensionServiceRequestManager
         )
     );
   }
 
   @Override
   public void run() {
-    new ServiceHealthCheck(storage.getExtensionsServiceStorage()).run();
+    new ServiceHealthCheck(storage.getExtensionsServiceStorage(), 
extensionServiceRequestManager).run();
     performAdapterAssetUpdate();
     startAllPreviouslyStoppedPipelines();
     runHealthCheckOnce();
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index d7029f2bcf..541bb57850 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.service.core;
 import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
 import 
org.apache.streampipes.connect.management.management.AdapterMasterManagement;
+import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import org.apache.streampipes.connect.transformer.api.TransformationEngine;
 import org.apache.streampipes.connect.transformer.api.TransformationEngines;
 import org.apache.streampipes.connect.transformer.groovy.GroovyScriptEngine;
@@ -29,6 +30,7 @@ import 
org.apache.streampipes.health.monitoring.ResourceProvider;
 import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
 import org.apache.streampipes.loadbalance.LoadManager;
 import 
org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import org.apache.streampipes.manager.function.FunctionManager;
 import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
 import org.apache.streampipes.manager.health.CoreServiceStatusManager;
@@ -82,7 +84,7 @@ import java.util.function.Supplier;
 @EnableScheduling
 @Import({OpenApiConfiguration.class, StreamPipesPasswordEncoder.class,
     StreamPipesPrometheusConfig.class, WebSecurityConfig.class, 
WelcomePageController.class,
-    StorageApiConfiguration.class})
+    StorageApiConfiguration.class, ExtensionServiceRequestConfiguration.class})
 @ComponentScan({"org.apache.streampipes.rest.*", 
"org.apache.streampipes.service.core.oauth2",
     "org.apache.streampipes.service.core.scheduler"})
 public class StreamPipesCoreApplication extends StreamPipesServiceBase {
@@ -99,6 +101,12 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
   @Autowired
   private IFunctionStateStorage functionStateStorage;
 
+  @Autowired
+  private ExtensionServiceRequestManager extensionServiceRequestManager;
+
+  @Autowired
+  private WorkerRestClient workerRestClient;
+
   public static void main(String[] args) {
     StreamPipesCoreApplication application = new StreamPipesCoreApplication();
     application.initialize(() -> List.of(
@@ -159,12 +167,17 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
     new ApplyDefaultRolesAndPrivilegesTask().execute();
     coreStatusManager.updateCoreStatus(SpCoreConfigurationStatus.READY);
 
-    executorService.schedule(new PostStartupTask(getPipelineStorage()),
+    executorService.schedule(new PostStartupTask(
+            getPipelineStorage(),
+            extensionServiceRequestManager,
+            workerRestClient),
         env.getInitialHealthCheckDelayInMillis().getValueOrDefault(),
         TimeUnit.MILLISECONDS);
 
     
scheduleHealthChecks(env.getHealthCheckIntervalInMillis().getValueOrDefault(), 
List
-        .of(new 
ServiceHealthCheck(StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage()),
+        .of(new ServiceHealthCheck(
+                
StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage(),
+                extensionServiceRequestManager),
             new ExtensionHealthCheck(
                 new ResourceProvider(
                     
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI(),
@@ -173,9 +186,10 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
                         
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
                         new SpResourceManager().manageAdapters(),
                         new SpResourceManager().manageDataStreams(),
-                        AdapterMetricsManager.INSTANCE.getAdapterMetrics()
-                    )
-                )
+                        AdapterMetricsManager.INSTANCE.getAdapterMetrics(),
+                        workerRestClient
+                    )),
+                extensionServiceRequestManager
             )));
 
     var logFetchInterval = 
env.getLogFetchIntervalInMillis().getValueOrDefault();
@@ -212,7 +226,7 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
     try {
       TimeUnit.MILLISECONDS.sleep(initialSleepBeforeInstallation);
       LOG.info("Starting installation procedure");
-      new AutoInstallation().startAutoInstallation();
+      new 
AutoInstallation(extensionServiceRequestManager).startAutoInstallation();
     } catch (InterruptedException e) {
       LOG.error("Ooops, something went wrong during the installation", e);
     }
@@ -242,7 +256,7 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
       }
     });
 
-    FunctionManager.stopAllFunctionsAndPersistState(functionStateStorage);
+    new 
FunctionManager(extensionServiceRequestManager).stopAllFunctionsAndPersistState(functionStateStorage);
 
     LOG.info("Thanks for using Apache StreamPipes - see you next time!");
   }

Reply via email to