This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 669683a514 fix(#3131): Perform description migration only for installed extensions (#3132)
669683a514 is described below

commit 669683a51400524ab4c2403807384d527f572ff2
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Aug 12 15:24:21 2024 +0200

    fix(#3131): Perform description migration only for installed extensions (#3132)
    
    * fix(#3131): Perform description migration only for installed extensions
    
    * Fix checkstyle
---
 .../management/management/AdapterMigrationManager.java    | 11 ++++++++++-
 .../manager/migration/AbstractMigrationManager.java       | 15 +++++++++------
 .../migration/PipelineElementMigrationManager.java        | 12 ++++++++++++
 .../streampipes/rest/impl/admin/MigrationResource.java    |  4 +++-
 .../streampipes/storage/api/IDataProcessorStorage.java    |  4 ++++
 .../apache/streampipes/storage/api/IDataSinkStorage.java  |  3 +++
 .../storage/couchdb/impl/DataProcessorStorageImpl.java    | 12 ++++++++++--
 .../storage/couchdb/impl/DataSinkStorageImpl.java         | 13 +++++++++++--
 8 files changed, 62 insertions(+), 12 deletions(-)

diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
index e03b4bb168..b24e6494c7 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
@@ -22,6 +22,7 @@ import 
org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.manager.migration.AbstractMigrationManager;
 import org.apache.streampipes.manager.migration.IMigrationHandler;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.migration.ModelMigratorConfig;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 
@@ -36,9 +37,12 @@ public class AdapterMigrationManager extends 
AbstractMigrationManager implements
   private static final Logger LOG = 
LoggerFactory.getLogger(AdapterMigrationManager.class);
 
   private final IAdapterStorage adapterStorage;
+  private final IAdapterStorage adapterDescriptionStorage;
 
-  public AdapterMigrationManager(IAdapterStorage adapterStorage) {
+  public AdapterMigrationManager(IAdapterStorage adapterStorage,
+                                 IAdapterStorage adapterDescriptionStorage) {
     this.adapterStorage = adapterStorage;
+    this.adapterDescriptionStorage = adapterDescriptionStorage;
   }
 
   @Override
@@ -110,4 +114,9 @@ public class AdapterMigrationManager extends 
AbstractMigrationManager implements
       }
     }
   }
+
+  @Override
+  protected boolean isInstalled(SpServiceTagPrefix modelType, String appId) {
+    return !adapterDescriptionStorage.getAdaptersByAppId(appId).isEmpty();
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
index a27b19b400..1bc61b0ae2 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
@@ -23,6 +23,7 @@ import 
org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
 import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
 import org.apache.streampipes.model.extensions.migration.MigrationRequest;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.message.Notification;
 import org.apache.streampipes.model.migration.MigrationResult;
 import org.apache.streampipes.model.migration.ModelMigratorConfig;
@@ -119,14 +120,16 @@ public abstract class AbstractMigrationManager {
             )
         )
         .values()
-        .stream()
-        .peek(config -> {
-          var requestUrl = getRequestUrl(config.modelType(), 
config.targetAppId(), serviceUrl);
-          performUpdate(requestUrl);
-        })
-        .toList();
+        .forEach(config -> {
+          if (isInstalled(config.modelType(), config.targetAppId())) {
+            var requestUrl = getRequestUrl(config.modelType(), 
config.targetAppId(), serviceUrl);
+            performUpdate(requestUrl);
+          }
+        });
   }
 
+  protected abstract boolean isInstalled(SpServiceTagPrefix modelType, String 
appId);
+
   /**
    * Perform the update of the description based on the given requestUrl
    *
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index 5ac2ba0a58..7c84214d83 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.manager.migration;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.migration.MigrationResult;
@@ -259,4 +260,15 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
     }
     pipelineElement.setStaticProperties(updatedStaticProperties);
   }
+
+  @Override
+  protected boolean isInstalled(SpServiceTagPrefix modelType, String appId) {
+    if (modelType == SpServiceTagPrefix.DATA_PROCESSOR) {
+      return !dataProcessorStorage.getDataProcessorsByAppId(appId).isEmpty();
+    } else if (modelType == SpServiceTagPrefix.DATA_SINK) {
+      return !dataSinkStorage.getDataSinksByAppId(appId).isEmpty();
+    } else {
+      throw new RuntimeException(String.format("Wrong service tag provided: 
%s", modelType.asString()));
+    }
+  }
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
index a940e9d9b0..58baea3101 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
@@ -63,6 +63,7 @@ public class MigrationResource extends 
AbstractAuthGuardedRestResource {
 
   private final CRUDStorage<SpServiceRegistration> extensionsServiceStorage =
       getNoSqlStorage().getExtensionsServiceStorage();
+  private final IAdapterStorage adapterDescriptionStorage = 
getNoSqlStorage().getAdapterDescriptionStorage();
   private final IAdapterStorage adapterStorage = 
getNoSqlStorage().getAdapterInstanceStorage();
 
   private final IDataProcessorStorage dataProcessorStorage = 
getNoSqlStorage().getDataProcessorStorage();
@@ -119,7 +120,8 @@ public class MigrationResource extends 
AbstractAuthGuardedRestResource {
                 List.of(SpServiceTagPrefix.DATA_PROCESSOR, 
SpServiceTagPrefix.DATA_SINK)
             );
 
-            new 
AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig,
 adapterMigrations);
+            new AdapterMigrationManager(adapterStorage, 
adapterDescriptionStorage)
+              .handleMigrations(extensionsServiceConfig, adapterMigrations);
             new PipelineElementMigrationManager(
                 pipelineStorage,
                 dataProcessorStorage,
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
index 9b00f555b6..e0b055f978 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
@@ -19,7 +19,11 @@ package org.apache.streampipes.storage.api;
 
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 
+import java.util.List;
+
 public interface IDataProcessorStorage extends 
CRUDStorage<DataProcessorDescription> {
 
   DataProcessorDescription getFirstDataProcessorByAppId(String appId);
+
+  List<DataProcessorDescription> getDataProcessorsByAppId(String appId);
 }
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
index 1890d1e0a2..d82c02ba6c 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
@@ -19,8 +19,11 @@ package org.apache.streampipes.storage.api;
 
 import org.apache.streampipes.model.graph.DataSinkDescription;
 
+import java.util.List;
+
 public interface IDataSinkStorage extends CRUDStorage<DataSinkDescription> {
 
   DataSinkDescription getFirstDataSinkByAppId(String appId);
 
+  List<DataSinkDescription> getDataSinksByAppId(String appId);
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
index 3c02040549..447c174c26 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
@@ -21,6 +21,7 @@ import 
org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.storage.api.IDataProcessorStorage;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
 
+import java.util.List;
 import java.util.NoSuchElementException;
 
 public class DataProcessorStorageImpl extends 
DefaultCrudStorage<DataProcessorDescription>
@@ -40,13 +41,20 @@ public class DataProcessorStorageImpl extends 
DefaultCrudStorage<DataProcessorDe
 
   @Override
   public DataProcessorDescription getFirstDataProcessorByAppId(String appId) {
-    return this.findAll()
+    return getDataProcessorsByAppId(appId)
         .stream()
-        .filter(p -> p.getAppId().equals(appId))
         .findFirst()
         .orElseThrow(NoSuchElementException::new);
   }
 
+  @Override
+  public List<DataProcessorDescription> getDataProcessorsByAppId(String appId) 
{
+    return this.findAll()
+      .stream()
+      .filter(p -> p.getAppId().equals(appId))
+      .toList();
+  }
+
   private String getCurrentRev(String elementId) {
     return find(elementId).get().getRev();
   }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
index 1e3587fd2f..b53644c9e4 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
@@ -21,6 +21,8 @@ import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.storage.api.IDataSinkStorage;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
 
+import java.util.List;
+
 public class DataSinkStorageImpl extends 
DefaultCrudStorage<DataSinkDescription> implements IDataSinkStorage {
 
 
@@ -37,13 +39,20 @@ public class DataSinkStorageImpl extends 
DefaultCrudStorage<DataSinkDescription>
 
   @Override
   public DataSinkDescription getFirstDataSinkByAppId(String appId) {
-    return this.findAll()
+    return getDataSinksByAppId(appId)
         .stream()
-        .filter(s -> s.getAppId().equals(appId))
         .findFirst()
         .orElseThrow(IllegalArgumentException::new);
   }
 
+  @Override
+  public List<DataSinkDescription> getDataSinksByAppId(String appId) {
+    return findAll()
+      .stream()
+      .filter(s -> s.getAppId().equals(appId))
+      .toList();
+  }
+
   private String getCurrentRev(String elementId) {
     return find(elementId).get().getRev();
   }

Reply via email to