This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 3131-migration-breaks-core-when-trying-to-migrate-non-existing-extension
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3131-migration-breaks-core-when-trying-to-migrate-non-existing-extension by this push:
     new 5bd7baf03a fix(#3131): Perform description migration only for installed extensions
5bd7baf03a is described below

commit 5bd7baf03a7241aa7bf6b643f39ccc5d72c75c18
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Aug 12 12:52:53 2024 +0200

    fix(#3131): Perform description migration only for installed extensions
---
 .../management/AdapterMigrationManager.java        |  11 +-
 .../migration/AbstractMigrationManager.java        |  15 ++-
 .../migration/PipelineElementMigrationManager.java | 126 +++++++++++----------
 .../rest/impl/admin/MigrationResource.java         |   4 +-
 .../storage/api/IDataProcessorStorage.java         |   4 +
 .../streampipes/storage/api/IDataSinkStorage.java  |   3 +
 .../couchdb/impl/DataProcessorStorageImpl.java     |  12 +-
 .../storage/couchdb/impl/DataSinkStorageImpl.java  |  13 ++-
 8 files changed, 119 insertions(+), 69 deletions(-)

diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
index e03b4bb168..b24e6494c7 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
@@ -22,6 +22,7 @@ import 
org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.manager.migration.AbstractMigrationManager;
 import org.apache.streampipes.manager.migration.IMigrationHandler;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.migration.ModelMigratorConfig;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 
@@ -36,9 +37,12 @@ public class AdapterMigrationManager extends 
AbstractMigrationManager implements
   private static final Logger LOG = 
LoggerFactory.getLogger(AdapterMigrationManager.class);
 
   private final IAdapterStorage adapterStorage;
+  private final IAdapterStorage adapterDescriptionStorage;
 
-  public AdapterMigrationManager(IAdapterStorage adapterStorage) {
+  public AdapterMigrationManager(IAdapterStorage adapterStorage,
+                                 IAdapterStorage adapterDescriptionStorage) {
     this.adapterStorage = adapterStorage;
+    this.adapterDescriptionStorage = adapterDescriptionStorage;
   }
 
   @Override
@@ -110,4 +114,9 @@ public class AdapterMigrationManager extends 
AbstractMigrationManager implements
       }
     }
   }
+
+  @Override
+  protected boolean isInstalled(SpServiceTagPrefix modelType, String appId) {
+    return !adapterDescriptionStorage.getAdaptersByAppId(appId).isEmpty();
+  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
index a27b19b400..1bc61b0ae2 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
@@ -23,6 +23,7 @@ import 
org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
 import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
 import org.apache.streampipes.model.extensions.migration.MigrationRequest;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.message.Notification;
 import org.apache.streampipes.model.migration.MigrationResult;
 import org.apache.streampipes.model.migration.ModelMigratorConfig;
@@ -119,14 +120,16 @@ public abstract class AbstractMigrationManager {
             )
         )
         .values()
-        .stream()
-        .peek(config -> {
-          var requestUrl = getRequestUrl(config.modelType(), 
config.targetAppId(), serviceUrl);
-          performUpdate(requestUrl);
-        })
-        .toList();
+        .forEach(config -> {
+          if (isInstalled(config.modelType(), config.targetAppId())) {
+            var requestUrl = getRequestUrl(config.modelType(), 
config.targetAppId(), serviceUrl);
+            performUpdate(requestUrl);
+          }
+        });
   }
 
+  protected abstract boolean isInstalled(SpServiceTagPrefix modelType, String 
appId);
+
   /**
    * Perform the update of the description based on the given requestUrl
    *
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index 5ac2ba0a58..2c9bd8c4f5 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.manager.migration;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.migration.MigrationResult;
@@ -67,12 +68,12 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
       LOG.info("Pipeline element descriptions are up to date.");
 
       LOG.info("Received {} pipeline element migrations from extension service 
{}.",
-          migrationConfigs.size(),
-          extensionsServiceConfig.getServiceUrl());
+        migrationConfigs.size(),
+        extensionsServiceConfig.getServiceUrl());
       var availablePipelines = pipelineStorage.findAll();
       if (!availablePipelines.isEmpty()) {
         LOG.info("Found {} available pipelines. Checking pipelines for 
applicable migrations...",
-            availablePipelines.size()
+          availablePipelines.size()
         );
       }
 
@@ -81,45 +82,45 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
           List<MigrationResult<?>> failedMigrations = new ArrayList<>();
 
           var migratedDataProcessors = pipeline.getSepas()
-              .stream()
-              .map(processor -> {
-                if (getApplicableMigration(processor, 
migrationConfigs).isPresent()) {
-                  return migratePipelineElement(
-                      processor,
-                      migrationConfigs,
-                      String.format("%s/%s/processor",
-                          extensionsServiceConfig.getServiceUrl(),
-                          MIGRATION_ENDPOINT
-                      ),
-                      failedMigrations
-                  );
-                } else {
-                  LOG.info("No migration applicable for data processor '{}'.", 
processor.getElementId());
-                  return processor;
-                }
-              })
-              .toList();
+            .stream()
+            .map(processor -> {
+              if (getApplicableMigration(processor, 
migrationConfigs).isPresent()) {
+                return migratePipelineElement(
+                  processor,
+                  migrationConfigs,
+                  String.format("%s/%s/processor",
+                    extensionsServiceConfig.getServiceUrl(),
+                    MIGRATION_ENDPOINT
+                  ),
+                  failedMigrations
+                );
+              } else {
+                LOG.info("No migration applicable for data processor '{}'.", 
processor.getElementId());
+                return processor;
+              }
+            })
+            .toList();
           pipeline.setSepas(migratedDataProcessors);
 
           var migratedDataSinks = pipeline.getActions()
-              .stream()
-              .map(sink -> {
-                if (getApplicableMigration(sink, 
migrationConfigs).isPresent()) {
-                  return migratePipelineElement(
-                      sink,
-                      migrationConfigs,
-                      String.format("%s/%s/sink",
-                          extensionsServiceConfig.getServiceUrl(),
-                          MIGRATION_ENDPOINT
-                      ),
-                      failedMigrations
-                  );
-                } else {
-                  LOG.info("No migration applicable for data sink '{}'.", 
sink.getElementId());
-                  return sink;
-                }
-              })
-              .toList();
+            .stream()
+            .map(sink -> {
+              if (getApplicableMigration(sink, migrationConfigs).isPresent()) {
+                return migratePipelineElement(
+                  sink,
+                  migrationConfigs,
+                  String.format("%s/%s/sink",
+                    extensionsServiceConfig.getServiceUrl(),
+                    MIGRATION_ENDPOINT
+                  ),
+                  failedMigrations
+                );
+              } else {
+                LOG.info("No migration applicable for data sink '{}'.", 
sink.getElementId());
+                return sink;
+              }
+            })
+            .toList();
           pipeline.setActions(migratedDataSinks);
 
           pipelineStorage.updateElement(pipeline);
@@ -140,8 +141,8 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
   private boolean shouldMigratePipeline(Pipeline pipeline,
                                         List<ModelMigratorConfig> 
migrationConfigs) {
     return Stream
-        .concat(pipeline.getSepas().stream(), pipeline.getActions().stream())
-        .anyMatch(element -> getApplicableMigration(element, 
migrationConfigs).isPresent());
+      .concat(pipeline.getSepas().stream(), pipeline.getActions().stream())
+      .anyMatch(element -> getApplicableMigration(element, 
migrationConfigs).isPresent());
   }
 
   /**
@@ -159,10 +160,10 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
    */
   protected void handleFailedMigrations(Pipeline pipeline, 
List<MigrationResult<?>> failedMigrations) {
     LOG.error("Failures in migration detected - The following pipeline 
elements could to be migrated:\n"
-        + 
StringUtils.join(failedMigrations.stream().map(Record::toString).toList()), 
"\n");
+      + 
StringUtils.join(failedMigrations.stream().map(Record::toString).toList()), 
"\n");
 
     pipeline.setPipelineNotifications(failedMigrations.stream().map(
-        failedMigration -> "Failed migration of pipeline element: 
%s".formatted(failedMigration.message())
+      failedMigration -> "Failed migration of pipeline element: 
%s".formatted(failedMigration.message())
     ).toList());
     pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
 
@@ -200,10 +201,10 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
    * @return the migrated (or - in case of a failure - updated) pipeline 
element
    */
   protected <T extends InvocableStreamPipesEntity> T migratePipelineElement(
-      T pipelineElement,
-      List<ModelMigratorConfig> modelMigrations,
-      String url,
-      List<MigrationResult<?>> failedMigrations
+    T pipelineElement,
+    List<ModelMigratorConfig> modelMigrations,
+    String url,
+    List<MigrationResult<?>> failedMigrations
   ) {
 
     // loop until no migrations are available anymore
@@ -213,15 +214,15 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
 
       var migrationConfig = getApplicableMigration(pipelineElement, 
modelMigrations).get();
       LOG.info(
-          "Found applicable migration for pipeline element '{}': {}",
-          pipelineElement.getElementId(),
-          migrationConfig
+        "Found applicable migration for pipeline element '{}': {}",
+        pipelineElement.getElementId(),
+        migrationConfig
       );
 
       var migrationResult = performMigration(
-          pipelineElement,
-          migrationConfig,
-          url
+        pipelineElement,
+        migrationConfig,
+        url
       );
 
       if (migrationResult.success()) {
@@ -250,13 +251,24 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
     List<StaticProperty> updatedStaticProperties = new ArrayList<>();
     if (pipelineElement instanceof DataProcessorInvocation) {
       updatedStaticProperties = dataProcessorStorage
-          .getFirstDataProcessorByAppId(pipelineElement.getAppId())
-          .getStaticProperties();
+        .getFirstDataProcessorByAppId(pipelineElement.getAppId())
+        .getStaticProperties();
     } else if (pipelineElement instanceof DataSinkInvocation) {
       updatedStaticProperties = dataSinkStorage
-          .getFirstDataSinkByAppId(pipelineElement.getAppId())
-          .getStaticProperties();
+        .getFirstDataSinkByAppId(pipelineElement.getAppId())
+        .getStaticProperties();
     }
     pipelineElement.setStaticProperties(updatedStaticProperties);
   }
+
+  @Override
+  protected boolean isInstalled(SpServiceTagPrefix modelType, String appId) {
+    if (modelType == SpServiceTagPrefix.DATA_PROCESSOR) {
+      return !dataProcessorStorage.getDataProcessorsByAppId(appId).isEmpty();
+    } else if (modelType == SpServiceTagPrefix.DATA_SINK) {
+      return !dataSinkStorage.getDataSinksByAppId(appId).isEmpty();
+    } else {
+      throw new RuntimeException(String.format("Wrong service tag provided: 
%s", modelType.asString()));
+    }
+  }
 }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
index a940e9d9b0..58baea3101 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
@@ -63,6 +63,7 @@ public class MigrationResource extends 
AbstractAuthGuardedRestResource {
 
   private final CRUDStorage<SpServiceRegistration> extensionsServiceStorage =
       getNoSqlStorage().getExtensionsServiceStorage();
+  private final IAdapterStorage adapterDescriptionStorage = 
getNoSqlStorage().getAdapterDescriptionStorage();
   private final IAdapterStorage adapterStorage = 
getNoSqlStorage().getAdapterInstanceStorage();
 
   private final IDataProcessorStorage dataProcessorStorage = 
getNoSqlStorage().getDataProcessorStorage();
@@ -119,7 +120,8 @@ public class MigrationResource extends 
AbstractAuthGuardedRestResource {
                 List.of(SpServiceTagPrefix.DATA_PROCESSOR, 
SpServiceTagPrefix.DATA_SINK)
             );
 
-            new 
AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig,
 adapterMigrations);
+            new AdapterMigrationManager(adapterStorage, 
adapterDescriptionStorage)
+              .handleMigrations(extensionsServiceConfig, adapterMigrations);
             new PipelineElementMigrationManager(
                 pipelineStorage,
                 dataProcessorStorage,
diff --git 
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
index 9b00f555b6..e0b055f978 100644
--- 
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
+++ 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
@@ -19,7 +19,11 @@ package org.apache.streampipes.storage.api;
 
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 
+import java.util.List;
+
 public interface IDataProcessorStorage extends 
CRUDStorage<DataProcessorDescription> {
 
   DataProcessorDescription getFirstDataProcessorByAppId(String appId);
+
+  List<DataProcessorDescription> getDataProcessorsByAppId(String appId);
 }
diff --git 
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
index 1890d1e0a2..d82c02ba6c 100644
--- 
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
+++ 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
@@ -19,8 +19,11 @@ package org.apache.streampipes.storage.api;
 
 import org.apache.streampipes.model.graph.DataSinkDescription;
 
+import java.util.List;
+
 public interface IDataSinkStorage extends CRUDStorage<DataSinkDescription> {
 
   DataSinkDescription getFirstDataSinkByAppId(String appId);
 
+  List<DataSinkDescription> getDataSinksByAppId(String appId);
 }
diff --git 
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
index 3c02040549..447c174c26 100644
--- 
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
+++ 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
@@ -21,6 +21,7 @@ import 
org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.storage.api.IDataProcessorStorage;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
 
+import java.util.List;
 import java.util.NoSuchElementException;
 
 public class DataProcessorStorageImpl extends 
DefaultCrudStorage<DataProcessorDescription>
@@ -40,13 +41,20 @@ public class DataProcessorStorageImpl extends 
DefaultCrudStorage<DataProcessorDe
 
   @Override
   public DataProcessorDescription getFirstDataProcessorByAppId(String appId) {
-    return this.findAll()
+    return getDataProcessorsByAppId(appId)
         .stream()
-        .filter(p -> p.getAppId().equals(appId))
         .findFirst()
         .orElseThrow(NoSuchElementException::new);
   }
 
+  @Override
+  public List<DataProcessorDescription> getDataProcessorsByAppId(String appId) 
{
+    return this.findAll()
+      .stream()
+      .filter(p -> p.getAppId().equals(appId))
+      .toList();
+  }
+
   private String getCurrentRev(String elementId) {
     return find(elementId).get().getRev();
   }
diff --git 
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
index 1e3587fd2f..b53644c9e4 100644
--- 
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
+++ 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
@@ -21,6 +21,8 @@ import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.storage.api.IDataSinkStorage;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
 
+import java.util.List;
+
 public class DataSinkStorageImpl extends 
DefaultCrudStorage<DataSinkDescription> implements IDataSinkStorage {
 
 
@@ -37,13 +39,20 @@ public class DataSinkStorageImpl extends 
DefaultCrudStorage<DataSinkDescription>
 
   @Override
   public DataSinkDescription getFirstDataSinkByAppId(String appId) {
-    return this.findAll()
+    return getDataSinksByAppId(appId)
         .stream()
-        .filter(s -> s.getAppId().equals(appId))
         .findFirst()
         .orElseThrow(IllegalArgumentException::new);
   }
 
+  @Override
+  public List<DataSinkDescription> getDataSinksByAppId(String appId) {
+    return findAll()
+      .stream()
+      .filter(s -> s.getAppId().equals(appId))
+      .toList();
+  }
+
   private String getCurrentRev(String elementId) {
     return find(elementId).get().getRev();
   }

Reply via email to