This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch add-broker-communication-mode
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit d2d214618087b0db6d1e655bb6e2305ee9db5499
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri Mar 13 17:26:48 2026 +0100

    feat: Add initial version of broker communication mode
---
 pom.xml                                            |   1 +
 .../apache/streampipes/commons/constants/Envs.java |   6 +
 .../commons/environment/DefaultEnvironment.java    |  15 ++
 .../commons/environment/Environment.java           |   6 +
 .../sinks/internal/jvm/datalake/DataLakeSink.java  |   2 +-
 .../messaging/kafka/SpKafkaConsumer.java           |   2 +-
 .../messaging/kafka/SpKafkaProducer.java           |   6 +-
 .../ExtensionServiceBrokerErrorEnvelope.java       |  50 ++--
 .../ExtensionServiceBrokerRequestEnvelope.java     |  72 ++++++
 .../ExtensionServiceBrokerResponseEnvelope.java    |  72 ++++++
 .../transport/ExtensionServiceBrokerTopics.java    |  59 +++++
 .../transport/ExtensionServiceTransportMode.java   |  46 ++--
 .../pom.xml                                        |  52 ++--
 .../extensions/ExtensionBrokerRequestReceiver.java | 239 ++++++++++++++++++
 .../extensions/ExtensionServiceOperationType.java  |   1 +
 .../extensions/ExtensionServiceRequestTarget.java  |  15 +-
 .../extensions/ExtensionServiceRequestTargets.java |   9 +
 .../core/ExtensionServiceRequestConfiguration.java |  37 ++-
 .../service/core/StreamPipesCoreApplication.java   |   2 +-
 .../extensions/CoreExtensionTransportMode.java     |  38 +--
 .../extensions/CoreNatsRequestReplyClient.java     |  82 +++++++
 ...ansportAwareExtensionServiceRequestManager.java | 267 +++++++++++++++++++++
 streampipes-service-extensions/pom.xml             |   5 +
 .../StreamPipesExtensionsServiceBase.java          |  33 +++
 .../standalone/manager/ProtocolManager.java        |   2 +-
 25 files changed, 1006 insertions(+), 113 deletions(-)

diff --git a/pom.xml b/pom.xml
index efbe0d1d6c..7d45f2e858 100644
--- a/pom.xml
+++ b/pom.xml
@@ -912,6 +912,7 @@
         <module>streampipes-service-core-minimal</module>
         <module>streampipes-service-discovery</module>
         <module>streampipes-service-discovery-api</module>
+        <module>streampipes-nats-extensions</module>
         <module>streampipes-service-extensions</module>
         <module>streampipes-storage-api</module>
         <module>streampipes-storage-couchdb</module>
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 5974facc9c..c7cb5b9519 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -101,6 +101,12 @@ public enum Envs {
   SP_NATS_PORT("SP_NATS_PORT", "4222"),
 
   SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"),
+  SP_CORE_EXTENSION_TRANSPORT_MODE("SP_CORE_EXTENSION_TRANSPORT_MODE", "auto"),
+  SP_EXTENSION_TRANSPORT_MODE("SP_EXTENSION_TRANSPORT_MODE", "http"),
+  SP_EXTENSION_REQUEST_TOPIC_PREFIX(
+      "SP_EXTENSION_REQUEST_TOPIC_PREFIX",
+      "sp.extensions.request"
+  ),
 
   CPU_RESOURCE_WEIGHT("SP_CPU_RESOURCE_WEIGHT", "1.0"),
   MEMORY_RESOURCE_WEIGHT("SP_MEMORY_RESOURCE_WEIGHT", "1.0"),
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index dd01aa8af7..5ff5b0e568 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -312,6 +312,21 @@ public class DefaultEnvironment implements Environment {
     return new StringEnvironmentVariable(Envs.SP_PULSAR_URL);
   }
 
+  @Override
+  public StringEnvironmentVariable getCoreExtensionTransportMode() {
+    return new 
StringEnvironmentVariable(Envs.SP_CORE_EXTENSION_TRANSPORT_MODE);
+  }
+
+  @Override
+  public StringEnvironmentVariable getExtensionTransportMode() {
+    return new StringEnvironmentVariable(Envs.SP_EXTENSION_TRANSPORT_MODE);
+  }
+
+  @Override
+  public StringEnvironmentVariable getExtensionRequestTopicPrefix() {
+    return new 
StringEnvironmentVariable(Envs.SP_EXTENSION_REQUEST_TOPIC_PREFIX);
+  }
+
   @Override
   public StringEnvironmentVariable getCustomServiceTags() {
     return new StringEnvironmentVariable(Envs.SP_SERVICE_TAGS);
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 8949d4663a..e0e52234b6 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -156,6 +156,12 @@ public interface Environment {
 
   StringEnvironmentVariable getPulsarUrl();
 
+  StringEnvironmentVariable getCoreExtensionTransportMode();
+
+  StringEnvironmentVariable getExtensionTransportMode();
+
+  StringEnvironmentVariable getExtensionRequestTopicPrefix();
+
   StringEnvironmentVariable getCustomServiceTags();
 
   StringEnvironmentVariable getAllowedUploadFiletypes();
diff --git 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 0a1b5bf1ea..40230d9e5a 100644
--- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -194,7 +194,7 @@ public class DataLakeSink implements IStreamPipesDataSink, 
SupportsRuntimeConfig
         .peek(ep -> {
           // Set all properties to DIMENSION_PROPERTY when seleted in 
dimensions
           if (dimensions.contains(ep.getRuntimeName())) {
-            LOG.info("Using {} as dimension", ep.getRuntimeName());
+            LOG.debug("Using {} as dimension", ep.getRuntimeName());
             ep.setPropertyScope(PropertyScope.DIMENSION_PROPERTY.name());
           }
         })
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index ec2adcb80b..3f3939cf2e 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -88,7 +88,7 @@ public class SpKafkaConsumer implements EventConsumer, 
Runnable,
 
   @Override
   public void connect(InternalEventProcessor<byte[]> eventProcessor) throws 
SpRuntimeException {
-    LOG.info("Kafka consumer: Connecting to {}", 
protocol.getTopicDefinition().getActualTopicName());
+    LOG.debug("Kafka consumer: Connecting to {}", 
protocol.getTopicDefinition().getActualTopicName());
     var patternTopic = isPatternTopic();
     this.eventProcessor = eventProcessor;
     this.isRunning = true;
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index eb07b66c6b..68637be02c 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -93,7 +93,7 @@ public class SpKafkaProducer implements EventProducer, 
Serializable {
 
   @Override
   public void connect() {
-    LOG.info("Kafka producer: Connecting to " + 
protocol.getTopicDefinition().getActualTopicName());
+    LOG.debug("Kafka producer: Connecting to " + 
protocol.getTopicDefinition().getActualTopicName());
     this.brokerUrl = protocol.getBrokerHostname() + ":" + 
protocol.getKafkaPort();
     this.topic = protocol.getTopicDefinition().getActualTopicName();
 
@@ -106,7 +106,7 @@ public class SpKafkaProducer implements EventProducer, 
Serializable {
     this.producer = new KafkaProducer<>(makeProperties(protocol, 
Collections.emptyList()));
     this.connected = true;
 
-    LOG.info("Successfully created Kafka producer for topic " + this.topic);
+    LOG.debug("Successfully created Kafka producer for topic " + this.topic);
   }
 
   /**
@@ -136,7 +136,7 @@ public class SpKafkaProducer implements EventProducer, 
Serializable {
       LOG.info("Successfully created Kafka topic " + topic);
 
     } else {
-      LOG.info("Topic " + topic + "already exists in the broker, skipping 
topic creation");
+      LOG.debug("Topic " + topic + "already exists in the broker, skipping 
topic creation");
     }
   }
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerErrorEnvelope.java
similarity index 55%
copy from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
copy to 
streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerErrorEnvelope.java
index 53fa966da1..5bb2619fb6 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerErrorEnvelope.java
@@ -16,24 +16,34 @@
  *
  */
 
-package org.apache.streampipes.manager.api.extensions;
-
-public enum ExtensionServiceOperationType {
-  CONTAINER_PROVIDED_OPTIONS,
-  MIGRATION,
-  DESCRIPTION_UPDATE,
-  EXTENSION_DESCRIPTION,
-  FUNCTION_STOP,
-  ADAPTER_STATE_CHANGE,
-  RUNTIME_OPTIONS,
-  SAMPLE_DATA,
-  EXTENSION_INSTANCE_HEALTH,
-  SERVICE_HEALTH,
-  PIPELINE_ELEMENT_INVOCATION,
-  PIPELINE_ELEMENT_DETACH,
-  PIPELINE_ELEMENT_ASSETS,
-  ADAPTER_ASSETS,
-  ADAPTER_ICON_ASSET,
-  ADAPTER_DOCUMENTATION_ASSET,
-  OUTPUT_SCHEMA;
+package org.apache.streampipes.model.extensions.transport;
+
+public class ExtensionServiceBrokerErrorEnvelope {
+
+  private String errorType;
+  private String message;
+
+  public ExtensionServiceBrokerErrorEnvelope() {
+  }
+
+  public ExtensionServiceBrokerErrorEnvelope(String errorType, String message) 
{
+    this.errorType = errorType;
+    this.message = message;
+  }
+
+  public String getErrorType() {
+    return errorType;
+  }
+
+  public void setErrorType(String errorType) {
+    this.errorType = errorType;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
 }
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerRequestEnvelope.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerRequestEnvelope.java
new file mode 100644
index 0000000000..2f43dd734c
--- /dev/null
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerRequestEnvelope.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.extensions.transport;
+
+public class ExtensionServiceBrokerRequestEnvelope {
+
+  private String requestId;
+  private String operation;
+  private String payload;
+  private String authToken;
+
+  public ExtensionServiceBrokerRequestEnvelope() {
+  }
+
+  public ExtensionServiceBrokerRequestEnvelope(String requestId,
+                                               String operation,
+                                               String payload,
+                                               String authToken) {
+    this.requestId = requestId;
+    this.operation = operation;
+    this.payload = payload;
+    this.authToken = authToken;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public void setRequestId(String requestId) {
+    this.requestId = requestId;
+  }
+
+  public String getOperation() {
+    return operation;
+  }
+
+  public void setOperation(String operation) {
+    this.operation = operation;
+  }
+
+  public String getPayload() {
+    return payload;
+  }
+
+  public void setPayload(String payload) {
+    this.payload = payload;
+  }
+
+  public String getAuthToken() {
+    return authToken;
+  }
+
+  public void setAuthToken(String authToken) {
+    this.authToken = authToken;
+  }
+}
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerResponseEnvelope.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerResponseEnvelope.java
new file mode 100644
index 0000000000..95b2965546
--- /dev/null
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerResponseEnvelope.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.extensions.transport;
+
+public class ExtensionServiceBrokerResponseEnvelope {
+
+  private String requestId;
+  private int statusCode;
+  private String payload;
+  private ExtensionServiceBrokerErrorEnvelope error;
+
+  public ExtensionServiceBrokerResponseEnvelope() {
+  }
+
+  public ExtensionServiceBrokerResponseEnvelope(String requestId,
+                                                int statusCode,
+                                                String payload,
+                                                
ExtensionServiceBrokerErrorEnvelope error) {
+    this.requestId = requestId;
+    this.statusCode = statusCode;
+    this.payload = payload;
+    this.error = error;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public void setRequestId(String requestId) {
+    this.requestId = requestId;
+  }
+
+  public int getStatusCode() {
+    return statusCode;
+  }
+
+  public void setStatusCode(int statusCode) {
+    this.statusCode = statusCode;
+  }
+
+  public String getPayload() {
+    return payload;
+  }
+
+  public void setPayload(String payload) {
+    this.payload = payload;
+  }
+
+  public ExtensionServiceBrokerErrorEnvelope getError() {
+    return error;
+  }
+
+  public void setError(ExtensionServiceBrokerErrorEnvelope error) {
+    this.error = error;
+  }
+}
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerTopics.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerTopics.java
new file mode 100644
index 0000000000..d345b8f97e
--- /dev/null
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerTopics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.extensions.transport;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public final class ExtensionServiceBrokerTopics {
+
+  public static final String DEFAULT_REQUEST_TOPIC_PREFIX = 
"sp.extensions.request";
+
+  public static final String TRANSPORT_TAG_HTTP = "transport:http";
+  public static final String TRANSPORT_TAG_NATS = "transport:nats";
+
+  private ExtensionServiceBrokerTopics() {
+  }
+
+  public static String serviceWildcard(String topicPrefix, String serviceId) {
+    return serviceTopic(topicPrefix, serviceId, List.of()) + ".>";
+  }
+
+  public static String serviceTopic(String topicPrefix,
+                                    String serviceId,
+                                    List<String> topicSegments) {
+    return Stream.concat(
+            Stream.of(topicPrefix, serviceId),
+            topicSegments.stream())
+        .filter(Objects::nonNull)
+        .map(ExtensionServiceBrokerTopics::normalizeSegment)
+        .filter(part -> !part.isEmpty())
+        .collect(Collectors.joining("."));
+  }
+
+  private static String normalizeSegment(String value) {
+    return trimSlashes(value).replace("/", ".");
+  }
+
+  private static String trimSlashes(String value) {
+    return value.replaceAll("^/+", "").replaceAll("/+$", "");
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceTransportMode.java
similarity index 56%
copy from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
copy to 
streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceTransportMode.java
index 53fa966da1..2b48e5a0c4 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceTransportMode.java
@@ -16,24 +16,32 @@
  *
  */
 
-package org.apache.streampipes.manager.api.extensions;
+package org.apache.streampipes.model.extensions.transport;
 
-public enum ExtensionServiceOperationType {
-  CONTAINER_PROVIDED_OPTIONS,
-  MIGRATION,
-  DESCRIPTION_UPDATE,
-  EXTENSION_DESCRIPTION,
-  FUNCTION_STOP,
-  ADAPTER_STATE_CHANGE,
-  RUNTIME_OPTIONS,
-  SAMPLE_DATA,
-  EXTENSION_INSTANCE_HEALTH,
-  SERVICE_HEALTH,
-  PIPELINE_ELEMENT_INVOCATION,
-  PIPELINE_ELEMENT_DETACH,
-  PIPELINE_ELEMENT_ASSETS,
-  ADAPTER_ASSETS,
-  ADAPTER_ICON_ASSET,
-  ADAPTER_DOCUMENTATION_ASSET,
-  OUTPUT_SCHEMA;
+import java.util.Locale;
+
+public enum ExtensionServiceTransportMode {
+  HTTP,
+  NATS,
+  DUAL;
+
+  public boolean supportsHttp() {
+    return this == HTTP || this == DUAL;
+  }
+
+  public boolean supportsNats() {
+    return this == NATS || this == DUAL;
+  }
+
+  public static ExtensionServiceTransportMode from(String value) {
+    if (value == null || value.isBlank()) {
+      return HTTP;
+    }
+
+    try {
+      return 
ExtensionServiceTransportMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      return HTTP;
+    }
+  }
 }
diff --git a/streampipes-service-extensions/pom.xml 
b/streampipes-nats-extensions/pom.xml
similarity index 58%
copy from streampipes-service-extensions/pom.xml
copy to streampipes-nats-extensions/pom.xml
index d370d673d1..5dfc39b74a 100644
--- a/streampipes-service-extensions/pom.xml
+++ b/streampipes-nats-extensions/pom.xml
@@ -1,3 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   ~ Licensed to the Apache Software Foundation (ASF) under one or more
   ~ contributor license agreements.  See the NOTICE file distributed with
@@ -16,55 +17,37 @@
   ~
   -->
 
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
     <parent>
-        <artifactId>streampipes-parent</artifactId>
         <groupId>org.apache.streampipes</groupId>
+        <artifactId>streampipes-parent</artifactId>
         <version>0.99.0-SNAPSHOT</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>streampipes-service-extensions</artifactId>
+    <artifactId>streampipes-nats-extensions</artifactId>
 
     <dependencies>
-        <!-- StreamPipes dependencies -->
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-service-base</artifactId>
+            <artifactId>streampipes-commons</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-connect-transformer-api</artifactId>
+            <artifactId>streampipes-extensions-management</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-connect-transformer-groovy</artifactId>
+            <artifactId>streampipes-model</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-connect-transformer-js</artifactId>
-            <version>0.99.0-SNAPSHOT</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-rest-extensions</artifactId>
-            <version>0.99.0-SNAPSHOT</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.javassist</groupId>
-                    <artifactId>javassist</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-web</artifactId>
+            <groupId>io.nats</groupId>
+            <artifactId>jnats</artifactId>
         </dependency>
 
         <!-- Test dependencies -->
@@ -73,17 +56,18 @@
             <artifactId>junit-jupiter-api</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
+
     <build>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <propertyExpansion>
+                        
checkstyle.config.base.path=${project.parent.basedir}/tools/maven
+                    </propertyExpansion>
+                </configuration>
             </plugin>
         </plugins>
     </build>
diff --git 
a/streampipes-nats-extensions/src/main/java/org/apache/streampipes/nats/extensions/ExtensionBrokerRequestReceiver.java
 
b/streampipes-nats-extensions/src/main/java/org/apache/streampipes/nats/extensions/ExtensionBrokerRequestReceiver.java
new file mode 100644
index 0000000000..df517d7c8b
--- /dev/null
+++ 
b/streampipes-nats-extensions/src/main/java/org/apache/streampipes/nats/extensions/ExtensionBrokerRequestReceiver.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.nats.extensions;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import 
org.apache.streampipes.extensions.management.connect.AdapterWorkerRequestManagement;
+import 
org.apache.streampipes.extensions.management.monitoring.ServiceMonitorManagement;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerErrorEnvelope;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerRequestEnvelope;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerResponseEnvelope;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceTransportMode;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.nats.client.Connection;
+import io.nats.client.Dispatcher;
+import io.nats.client.Message;
+import io.nats.client.Nats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExtensionBrokerRequestReceiver {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExtensionBrokerRequestReceiver.class);
+
+  private static final String ADAPTER_STATE_CHANGE_OPERATION = 
"ADAPTER_STATE_CHANGE";
+  private static final String SERVICE_LOAD_OPERATION = "SERVICE_LOAD";
+  private static final String STATE_CHANGE_START = "start";
+  private static final String STATE_CHANGE_STOP = "stop";
+  private static final int HTTP_STATUS_OK = 200;
+  private static final int HTTP_STATUS_BAD_REQUEST = 400;
+  private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR = 500;
+  private static final int HTTP_STATUS_NOT_IMPLEMENTED = 501;
+
+  private final ObjectMapper objectMapper;
+  private final ServiceMonitorManagement serviceMonitorManagement;
+  private final AdapterWorkerRequestManagement adapterWorkerRequestManagement;
+
+  private Connection natsConnection;
+  private Dispatcher dispatcher;
+
+  public ExtensionBrokerRequestReceiver() {
+    this(new ServiceMonitorManagement(), new AdapterWorkerRequestManagement());
+  }
+
+  public ExtensionBrokerRequestReceiver(ServiceMonitorManagement 
serviceMonitorManagement,
+                                        AdapterWorkerRequestManagement 
adapterWorkerRequestManagement) {
+    this.objectMapper = JacksonSerializer.getObjectMapper();
+    this.serviceMonitorManagement = serviceMonitorManagement;
+    this.adapterWorkerRequestManagement = adapterWorkerRequestManagement;
+  }
+
+  public synchronized boolean start(String serviceId,
+                                    ExtensionServiceTransportMode mode,
+                                    String topicPrefix) {
+    if (!mode.supportsNats()) {
+      return false;
+    }
+
+    try {
+      var env = Environments.getEnvironment();
+      String natsUrl = "nats://" + env.getNatsHost().getValueOrDefault()
+          + ":" + env.getNatsPort().getValueOrDefault();
+      this.natsConnection = Nats.connect(natsUrl);
+
+      String subscriptionTopic = 
ExtensionServiceBrokerTopics.serviceWildcard(topicPrefix, serviceId);
+      this.dispatcher = natsConnection.createDispatcher(this::onMessage);
+      this.dispatcher.subscribe(subscriptionTopic);
+
+      LOG.info("Extension broker receiver listening on topic {}", 
subscriptionTopic);
+      return true;
+    } catch (Exception e) {
+      LOG.warn("Could not start extension broker receiver", e);
+      stop();
+      return false;
+    }
+  }
+
+  public synchronized void stop() {
+    if (natsConnection != null && dispatcher != null) {
+      natsConnection.closeDispatcher(dispatcher);
+      dispatcher = null;
+    }
+
+    if (natsConnection != null) {
+      try {
+        natsConnection.close();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while closing extension broker receiver", e);
+      } finally {
+        natsConnection = null;
+      }
+    }
+  }
+
+  private void onMessage(Message message) {
+    String replyTo = message.getReplyTo();
+    if (replyTo == null || replyTo.isBlank()) {
+      return;
+    }
+
+    ExtensionServiceBrokerResponseEnvelope response;
+    try {
+      var request = objectMapper.readValue(message.getData(), 
ExtensionServiceBrokerRequestEnvelope.class);
+      response = handleRequest(request, message.getSubject());
+    } catch (Exception e) {
+      response = error(null, HTTP_STATUS_INTERNAL_SERVER_ERROR, e);
+    }
+
+    publishResponse(replyTo, response);
+  }
+
+  private ExtensionServiceBrokerResponseEnvelope 
handleRequest(ExtensionServiceBrokerRequestEnvelope request,
+                                                               String topic) {
+    try {
+      if (SERVICE_LOAD_OPERATION.equals(request.getOperation())) {
+        var payload = 
objectMapper.writeValueAsString(serviceMonitorManagement.getCurrentReport());
+        return new ExtensionServiceBrokerResponseEnvelope(
+            request.getRequestId(),
+            HTTP_STATUS_OK,
+            payload,
+            null
+        );
+      }
+
+      if (ADAPTER_STATE_CHANGE_OPERATION.equals(request.getOperation())) {
+        return handleAdapterStateChangeRequest(request, topic);
+      }
+
+      return new ExtensionServiceBrokerResponseEnvelope(
+          request.getRequestId(),
+          HTTP_STATUS_NOT_IMPLEMENTED,
+          null,
+          new ExtensionServiceBrokerErrorEnvelope(
+              "UnsupportedOperation",
+              "No broker handler available for operation " + 
request.getOperation()
+          )
+      );
+    } catch (Exception e) {
+      return error(request.getRequestId(), HTTP_STATUS_INTERNAL_SERVER_ERROR, 
e);
+    }
+  }
+
+  private ExtensionServiceBrokerResponseEnvelope 
handleAdapterStateChangeRequest(
+      ExtensionServiceBrokerRequestEnvelope request,
+      String topic
+  ) throws Exception {
+    if (request.getPayload() == null || request.getPayload().isBlank()) {
+      return new ExtensionServiceBrokerResponseEnvelope(
+          request.getRequestId(),
+          HTTP_STATUS_BAD_REQUEST,
+          null,
+          new ExtensionServiceBrokerErrorEnvelope("InvalidPayload", "Missing 
adapter payload")
+      );
+    }
+
+    var adapterDescription = objectMapper.readValue(request.getPayload(), 
AdapterDescription.class);
+    var command = extractStateChangeCommand(topic);
+
+    try {
+      if (STATE_CHANGE_START.equals(command)) {
+        var payload = 
objectMapper.writeValueAsString(adapterWorkerRequestManagement.invokeAdapter(adapterDescription));
+        return new 
ExtensionServiceBrokerResponseEnvelope(request.getRequestId(), HTTP_STATUS_OK, 
payload, null);
+      }
+
+      if (STATE_CHANGE_STOP.equals(command)) {
+        var payload = 
objectMapper.writeValueAsString(adapterWorkerRequestManagement.stopAdapter(adapterDescription));
+        return new 
ExtensionServiceBrokerResponseEnvelope(request.getRequestId(), HTTP_STATUS_OK, 
payload, null);
+      }
+
+      return new ExtensionServiceBrokerResponseEnvelope(
+          request.getRequestId(),
+          HTTP_STATUS_BAD_REQUEST,
+          null,
+          new ExtensionServiceBrokerErrorEnvelope(
+              "InvalidCommand",
+              "Unknown adapter state change command in topic " + topic
+          )
+      );
+    } catch (AdapterException e) {
+      return new ExtensionServiceBrokerResponseEnvelope(
+          request.getRequestId(),
+          HTTP_STATUS_INTERNAL_SERVER_ERROR,
+          objectMapper.writeValueAsString(e),
+          new 
ExtensionServiceBrokerErrorEnvelope(e.getClass().getSimpleName(), 
e.getMessage())
+      );
+    }
+  }
+
+  private String extractStateChangeCommand(String topic) {
+    int separatorIndex = topic.lastIndexOf('.');
+    if (separatorIndex < 0 || separatorIndex + 1 >= topic.length()) {
+      return "";
+    }
+
+    return topic.substring(separatorIndex + 1);
+  }
+
+  private ExtensionServiceBrokerResponseEnvelope error(String requestId, int 
statusCode, Exception e) {
+    return new ExtensionServiceBrokerResponseEnvelope(
+        requestId,
+        statusCode,
+        null,
+        new ExtensionServiceBrokerErrorEnvelope(e.getClass().getSimpleName(), 
e.getMessage())
+    );
+  }
+
+  private void publishResponse(String replyTo, 
ExtensionServiceBrokerResponseEnvelope response) {
+    if (natsConnection == null) {
+      return;
+    }
+
+    try {
+      natsConnection.publish(replyTo, 
objectMapper.writeValueAsBytes(response));
+    } catch (Exception e) {
+      LOG.warn("Could not publish broker response to subject {}", replyTo, e);
+    }
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
index 53fa966da1..92350f1ca6 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
@@ -29,6 +29,7 @@ public enum ExtensionServiceOperationType {
   SAMPLE_DATA,
   EXTENSION_INSTANCE_HEALTH,
   SERVICE_HEALTH,
+  SERVICE_LOAD,
   PIPELINE_ELEMENT_INVOCATION,
   PIPELINE_ELEMENT_DETACH,
   PIPELINE_ELEMENT_ASSETS,
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTarget.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTarget.java
index 2667b1ade6..31b311e82b 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTarget.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTarget.java
@@ -18,12 +18,13 @@
 
 package org.apache.streampipes.manager.api.extensions;
 
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public record ExtensionServiceRequestTarget(String baseUrl,
                                             String serviceId,
@@ -80,13 +81,7 @@ public record ExtensionServiceRequestTarget(String baseUrl,
         ? List.of(operation.name().toLowerCase(Locale.ROOT))
         : topicSegments;
 
-    return Stream.concat(
-            Stream.of(topicPrefix, serviceId),
-            segments.stream())
-        .filter(Objects::nonNull)
-        .map(ExtensionServiceRequestTarget::toTopicSegment)
-        .filter(part -> !part.isEmpty())
-        .collect(Collectors.joining("."));
+    return ExtensionServiceBrokerTopics.serviceTopic(topicPrefix, serviceId, 
segments);
   }
 
   private static String trimTrailingSlash(String value) {
@@ -96,8 +91,4 @@ public record ExtensionServiceRequestTarget(String baseUrl,
   private static String trimSlashes(String value) {
     return value.replaceAll("^/+", "").replaceAll("/+$", "");
   }
-
-  private static String toTopicSegment(String value) {
-    return trimSlashes(value).replace("/", ".");
-  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTargets.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTargets.java
index 82f9972480..f1a11c56cb 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTargets.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTargets.java
@@ -134,6 +134,15 @@ public final class ExtensionServiceRequestTargets {
     );
   }
 
+  public static ExtensionServiceRequestTarget 
serviceLoad(SpServiceRegistration service) {
+    return forService(
+        service,
+        ExtensionServiceOperationType.SERVICE_LOAD,
+        path("serviceMonitor"),
+        topic("monitoring", "service-load")
+    );
+  }
+
   public static ExtensionServiceRequestTarget pipelineInvocation(String 
baseUrl,
                                                                  String 
serviceId,
                                                                  
SpServiceUrlProvider provider,
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java
index 4e2bd33111..b5831d8ab6 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java
@@ -17,19 +17,52 @@
  */
 package org.apache.streampipes.service.core;
 
+import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.connect.management.management.WorkerRestClient;
 import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
 import 
org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics;
+import 
org.apache.streampipes.service.core.extensions.CoreExtensionTransportMode;
+import 
org.apache.streampipes.service.core.extensions.CoreNatsRequestReplyClient;
+import 
org.apache.streampipes.service.core.extensions.TransportAwareExtensionServiceRequestManager;
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.time.Duration;
+
 @Configuration
 public class ExtensionServiceRequestConfiguration {
 
+  @Bean(destroyMethod = "close")
+  public CoreNatsRequestReplyClient coreNatsRequestReplyClient() {
+    var env = Environments.getEnvironment();
+    return new CoreNatsRequestReplyClient(
+        env.getNatsHost().getValueOrDefault(),
+        env.getNatsPort().getValueOrDefault(),
+        Duration.ofSeconds(2)
+    );
+  }
+
   @Bean
-  public ExtensionServiceRequestManager extensionServiceRequestManager() {
-    return new HttpExtensionServiceRequestManager();
+  public ExtensionServiceRequestManager extensionServiceRequestManager(
+      CoreNatsRequestReplyClient coreNatsRequestReplyClient
+  ) {
+    var env = Environments.getEnvironment();
+
+    var transportMode = CoreExtensionTransportMode.from(
+        env.getCoreExtensionTransportMode().getValueOrDefault()
+    );
+
+    var topicPrefix = env.getExtensionRequestTopicPrefix()
+        
.getValueOrReturn(ExtensionServiceBrokerTopics.DEFAULT_REQUEST_TOPIC_PREFIX);
+
+    return new TransportAwareExtensionServiceRequestManager(
+        new HttpExtensionServiceRequestManager(),
+        coreNatsRequestReplyClient,
+        transportMode,
+        topicPrefix
+    );
   }
 
   @Bean
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index 1bff0837dd..9517e29109 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -159,7 +159,7 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
     var env = Environments.getEnvironment();
 
     
ExtensionsServiceReportExecutor.setServiceReportFetcher(serviceRegistration -> {
-      var target = 
ExtensionServiceRequestTargets.serviceHealth(serviceRegistration, 
"serviceMonitor");
+      var target = 
ExtensionServiceRequestTargets.serviceLoad(serviceRegistration);
       var response = extensionServiceRequestManager.requestServiceLoad(target);
 
       if (!response.isSuccess()) {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreExtensionTransportMode.java
similarity index 62%
copy from 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
copy to 
streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreExtensionTransportMode.java
index 53fa966da1..45fa0b6225 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreExtensionTransportMode.java
@@ -16,24 +16,24 @@
  *
  */
 
-package org.apache.streampipes.manager.api.extensions;
+package org.apache.streampipes.service.core.extensions;
 
-public enum ExtensionServiceOperationType {
-  CONTAINER_PROVIDED_OPTIONS,
-  MIGRATION,
-  DESCRIPTION_UPDATE,
-  EXTENSION_DESCRIPTION,
-  FUNCTION_STOP,
-  ADAPTER_STATE_CHANGE,
-  RUNTIME_OPTIONS,
-  SAMPLE_DATA,
-  EXTENSION_INSTANCE_HEALTH,
-  SERVICE_HEALTH,
-  PIPELINE_ELEMENT_INVOCATION,
-  PIPELINE_ELEMENT_DETACH,
-  PIPELINE_ELEMENT_ASSETS,
-  ADAPTER_ASSETS,
-  ADAPTER_ICON_ASSET,
-  ADAPTER_DOCUMENTATION_ASSET,
-  OUTPUT_SCHEMA;
+import java.util.Locale;
+
+public enum CoreExtensionTransportMode {
+  HTTP,
+  NATS,
+  AUTO;
+
+  public static CoreExtensionTransportMode from(String value) {
+    if (value == null || value.isBlank()) {
+      return AUTO;
+    }
+
+    try {
+      return 
CoreExtensionTransportMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      return AUTO;
+    }
+  }
 }
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreNatsRequestReplyClient.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreNatsRequestReplyClient.java
new file mode 100644
index 0000000000..c7fbaa3a45
--- /dev/null
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreNatsRequestReplyClient.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.extensions;
+
+import io.nats.client.Connection;
+import io.nats.client.Message;
+import io.nats.client.Nats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+
+public class CoreNatsRequestReplyClient {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CoreNatsRequestReplyClient.class);
+
+  private final String natsUrl;
+  private final Duration timeout;
+  private Connection natsConnection;
+
+  public CoreNatsRequestReplyClient(String host, int port, Duration timeout) {
+    this.natsUrl = "nats://" + host + ":" + port;
+    this.timeout = timeout;
+  }
+
+  public synchronized byte[] request(String subject, byte[] payload) throws 
IOException {
+    try {
+      Message response = getConnection().request(subject, payload, timeout);
+      if (response == null) {
+        throw new IOException("No NATS response received for subject " + 
subject);
+      }
+      return response.getData();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("NATS request was interrupted for subject " + 
subject, e);
+    }
+  }
+
+  private Connection getConnection() throws IOException {
+    if (natsConnection == null || natsConnection.getStatus() != 
Connection.Status.CONNECTED) {
+      try {
+        natsConnection = Nats.connect(natsUrl);
+        LOG.info("Connected to NATS at {}", natsUrl);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Could not connect to NATS at " + natsUrl, e);
+      }
+    }
+
+    return natsConnection;
+  }
+
+  public synchronized void close() {
+    if (natsConnection != null) {
+      try {
+        natsConnection.close();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while closing NATS connection", e);
+      } finally {
+        natsConnection = null;
+      }
+    }
+  }
+}
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/TransportAwareExtensionServiceRequestManager.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/TransportAwareExtensionServiceRequestManager.java
new file mode 100644
index 0000000000..dba8764980
--- /dev/null
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/TransportAwareExtensionServiceRequestManager.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.extensions;
+
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager;
+import 
org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestTarget;
+import org.apache.streampipes.manager.util.AuthTokenUtils;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerErrorEnvelope;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerRequestEnvelope;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerResponseEnvelope;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+public class TransportAwareExtensionServiceRequestManager implements 
ExtensionServiceRequestManager {
+
+  private static final Logger LOG =
+      
LoggerFactory.getLogger(TransportAwareExtensionServiceRequestManager.class);
+  private static final int INTERNAL_SERVER_ERROR = 500;
+
+  private final ObjectMapper objectMapper;
+  private final ExtensionServiceRequestManager httpRequestManager;
+  private final CoreNatsRequestReplyClient natsRequestReplyClient;
+  private final CoreExtensionTransportMode transportMode;
+  private final String topicPrefix;
+
+  public TransportAwareExtensionServiceRequestManager(
+      ExtensionServiceRequestManager httpRequestManager,
+      CoreNatsRequestReplyClient natsRequestReplyClient,
+      CoreExtensionTransportMode transportMode,
+      String topicPrefix
+  ) {
+    this.objectMapper = new ObjectMapper();
+    this.httpRequestManager = httpRequestManager;
+    this.natsRequestReplyClient = natsRequestReplyClient;
+    this.transportMode = transportMode;
+    this.topicPrefix = topicPrefix;
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestContainerProvidedOptions(ExtensionServiceRequestTarget target,
+                                                                         
String payload) throws IOException {
+    return httpRequestManager.requestContainerProvidedOptions(target, payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestMigration(ExtensionServiceRequestTarget target,
+                                                          String payload) 
throws IOException {
+    return httpRequestManager.requestMigration(target, payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestDescriptionUpdate(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestDescriptionUpdate(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestExtensionDescription(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestExtensionDescription(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestFunctionStop(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestFunctionStop(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestAdapterStateChange(ExtensionServiceRequestTarget target,
+                                                                   String 
elementId,
+                                                                   String 
payload) throws IOException {
+    if (useNats(target)) {
+      var authToken = AuthTokenUtils.getAuthToken(elementId);
+      return requestViaNats(target, payload, authToken);
+    }
+
+    return httpRequestManager.requestAdapterStateChange(target, elementId, 
payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestRuntimeOptions(ExtensionServiceRequestTarget target,
+                                                               String payload) 
throws IOException {
+    return httpRequestManager.requestRuntimeOptions(target, payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestSampleData(ExtensionServiceRequestTarget target,
+                                                           String payload) 
throws IOException {
+    return httpRequestManager.requestSampleData(target, payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestExtensionInstanceHealth(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestExtensionInstanceHealth(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestServiceHealth(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestServiceHealth(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestServiceLoad(ExtensionServiceRequestTarget target) throws IOException {
+    if (useNats(target)) {
+      try {
+        var response = requestServiceLoadViaNats(target);
+        if (response.isSuccess() || transportMode == 
CoreExtensionTransportMode.NATS) {
+          return response;
+        }
+
+        LOG.warn("NATS request for operation {} to service {} returned status 
{} - falling back to HTTP",
+            target.operation(), target.serviceId(), response.statusCode());
+      } catch (IOException e) {
+        if (transportMode == CoreExtensionTransportMode.NATS) {
+          throw e;
+        }
+
+        LOG.warn("NATS request for operation {} to service {} failed - falling 
back to HTTP",
+            target.operation(), target.serviceId(), e);
+      }
+    }
+
+    return httpRequestManager.requestServiceLoad(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestPipelineElementInvocation(ExtensionServiceRequestTarget target,
+                                                                          
String pipelineId,
+                                                                          
String payload) throws IOException {
+    return httpRequestManager.requestPipelineElementInvocation(target, 
pipelineId, payload);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestPipelineElementDetach(ExtensionServiceRequestTarget target,
+                                                                      String 
pipelineId) throws IOException {
+    return httpRequestManager.requestPipelineElementDetach(target, pipelineId);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestPipelineElementAssets(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestPipelineElementAssets(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestAdapterAssets(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestAdapterAssets(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestAdapterIconAsset(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestAdapterIconAsset(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestAdapterDocumentationAsset(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return httpRequestManager.requestAdapterDocumentationAsset(target);
+  }
+
+  @Override
+  public ExtensionServiceOperationResult 
requestOutputSchema(ExtensionServiceRequestTarget target,
+                                                             String payload) 
throws IOException {
+    return httpRequestManager.requestOutputSchema(target, payload);
+  }
+
+  private boolean useNats(ExtensionServiceRequestTarget target) {
+    return switch (transportMode) {
+      case HTTP -> false;
+      case NATS -> true;
+      case AUTO -> serviceSupportsNats(target);
+    };
+  }
+
+  private boolean serviceSupportsNats(ExtensionServiceRequestTarget target) {
+    var service = StorageDispatcher.INSTANCE
+        .getNoSqlStore()
+        .getExtensionsServiceStorage()
+        .getElementById(target.serviceId());
+
+    if (service == null || service.getTags() == null) {
+      return false;
+    }
+
+    return service.getTags().stream().anyMatch(tag ->
+        tag.getPrefix() == SpServiceTagPrefix.CUSTOM
+            && 
ExtensionServiceBrokerTopics.TRANSPORT_TAG_NATS.equals(tag.getValue())
+    );
+  }
+
+  private ExtensionServiceOperationResult 
requestServiceLoadViaNats(ExtensionServiceRequestTarget target)
+      throws IOException {
+    return requestViaNats(target, null, null);
+  }
+
+  private ExtensionServiceOperationResult 
requestViaNats(ExtensionServiceRequestTarget target,
+                                                         String payload,
+                                                         String authToken) 
throws IOException {
+    String topic = target.toTopic(topicPrefix);
+
+    var requestEnvelope = new ExtensionServiceBrokerRequestEnvelope(
+        UUID.randomUUID().toString(),
+        target.operation().name(),
+        payload,
+        authToken
+    );
+
+    byte[] responseBytes = natsRequestReplyClient.request(topic, 
objectMapper.writeValueAsBytes(requestEnvelope));
+    var responseEnvelope = objectMapper.readValue(responseBytes, 
ExtensionServiceBrokerResponseEnvelope.class);
+
+    return toOperationResult(responseEnvelope);
+  }
+
+  private ExtensionServiceOperationResult 
toOperationResult(ExtensionServiceBrokerResponseEnvelope responseEnvelope)
+      throws IOException {
+    byte[] body = makeBody(responseEnvelope);
+    int statusCode = responseEnvelope.getStatusCode() == 0
+        ? INTERNAL_SERVER_ERROR
+        : responseEnvelope.getStatusCode();
+
+    return new ExtensionServiceOperationResult(statusCode, body);
+  }
+
+  private byte[] makeBody(ExtensionServiceBrokerResponseEnvelope 
responseEnvelope) throws IOException {
+    if (responseEnvelope.getPayload() != null) {
+      return responseEnvelope.getPayload().getBytes(StandardCharsets.UTF_8);
+    }
+
+    ExtensionServiceBrokerErrorEnvelope error = responseEnvelope.getError();
+    if (error != null) {
+      return objectMapper.writeValueAsBytes(error);
+    }
+
+    return null;
+  }
+}
diff --git a/streampipes-service-extensions/pom.xml 
b/streampipes-service-extensions/pom.xml
index d370d673d1..4d07bc7c9c 100644
--- a/streampipes-service-extensions/pom.xml
+++ b/streampipes-service-extensions/pom.xml
@@ -61,6 +61,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-nats-extensions</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
index 995536e113..508f4d9cb8 100644
--- 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
+++ 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
@@ -36,6 +36,9 @@ import 
org.apache.streampipes.model.extensions.configuration.SpServiceConfigurat
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics;
+import 
org.apache.streampipes.model.extensions.transport.ExtensionServiceTransportMode;
+import org.apache.streampipes.nats.extensions.ExtensionBrokerRequestReceiver;
 import org.apache.streampipes.rest.shared.exception.SpRestExceptionHandler;
 import org.apache.streampipes.rest.shared.serializer.JacksonConfiguration;
 import org.apache.streampipes.service.base.BaseNetworkingConfig;
@@ -75,6 +78,9 @@ import java.util.stream.Collectors;
 public abstract class StreamPipesExtensionsServiceBase extends 
StreamPipesServiceBase {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class);
+  private final ExtensionBrokerRequestReceiver extensionBrokerRequestReceiver 
= new ExtensionBrokerRequestReceiver();
+  private ExtensionServiceTransportMode extensionTransportMode = 
ExtensionServiceTransportMode.HTTP;
+  private boolean natsBrokerReceiverActive = false;
 
   public void init() {
     SpServiceDefinition serviceDef = provideServiceDefinition();
@@ -123,6 +129,17 @@ public abstract class StreamPipesExtensionsServiceBase 
extends StreamPipesServic
   public void startExtensionsService(Class<?> serviceClass,
                                      SpServiceDefinition serviceDef,
                                      BaseNetworkingConfig networkingConfig) 
throws UnknownHostException {
+    this.extensionTransportMode = ExtensionServiceTransportMode.from(
+        
Environments.getEnvironment().getExtensionTransportMode().getValueOrDefault()
+    );
+    this.natsBrokerReceiverActive = extensionBrokerRequestReceiver.start(
+        serviceId(),
+        extensionTransportMode,
+        Environments.getEnvironment()
+            .getExtensionRequestTopicPrefix()
+            
.getValueOrReturn(ExtensionServiceBrokerTopics.DEFAULT_REQUEST_TOPIC_PREFIX)
+    );
+
     var extensions = new ExtensionItemProvider().getAllItemDescriptions();
     var req = SpServiceRegistration.from(
         DefaultSpServiceTypes.EXT,
@@ -159,10 +176,25 @@ public abstract class StreamPipesExtensionsServiceBase 
extends StreamPipesServic
           
DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup()));
     }
     tags.addAll(getExtensionsServiceTags(extensions));
+    tags.addAll(getTransportServiceTags());
     tags.addAll(new 
CustomServiceTagResolver(Environments.getEnvironment()).getCustomServiceTags());
     return tags;
   }
 
+  private Set<SpServiceTag> getTransportServiceTags() {
+    Set<SpServiceTag> tags = new HashSet<>();
+
+    if (extensionTransportMode.supportsHttp()) {
+      tags.add(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, 
ExtensionServiceBrokerTopics.TRANSPORT_TAG_HTTP));
+    }
+
+    if (extensionTransportMode.supportsNats() && natsBrokerReceiverActive) {
+      tags.add(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, 
ExtensionServiceBrokerTopics.TRANSPORT_TAG_NATS));
+    }
+
+    return tags;
+  }
+
   protected void deregisterService(String serviceId) {
     LOG.info("Deregistering service (id={})...", serviceId);
     StreamPipesClient client = new 
StreamPipesClientResolver().makeStreamPipesClientInstance();
@@ -187,6 +219,7 @@ public abstract class StreamPipesExtensionsServiceBase 
extends StreamPipesServic
 
   @PreDestroy
   public void onExit() {
+    extensionBrokerRequestReceiver.stop();
     new ExtensionsServiceShutdownHandler().onShutdown();
     deregisterService(DeclarersSingleton.getInstance().getServiceId());
   }
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
index a6a448e894..6604d9ad81 100644
--- 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java
@@ -47,7 +47,7 @@ public class ProtocolManager {
       return consumers.get(topicName(protocol));
     } else {
       consumers.put(topicName(protocol), makeInputCollector(protocol, 
singletonEngine));
-      LOG.info("Adding new consumer to consumer map (size=" + consumers.size() 
+ "): " + topicName(protocol));
+      LOG.debug("Adding new consumer to consumer map (size=" + 
consumers.size() + "): " + topicName(protocol));
       return consumers.get(topicName(protocol));
     }
 

Reply via email to