This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 669683a514 fix(#3131): Perform description migration only for
installed extensions (#3132)
669683a514 is described below
commit 669683a51400524ab4c2403807384d527f572ff2
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Aug 12 15:24:21 2024 +0200
fix(#3131): Perform description migration only for installed extensions
(#3132)
* fix(#3131): Perform description migration only for installed extensions
* Fix checkstyle
---
.../management/management/AdapterMigrationManager.java | 11 ++++++++++-
.../manager/migration/AbstractMigrationManager.java | 15 +++++++++------
.../migration/PipelineElementMigrationManager.java | 12 ++++++++++++
.../streampipes/rest/impl/admin/MigrationResource.java | 4 +++-
.../streampipes/storage/api/IDataProcessorStorage.java | 4 ++++
.../apache/streampipes/storage/api/IDataSinkStorage.java | 3 +++
.../storage/couchdb/impl/DataProcessorStorageImpl.java | 12 ++++++++++--
.../storage/couchdb/impl/DataSinkStorageImpl.java | 13 +++++++++++--
8 files changed, 62 insertions(+), 12 deletions(-)
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
index e03b4bb168..b24e6494c7 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
@@ -22,6 +22,7 @@ import
org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.manager.migration.AbstractMigrationManager;
import org.apache.streampipes.manager.migration.IMigrationHandler;
import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.storage.api.IAdapterStorage;
@@ -36,9 +37,12 @@ public class AdapterMigrationManager extends
AbstractMigrationManager implements
private static final Logger LOG =
LoggerFactory.getLogger(AdapterMigrationManager.class);
private final IAdapterStorage adapterStorage;
+ private final IAdapterStorage adapterDescriptionStorage;
- public AdapterMigrationManager(IAdapterStorage adapterStorage) {
+ public AdapterMigrationManager(IAdapterStorage adapterStorage,
+ IAdapterStorage adapterDescriptionStorage) {
this.adapterStorage = adapterStorage;
+ this.adapterDescriptionStorage = adapterDescriptionStorage;
}
@Override
@@ -110,4 +114,9 @@ public class AdapterMigrationManager extends
AbstractMigrationManager implements
}
}
}
+
+ @Override
+ protected boolean isInstalled(SpServiceTagPrefix modelType, String appId) {
+ return !adapterDescriptionStorage.getAdaptersByAppId(appId).isEmpty();
+ }
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
index a27b19b400..1bc61b0ae2 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
@@ -23,6 +23,7 @@ import
org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
import org.apache.streampipes.model.extensions.migration.MigrationRequest;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.message.Notification;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
@@ -119,14 +120,16 @@ public abstract class AbstractMigrationManager {
)
)
.values()
- .stream()
- .peek(config -> {
- var requestUrl = getRequestUrl(config.modelType(),
config.targetAppId(), serviceUrl);
- performUpdate(requestUrl);
- })
- .toList();
+ .forEach(config -> {
+ if (isInstalled(config.modelType(), config.targetAppId())) {
+ var requestUrl = getRequestUrl(config.modelType(),
config.targetAppId(), serviceUrl);
+ performUpdate(requestUrl);
+ }
+ });
}
+ protected abstract boolean isInstalled(SpServiceTagPrefix modelType, String
appId);
+
/**
* Perform the update of the description based on the given requestUrl
*
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index 5ac2ba0a58..7c84214d83 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.manager.migration;
import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.migration.MigrationResult;
@@ -259,4 +260,15 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
}
pipelineElement.setStaticProperties(updatedStaticProperties);
}
+
+ @Override
+ protected boolean isInstalled(SpServiceTagPrefix modelType, String appId) {
+ if (modelType == SpServiceTagPrefix.DATA_PROCESSOR) {
+ return !dataProcessorStorage.getDataProcessorsByAppId(appId).isEmpty();
+ } else if (modelType == SpServiceTagPrefix.DATA_SINK) {
+ return !dataSinkStorage.getDataSinksByAppId(appId).isEmpty();
+ } else {
+ throw new RuntimeException(String.format("Wrong service tag provided:
%s", modelType.asString()));
+ }
+ }
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
index a940e9d9b0..58baea3101 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
@@ -63,6 +63,7 @@ public class MigrationResource extends
AbstractAuthGuardedRestResource {
private final CRUDStorage<SpServiceRegistration> extensionsServiceStorage =
getNoSqlStorage().getExtensionsServiceStorage();
+ private final IAdapterStorage adapterDescriptionStorage =
getNoSqlStorage().getAdapterDescriptionStorage();
private final IAdapterStorage adapterStorage =
getNoSqlStorage().getAdapterInstanceStorage();
private final IDataProcessorStorage dataProcessorStorage =
getNoSqlStorage().getDataProcessorStorage();
@@ -119,7 +120,8 @@ public class MigrationResource extends
AbstractAuthGuardedRestResource {
List.of(SpServiceTagPrefix.DATA_PROCESSOR,
SpServiceTagPrefix.DATA_SINK)
);
- new
AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig,
adapterMigrations);
+ new AdapterMigrationManager(adapterStorage,
adapterDescriptionStorage)
+ .handleMigrations(extensionsServiceConfig, adapterMigrations);
new PipelineElementMigrationManager(
pipelineStorage,
dataProcessorStorage,
diff --git
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
index 9b00f555b6..e0b055f978 100644
---
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
+++
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
@@ -19,7 +19,11 @@ package org.apache.streampipes.storage.api;
import org.apache.streampipes.model.graph.DataProcessorDescription;
+import java.util.List;
+
public interface IDataProcessorStorage extends
CRUDStorage<DataProcessorDescription> {
DataProcessorDescription getFirstDataProcessorByAppId(String appId);
+
+ List<DataProcessorDescription> getDataProcessorsByAppId(String appId);
}
diff --git
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
index 1890d1e0a2..d82c02ba6c 100644
---
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
+++
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
@@ -19,8 +19,11 @@ package org.apache.streampipes.storage.api;
import org.apache.streampipes.model.graph.DataSinkDescription;
+import java.util.List;
+
public interface IDataSinkStorage extends CRUDStorage<DataSinkDescription> {
DataSinkDescription getFirstDataSinkByAppId(String appId);
+ List<DataSinkDescription> getDataSinksByAppId(String appId);
}
diff --git
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
index 3c02040549..447c174c26 100644
---
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
+++
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
@@ -21,6 +21,7 @@ import
org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.storage.api.IDataProcessorStorage;
import org.apache.streampipes.storage.couchdb.utils.Utils;
+import java.util.List;
import java.util.NoSuchElementException;
public class DataProcessorStorageImpl extends
DefaultCrudStorage<DataProcessorDescription>
@@ -40,13 +41,20 @@ public class DataProcessorStorageImpl extends
DefaultCrudStorage<DataProcessorDe
@Override
public DataProcessorDescription getFirstDataProcessorByAppId(String appId) {
- return this.findAll()
+ return getDataProcessorsByAppId(appId)
.stream()
- .filter(p -> p.getAppId().equals(appId))
.findFirst()
.orElseThrow(NoSuchElementException::new);
}
+ @Override
+ public List<DataProcessorDescription> getDataProcessorsByAppId(String appId)
{
+ return this.findAll()
+ .stream()
+ .filter(p -> p.getAppId().equals(appId))
+ .toList();
+ }
+
private String getCurrentRev(String elementId) {
return find(elementId).get().getRev();
}
diff --git
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
index 1e3587fd2f..b53644c9e4 100644
---
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
+++
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
@@ -21,6 +21,8 @@ import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.storage.api.IDataSinkStorage;
import org.apache.streampipes.storage.couchdb.utils.Utils;
+import java.util.List;
+
public class DataSinkStorageImpl extends
DefaultCrudStorage<DataSinkDescription> implements IDataSinkStorage {
@@ -37,13 +39,20 @@ public class DataSinkStorageImpl extends
DefaultCrudStorage<DataSinkDescription>
@Override
public DataSinkDescription getFirstDataSinkByAppId(String appId) {
- return this.findAll()
+ return getDataSinksByAppId(appId)
.stream()
- .filter(s -> s.getAppId().equals(appId))
.findFirst()
.orElseThrow(IllegalArgumentException::new);
}
+ @Override
+ public List<DataSinkDescription> getDataSinksByAppId(String appId) {
+ return findAll()
+ .stream()
+ .filter(s -> s.getAppId().equals(appId))
+ .toList();
+ }
+
private String getCurrentRev(String elementId) {
return find(elementId).get().getRev();
}