luoluoyuyu commented on code in PR #3795:
URL: https://github.com/apache/streampipes/pull/3795#discussion_r2463840824
##########
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/loadbalance/unit/ResourceUnitScanner.java:
##########
@@ -0,0 +1,347 @@
+package org.apache.streampipes.manager.loadbalance.unit;
+
+import org.apache.streampipes.manager.pipeline.PipelineManager;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.loadbalancer.LoadBalanceResourceUnit;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Resource unit scanner for on-demand discovery of pipeline elements
+ * Does not cache data in memory, instead scans pipelines and adapters when
needed
+ */
+public class ResourceUnitScanner {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ResourceUnitScanner.class);
+
+ /**
+ * Result of scanning a service for resource units
+ */
+ public static class ServiceResourceUnits {
+ private final List<PipelineElementPartitioner.PartitionResult>
pipelineUnits;
+ private final
List<PipelineElementPartitioner.AdapterResourceUnitWithServices> adapterUnits;
+
+ public ServiceResourceUnits(
+ List<PipelineElementPartitioner.PartitionResult> pipelineUnits,
+ List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
adapterUnits) {
+ this.pipelineUnits = pipelineUnits;
+ this.adapterUnits = adapterUnits;
+ }
+
+ public List<PipelineElementPartitioner.PartitionResult> getPipelineUnits()
{
+ return pipelineUnits;
+ }
+
+ public List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
getAdapterUnits() {
+ return adapterUnits;
+ }
+
+ public int getTotalUnits() {
+ return pipelineUnits.size() + adapterUnits.size();
+ }
+
+ public boolean isEmpty() {
+ return pipelineUnits.isEmpty() && adapterUnits.isEmpty();
+ }
+ }
+
+ /**
+ * Scan service and generate partitioned resource units
+ * This is the main method that combines scanning with
PipelineElementPartitioner logic
+ * @param service Service registration
+ * @return Service resource units containing partitioned pipeline and
adapter units
+ */
+ public static ServiceResourceUnits
scanAndPartitionService(SpServiceRegistration service) {
+ logger.info("Scanning and partitioning service {}", service.getSvcId());
+
+ // Scan and partition pipeline elements
+ List<PipelineElementPartitioner.PartitionResult> pipelineUnits =
+ scanAndPartitionPipeline(service);
+
+ // Scan and create adapter units
+ List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
adapterUnits =
+ scanAndCreateAdapter(service);
+
+ logger.info("Service {} has {} pipeline units and {} adapter units",
+ service.getSvcId(), pipelineUnits.size(), adapterUnits.size());
+
+ return new ServiceResourceUnits(pipelineUnits, adapterUnits);
+ }
+
+ /**
+ * Scan service and partition pipeline elements according to
PipelineElementPartitioner rules
+ * @param service Service registration
+ * @return List of partitioned resource units
+ */
+ private static List<LoadBalanceResourceUnit<InvocableStreamPipesEntity>>
scanAndPartitionPipelineElements(
+ SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ List<Pipeline> allPipelines = PipelineManager.getAllPipelines();
+
+ // Find pipelines using this service
+ List<Pipeline> relevantPipelines = allPipelines.stream()
+ .filter(pipeline -> pipelineUsesService(pipeline, serviceUrl))
+ .toList();
Review Comment:
Please filter out Pipelines that are not in the running state
##########
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/loadbalance/unit/ResourceUnitScanner.java:
##########
@@ -0,0 +1,347 @@
+package org.apache.streampipes.manager.loadbalance.unit;
+
+import org.apache.streampipes.manager.pipeline.PipelineManager;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.loadbalancer.LoadBalanceResourceUnit;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Resource unit scanner for on-demand discovery of pipeline elements
+ * Does not cache data in memory, instead scans pipelines and adapters when
needed
+ */
+public class ResourceUnitScanner {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ResourceUnitScanner.class);
+
+ /**
+ * Result of scanning a service for resource units
+ */
+ public static class ServiceResourceUnits {
+ private final List<PipelineElementPartitioner.PartitionResult>
pipelineUnits;
+ private final
List<PipelineElementPartitioner.AdapterResourceUnitWithServices> adapterUnits;
+
+ public ServiceResourceUnits(
+ List<PipelineElementPartitioner.PartitionResult> pipelineUnits,
+ List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
adapterUnits) {
+ this.pipelineUnits = pipelineUnits;
+ this.adapterUnits = adapterUnits;
+ }
+
+ public List<PipelineElementPartitioner.PartitionResult> getPipelineUnits()
{
+ return pipelineUnits;
+ }
+
+ public List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
getAdapterUnits() {
+ return adapterUnits;
+ }
+
+ public int getTotalUnits() {
+ return pipelineUnits.size() + adapterUnits.size();
+ }
+
+ public boolean isEmpty() {
+ return pipelineUnits.isEmpty() && adapterUnits.isEmpty();
+ }
+ }
+
+ /**
+ * Scan service and generate partitioned resource units
+ * This is the main method that combines scanning with
PipelineElementPartitioner logic
+ * @param service Service registration
+ * @return Service resource units containing partitioned pipeline and
adapter units
+ */
+ public static ServiceResourceUnits
scanAndPartitionService(SpServiceRegistration service) {
+ logger.info("Scanning and partitioning service {}", service.getSvcId());
+
+ // Scan and partition pipeline elements
+ List<PipelineElementPartitioner.PartitionResult> pipelineUnits =
+ scanAndPartitionPipeline(service);
+
+ // Scan and create adapter units
+ List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
adapterUnits =
+ scanAndCreateAdapter(service);
+
+ logger.info("Service {} has {} pipeline units and {} adapter units",
+ service.getSvcId(), pipelineUnits.size(), adapterUnits.size());
+
+ return new ServiceResourceUnits(pipelineUnits, adapterUnits);
+ }
+
+ /**
+ * Scan service and partition pipeline elements according to
PipelineElementPartitioner rules
+ * @param service Service registration
+ * @return List of partitioned resource units
+ */
+ private static List<LoadBalanceResourceUnit<InvocableStreamPipesEntity>>
scanAndPartitionPipelineElements(
+ SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ List<Pipeline> allPipelines = PipelineManager.getAllPipelines();
+
+ // Find pipelines using this service
+ List<Pipeline> relevantPipelines = allPipelines.stream()
+ .filter(pipeline -> pipelineUsesService(pipeline, serviceUrl))
+ .toList();
+
+ logger.debug("Found {} pipelines using service {}",
relevantPipelines.size(), service.getSvcId());
+
+ List<LoadBalanceResourceUnit<InvocableStreamPipesEntity>> resourceUnits =
new ArrayList<>();
+
+ // Process each pipeline
+ for (Pipeline pipeline : relevantPipelines) {
+ // Extract elements running on this service
+ List<DataSinkInvocation> serviceSinks = extractServiceSinks(pipeline,
serviceUrl);
+ List<DataProcessorInvocation> serviceProcessors =
extractServiceProcessors(pipeline, serviceUrl);
+
+ if (serviceSinks.isEmpty() && serviceProcessors.isEmpty()) {
+ continue;
+ }
+
+ // Use PipelineElementPartitioner to partition these elements
+ PipelineElementPartitioner.PartitionResult partitionResult =
+ PipelineElementPartitioner.partitionElements(serviceSinks,
serviceProcessors, pipeline.getLabels());
+
+ // Add partitioned units with service ID set
+ for (PipelineElementPartitioner.ResourceUnitWithServices
unitWithServices : partitionResult.getResourceUnits()) {
+ LoadBalanceResourceUnit<InvocableStreamPipesEntity> unit =
unitWithServices.getResourceUnit();
+ unit.setServiceId(service.getSvcId());
+ unit.setPipelineId(pipeline.getPipelineId());
+ resourceUnits.add(unit);
+ }
+ }
+
+ return resourceUnits;
+ }
+
+ /**
+ * Scan service and partition pipeline elements according to
PipelineElementPartitioner rules
+ * @param service Service registration
+ * @return List of partitioned resource units
+ */
+ private static List<PipelineElementPartitioner.PartitionResult>
scanAndPartitionPipeline(SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ List<Pipeline> allPipelines = PipelineManager.getAllPipelines();
+
+ // Find pipelines using this service
+ List<Pipeline> relevantPipelines = allPipelines.stream()
+ .filter(pipeline -> pipelineUsesService(pipeline, serviceUrl))
+ .toList();
Review Comment:
Please filter out Pipelines that are not in the running state
##########
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/loadbalance/unit/ResourceUnitScanner.java:
##########
@@ -0,0 +1,347 @@
+package org.apache.streampipes.manager.loadbalance.unit;
+
+import org.apache.streampipes.manager.pipeline.PipelineManager;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.loadbalancer.LoadBalanceResourceUnit;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Resource unit scanner for on-demand discovery of pipeline elements
+ * Does not cache data in memory, instead scans pipelines and adapters when
needed
+ */
+public class ResourceUnitScanner {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ResourceUnitScanner.class);
+
+ /**
+ * Result of scanning a service for resource units
+ */
+ public static class ServiceResourceUnits {
+ private final List<PipelineElementPartitioner.PartitionResult>
pipelineUnits;
+ private final
List<PipelineElementPartitioner.AdapterResourceUnitWithServices> adapterUnits;
+
+ public ServiceResourceUnits(
+ List<PipelineElementPartitioner.PartitionResult> pipelineUnits,
+ List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
adapterUnits) {
+ this.pipelineUnits = pipelineUnits;
+ this.adapterUnits = adapterUnits;
+ }
+
+ public List<PipelineElementPartitioner.PartitionResult> getPipelineUnits()
{
+ return pipelineUnits;
+ }
+
+ public List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
getAdapterUnits() {
+ return adapterUnits;
+ }
+
+ public int getTotalUnits() {
+ return pipelineUnits.size() + adapterUnits.size();
+ }
+
+ public boolean isEmpty() {
+ return pipelineUnits.isEmpty() && adapterUnits.isEmpty();
+ }
+ }
+
+ /**
+ * Scan service and generate partitioned resource units
+ * This is the main method that combines scanning with
PipelineElementPartitioner logic
+ * @param service Service registration
+ * @return Service resource units containing partitioned pipeline and
adapter units
+ */
+ public static ServiceResourceUnits
scanAndPartitionService(SpServiceRegistration service) {
+ logger.info("Scanning and partitioning service {}", service.getSvcId());
+
+ // Scan and partition pipeline elements
+ List<PipelineElementPartitioner.PartitionResult> pipelineUnits =
+ scanAndPartitionPipeline(service);
+
+ // Scan and create adapter units
+ List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
adapterUnits =
+ scanAndCreateAdapter(service);
+
+ logger.info("Service {} has {} pipeline units and {} adapter units",
+ service.getSvcId(), pipelineUnits.size(), adapterUnits.size());
+
+ return new ServiceResourceUnits(pipelineUnits, adapterUnits);
+ }
+
+ /**
+ * Scan service and partition pipeline elements according to
PipelineElementPartitioner rules
+ * @param service Service registration
+ * @return List of partitioned resource units
+ */
+ private static List<LoadBalanceResourceUnit<InvocableStreamPipesEntity>>
scanAndPartitionPipelineElements(
+ SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ List<Pipeline> allPipelines = PipelineManager.getAllPipelines();
+
+ // Find pipelines using this service
+ List<Pipeline> relevantPipelines = allPipelines.stream()
+ .filter(pipeline -> pipelineUsesService(pipeline, serviceUrl))
+ .toList();
+
+ logger.debug("Found {} pipelines using service {}",
relevantPipelines.size(), service.getSvcId());
+
+ List<LoadBalanceResourceUnit<InvocableStreamPipesEntity>> resourceUnits =
new ArrayList<>();
+
+ // Process each pipeline
+ for (Pipeline pipeline : relevantPipelines) {
+ // Extract elements running on this service
+ List<DataSinkInvocation> serviceSinks = extractServiceSinks(pipeline,
serviceUrl);
+ List<DataProcessorInvocation> serviceProcessors =
extractServiceProcessors(pipeline, serviceUrl);
+
+ if (serviceSinks.isEmpty() && serviceProcessors.isEmpty()) {
+ continue;
+ }
+
+ // Use PipelineElementPartitioner to partition these elements
+ PipelineElementPartitioner.PartitionResult partitionResult =
+ PipelineElementPartitioner.partitionElements(serviceSinks,
serviceProcessors, pipeline.getLabels());
+
+ // Add partitioned units with service ID set
+ for (PipelineElementPartitioner.ResourceUnitWithServices
unitWithServices : partitionResult.getResourceUnits()) {
+ LoadBalanceResourceUnit<InvocableStreamPipesEntity> unit =
unitWithServices.getResourceUnit();
+ unit.setServiceId(service.getSvcId());
+ unit.setPipelineId(pipeline.getPipelineId());
+ resourceUnits.add(unit);
+ }
+ }
+
+ return resourceUnits;
+ }
+
+ /**
+ * Scan service and partition pipeline elements according to
PipelineElementPartitioner rules
+ * @param service Service registration
+ * @return List of partitioned resource units
+ */
+ private static List<PipelineElementPartitioner.PartitionResult>
scanAndPartitionPipeline(SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ List<Pipeline> allPipelines = PipelineManager.getAllPipelines();
+
+ // Find pipelines using this service
+ List<Pipeline> relevantPipelines = allPipelines.stream()
+ .filter(pipeline -> pipelineUsesService(pipeline, serviceUrl))
+ .toList();
+
+ logger.debug("Found {} pipelines using service {}",
relevantPipelines.size(), service.getSvcId());
+
+ List<PipelineElementPartitioner.PartitionResult> resourceUnits = new
ArrayList<>();
+
+ // Process each pipeline
+ for (Pipeline pipeline : relevantPipelines) {
+ // Extract elements running on this service
+ List<DataSinkInvocation> serviceSinks = extractServiceSinks(pipeline,
serviceUrl);
+ List<DataProcessorInvocation> serviceProcessors =
extractServiceProcessors(pipeline, serviceUrl);
+
+ if (serviceSinks.isEmpty() && serviceProcessors.isEmpty()) {
+ continue;
+ }
+
+ // Use PipelineElementPartitioner to partition these elements
+ PipelineElementPartitioner.PartitionResult partitionResult =
+ PipelineElementPartitioner.partitionElements(serviceSinks,
serviceProcessors, pipeline.getLabels());
+
+ resourceUnits.add(partitionResult);
+ }
+
+ return resourceUnits;
+ }
+
+ /**
+ * Scan service and create adapter resource units
+ * @param service Service registration
+ * @return List of adapter resource units
+ */
+ private static List<LoadBalanceResourceUnit<AdapterDescription>>
scanAndCreateAdapterUnits(
+ SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ var adapterStorage =
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterDescriptionStorage();
+ List<AdapterDescription> allAdapters = adapterStorage.findAll();
+
+ // Find adapters running on this service
+ List<AdapterDescription> serviceAdapters = allAdapters.stream()
+ .filter(adapter -> serviceUrl != null &&
serviceUrl.equals(adapter.getSelectedEndpointUrl()))
+ .toList();
Review Comment:
Please filter out adapters that are not in the running state
##########
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/loadbalance/unit/ResourceUnitScanner.java:
##########
@@ -0,0 +1,347 @@
+package org.apache.streampipes.manager.loadbalance.unit;
+
+import org.apache.streampipes.manager.pipeline.PipelineManager;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.loadbalancer.LoadBalanceResourceUnit;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Resource unit scanner for on-demand discovery of pipeline elements
+ * Does not cache data in memory, instead scans pipelines and adapters when
needed
+ */
+public class ResourceUnitScanner {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ResourceUnitScanner.class);
+
+ /**
+ * Result of scanning a service for resource units
+ */
+ public static class ServiceResourceUnits {
+ private final List<PipelineElementPartitioner.PartitionResult>
pipelineUnits;
+ private final
List<PipelineElementPartitioner.AdapterResourceUnitWithServices> adapterUnits;
+
+ public ServiceResourceUnits(
+ List<PipelineElementPartitioner.PartitionResult> pipelineUnits,
+ List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
adapterUnits) {
+ this.pipelineUnits = pipelineUnits;
+ this.adapterUnits = adapterUnits;
+ }
+
+ public List<PipelineElementPartitioner.PartitionResult> getPipelineUnits()
{
+ return pipelineUnits;
+ }
+
+ public List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
getAdapterUnits() {
+ return adapterUnits;
+ }
+
+ public int getTotalUnits() {
+ return pipelineUnits.size() + adapterUnits.size();
+ }
+
+ public boolean isEmpty() {
+ return pipelineUnits.isEmpty() && adapterUnits.isEmpty();
+ }
+ }
+
+ /**
+ * Scan service and generate partitioned resource units
+ * This is the main method that combines scanning with
PipelineElementPartitioner logic
+ * @param service Service registration
+ * @return Service resource units containing partitioned pipeline and
adapter units
+ */
+ public static ServiceResourceUnits
scanAndPartitionService(SpServiceRegistration service) {
+ logger.info("Scanning and partitioning service {}", service.getSvcId());
+
+ // Scan and partition pipeline elements
+ List<PipelineElementPartitioner.PartitionResult> pipelineUnits =
+ scanAndPartitionPipeline(service);
+
+ // Scan and create adapter units
+ List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
adapterUnits =
+ scanAndCreateAdapter(service);
+
+ logger.info("Service {} has {} pipeline units and {} adapter units",
+ service.getSvcId(), pipelineUnits.size(), adapterUnits.size());
+
+ return new ServiceResourceUnits(pipelineUnits, adapterUnits);
+ }
+
+ /**
+ * Scan service and partition pipeline elements according to
PipelineElementPartitioner rules
+ * @param service Service registration
+ * @return List of partitioned resource units
+ */
+ private static List<LoadBalanceResourceUnit<InvocableStreamPipesEntity>>
scanAndPartitionPipelineElements(
+ SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ List<Pipeline> allPipelines = PipelineManager.getAllPipelines();
+
+ // Find pipelines using this service
+ List<Pipeline> relevantPipelines = allPipelines.stream()
+ .filter(pipeline -> pipelineUsesService(pipeline, serviceUrl))
+ .toList();
+
+ logger.debug("Found {} pipelines using service {}",
relevantPipelines.size(), service.getSvcId());
+
+ List<LoadBalanceResourceUnit<InvocableStreamPipesEntity>> resourceUnits =
new ArrayList<>();
+
+ // Process each pipeline
+ for (Pipeline pipeline : relevantPipelines) {
+ // Extract elements running on this service
+ List<DataSinkInvocation> serviceSinks = extractServiceSinks(pipeline,
serviceUrl);
+ List<DataProcessorInvocation> serviceProcessors =
extractServiceProcessors(pipeline, serviceUrl);
+
+ if (serviceSinks.isEmpty() && serviceProcessors.isEmpty()) {
+ continue;
+ }
+
+ // Use PipelineElementPartitioner to partition these elements
+ PipelineElementPartitioner.PartitionResult partitionResult =
+ PipelineElementPartitioner.partitionElements(serviceSinks,
serviceProcessors, pipeline.getLabels());
+
+ // Add partitioned units with service ID set
+ for (PipelineElementPartitioner.ResourceUnitWithServices
unitWithServices : partitionResult.getResourceUnits()) {
+ LoadBalanceResourceUnit<InvocableStreamPipesEntity> unit =
unitWithServices.getResourceUnit();
+ unit.setServiceId(service.getSvcId());
+ unit.setPipelineId(pipeline.getPipelineId());
+ resourceUnits.add(unit);
+ }
+ }
+
+ return resourceUnits;
+ }
+
+ /**
+ * Scan service and partition pipeline elements according to
PipelineElementPartitioner rules
+ * @param service Service registration
+ * @return List of partitioned resource units
+ */
+ private static List<PipelineElementPartitioner.PartitionResult>
scanAndPartitionPipeline(SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ List<Pipeline> allPipelines = PipelineManager.getAllPipelines();
+
+ // Find pipelines using this service
+ List<Pipeline> relevantPipelines = allPipelines.stream()
+ .filter(pipeline -> pipelineUsesService(pipeline, serviceUrl))
+ .toList();
+
+ logger.debug("Found {} pipelines using service {}",
relevantPipelines.size(), service.getSvcId());
+
+ List<PipelineElementPartitioner.PartitionResult> resourceUnits = new
ArrayList<>();
+
+ // Process each pipeline
+ for (Pipeline pipeline : relevantPipelines) {
+ // Extract elements running on this service
+ List<DataSinkInvocation> serviceSinks = extractServiceSinks(pipeline,
serviceUrl);
+ List<DataProcessorInvocation> serviceProcessors =
extractServiceProcessors(pipeline, serviceUrl);
+
+ if (serviceSinks.isEmpty() && serviceProcessors.isEmpty()) {
+ continue;
+ }
+
+ // Use PipelineElementPartitioner to partition these elements
+ PipelineElementPartitioner.PartitionResult partitionResult =
+ PipelineElementPartitioner.partitionElements(serviceSinks,
serviceProcessors, pipeline.getLabels());
+
+ resourceUnits.add(partitionResult);
+ }
+
+ return resourceUnits;
+ }
+
+ /**
+ * Scan service and create adapter resource units
+ * @param service Service registration
+ * @return List of adapter resource units
+ */
+ private static List<LoadBalanceResourceUnit<AdapterDescription>>
scanAndCreateAdapterUnits(
+ SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ var adapterStorage =
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterDescriptionStorage();
+ List<AdapterDescription> allAdapters = adapterStorage.findAll();
+
+ // Find adapters running on this service
+ List<AdapterDescription> serviceAdapters = allAdapters.stream()
+ .filter(adapter -> serviceUrl != null &&
serviceUrl.equals(adapter.getSelectedEndpointUrl()))
+ .toList();
+
+ logger.debug("Found {} adapters running on service {}",
serviceAdapters.size(), service.getSvcId());
+
+ // Create resource unit for each adapter (adapters don't need partitioning)
+ List<LoadBalanceResourceUnit<AdapterDescription>> adapterUnits = new
ArrayList<>();
+ for (AdapterDescription adapter : serviceAdapters) {
+ LoadBalanceResourceUnit<AdapterDescription> unit = new
LoadBalanceResourceUnit<>();
+ unit.setPipelineId(adapter.getElementId());
+ unit.setServiceId(service.getSvcId());
+ unit.setLabels(Collections.EMPTY_LIST);
+ unit.addElement(adapter);
+ adapterUnits.add(unit);
+ }
+
+ return adapterUnits;
+ }
+
+
+ /**
+ * Scan service and create adapter resource units
+ * @param service Service registration
+ * @return List of adapter resource units
+ */
+ private static
List<PipelineElementPartitioner.AdapterResourceUnitWithServices>
scanAndCreateAdapter(
+ SpServiceRegistration service) {
+
+ String serviceUrl = service.getServiceUrl();
+ var adapterStorage =
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterDescriptionStorage();
+ List<AdapterDescription> allAdapters = adapterStorage.findAll();
+
+ // Find adapters running on this service
+ List<AdapterDescription> serviceAdapters = allAdapters.stream()
+ .filter(adapter -> serviceUrl != null &&
serviceUrl.equals(adapter.getSelectedEndpointUrl()))
+ .toList();
Review Comment:
Please filter out adapters that are not in the running state
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]