This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 3131-migration-breaks-core-when-trying-to-migrate-non-existing-extension
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/3131-migration-breaks-core-when-trying-to-migrate-non-existing-extension by this push:
new 5bd7baf03a fix(#3131): Perform description migration only for installed extensions
5bd7baf03a is described below
commit 5bd7baf03a7241aa7bf6b643f39ccc5d72c75c18
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Aug 12 12:52:53 2024 +0200
fix(#3131): Perform description migration only for installed extensions
---
.../management/AdapterMigrationManager.java | 11 +-
.../migration/AbstractMigrationManager.java | 15 ++-
.../migration/PipelineElementMigrationManager.java | 126 +++++++++++----------
.../rest/impl/admin/MigrationResource.java | 4 +-
.../storage/api/IDataProcessorStorage.java | 4 +
.../streampipes/storage/api/IDataSinkStorage.java | 3 +
.../couchdb/impl/DataProcessorStorageImpl.java | 12 +-
.../storage/couchdb/impl/DataSinkStorageImpl.java | 13 ++-
8 files changed, 119 insertions(+), 69 deletions(-)
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
index e03b4bb168..b24e6494c7 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMigrationManager.java
@@ -22,6 +22,7 @@ import
org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.manager.migration.AbstractMigrationManager;
import org.apache.streampipes.manager.migration.IMigrationHandler;
import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.storage.api.IAdapterStorage;
@@ -36,9 +37,12 @@ public class AdapterMigrationManager extends
AbstractMigrationManager implements
private static final Logger LOG =
LoggerFactory.getLogger(AdapterMigrationManager.class);
private final IAdapterStorage adapterStorage;
+ private final IAdapterStorage adapterDescriptionStorage;
- public AdapterMigrationManager(IAdapterStorage adapterStorage) {
+ public AdapterMigrationManager(IAdapterStorage adapterStorage,
+ IAdapterStorage adapterDescriptionStorage) {
this.adapterStorage = adapterStorage;
+ this.adapterDescriptionStorage = adapterDescriptionStorage;
}
@Override
@@ -110,4 +114,9 @@ public class AdapterMigrationManager extends
AbstractMigrationManager implements
}
}
}
+
+ @Override
+ protected boolean isInstalled(SpServiceTagPrefix modelType, String appId) {
+ return !adapterDescriptionStorage.getAdaptersByAppId(appId).isEmpty();
+ }
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
index a27b19b400..1bc61b0ae2 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
@@ -23,6 +23,7 @@ import
org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
import org.apache.streampipes.model.extensions.migration.MigrationRequest;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.message.Notification;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
@@ -119,14 +120,16 @@ public abstract class AbstractMigrationManager {
)
)
.values()
- .stream()
- .peek(config -> {
- var requestUrl = getRequestUrl(config.modelType(),
config.targetAppId(), serviceUrl);
- performUpdate(requestUrl);
- })
- .toList();
+ .forEach(config -> {
+ if (isInstalled(config.modelType(), config.targetAppId())) {
+ var requestUrl = getRequestUrl(config.modelType(),
config.targetAppId(), serviceUrl);
+ performUpdate(requestUrl);
+ }
+ });
}
+ protected abstract boolean isInstalled(SpServiceTagPrefix modelType, String
appId);
+
/**
* Perform the update of the description based on the given requestUrl
*
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index 5ac2ba0a58..2c9bd8c4f5 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.manager.migration;
import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.migration.MigrationResult;
@@ -67,12 +68,12 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
LOG.info("Pipeline element descriptions are up to date.");
LOG.info("Received {} pipeline element migrations from extension service
{}.",
- migrationConfigs.size(),
- extensionsServiceConfig.getServiceUrl());
+ migrationConfigs.size(),
+ extensionsServiceConfig.getServiceUrl());
var availablePipelines = pipelineStorage.findAll();
if (!availablePipelines.isEmpty()) {
LOG.info("Found {} available pipelines. Checking pipelines for
applicable migrations...",
- availablePipelines.size()
+ availablePipelines.size()
);
}
@@ -81,45 +82,45 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
List<MigrationResult<?>> failedMigrations = new ArrayList<>();
var migratedDataProcessors = pipeline.getSepas()
- .stream()
- .map(processor -> {
- if (getApplicableMigration(processor,
migrationConfigs).isPresent()) {
- return migratePipelineElement(
- processor,
- migrationConfigs,
- String.format("%s/%s/processor",
- extensionsServiceConfig.getServiceUrl(),
- MIGRATION_ENDPOINT
- ),
- failedMigrations
- );
- } else {
- LOG.info("No migration applicable for data processor '{}'.",
processor.getElementId());
- return processor;
- }
- })
- .toList();
+ .stream()
+ .map(processor -> {
+ if (getApplicableMigration(processor,
migrationConfigs).isPresent()) {
+ return migratePipelineElement(
+ processor,
+ migrationConfigs,
+ String.format("%s/%s/processor",
+ extensionsServiceConfig.getServiceUrl(),
+ MIGRATION_ENDPOINT
+ ),
+ failedMigrations
+ );
+ } else {
+ LOG.info("No migration applicable for data processor '{}'.",
processor.getElementId());
+ return processor;
+ }
+ })
+ .toList();
pipeline.setSepas(migratedDataProcessors);
var migratedDataSinks = pipeline.getActions()
- .stream()
- .map(sink -> {
- if (getApplicableMigration(sink,
migrationConfigs).isPresent()) {
- return migratePipelineElement(
- sink,
- migrationConfigs,
- String.format("%s/%s/sink",
- extensionsServiceConfig.getServiceUrl(),
- MIGRATION_ENDPOINT
- ),
- failedMigrations
- );
- } else {
- LOG.info("No migration applicable for data sink '{}'.",
sink.getElementId());
- return sink;
- }
- })
- .toList();
+ .stream()
+ .map(sink -> {
+ if (getApplicableMigration(sink, migrationConfigs).isPresent()) {
+ return migratePipelineElement(
+ sink,
+ migrationConfigs,
+ String.format("%s/%s/sink",
+ extensionsServiceConfig.getServiceUrl(),
+ MIGRATION_ENDPOINT
+ ),
+ failedMigrations
+ );
+ } else {
+ LOG.info("No migration applicable for data sink '{}'.",
sink.getElementId());
+ return sink;
+ }
+ })
+ .toList();
pipeline.setActions(migratedDataSinks);
pipelineStorage.updateElement(pipeline);
@@ -140,8 +141,8 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
private boolean shouldMigratePipeline(Pipeline pipeline,
List<ModelMigratorConfig>
migrationConfigs) {
return Stream
- .concat(pipeline.getSepas().stream(), pipeline.getActions().stream())
- .anyMatch(element -> getApplicableMigration(element,
migrationConfigs).isPresent());
+ .concat(pipeline.getSepas().stream(), pipeline.getActions().stream())
+ .anyMatch(element -> getApplicableMigration(element,
migrationConfigs).isPresent());
}
/**
@@ -159,10 +160,10 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
*/
protected void handleFailedMigrations(Pipeline pipeline,
List<MigrationResult<?>> failedMigrations) {
LOG.error("Failures in migration detected - The following pipeline
elements could to be migrated:\n"
- +
StringUtils.join(failedMigrations.stream().map(Record::toString).toList()),
"\n");
+ +
StringUtils.join(failedMigrations.stream().map(Record::toString).toList()),
"\n");
pipeline.setPipelineNotifications(failedMigrations.stream().map(
- failedMigration -> "Failed migration of pipeline element:
%s".formatted(failedMigration.message())
+ failedMigration -> "Failed migration of pipeline element:
%s".formatted(failedMigration.message())
).toList());
pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
@@ -200,10 +201,10 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
* @return the migrated (or - in case of a failure - updated) pipeline
element
*/
protected <T extends InvocableStreamPipesEntity> T migratePipelineElement(
- T pipelineElement,
- List<ModelMigratorConfig> modelMigrations,
- String url,
- List<MigrationResult<?>> failedMigrations
+ T pipelineElement,
+ List<ModelMigratorConfig> modelMigrations,
+ String url,
+ List<MigrationResult<?>> failedMigrations
) {
// loop until no migrations are available anymore
@@ -213,15 +214,15 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
var migrationConfig = getApplicableMigration(pipelineElement,
modelMigrations).get();
LOG.info(
- "Found applicable migration for pipeline element '{}': {}",
- pipelineElement.getElementId(),
- migrationConfig
+ "Found applicable migration for pipeline element '{}': {}",
+ pipelineElement.getElementId(),
+ migrationConfig
);
var migrationResult = performMigration(
- pipelineElement,
- migrationConfig,
- url
+ pipelineElement,
+ migrationConfig,
+ url
);
if (migrationResult.success()) {
@@ -250,13 +251,24 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
List<StaticProperty> updatedStaticProperties = new ArrayList<>();
if (pipelineElement instanceof DataProcessorInvocation) {
updatedStaticProperties = dataProcessorStorage
- .getFirstDataProcessorByAppId(pipelineElement.getAppId())
- .getStaticProperties();
+ .getFirstDataProcessorByAppId(pipelineElement.getAppId())
+ .getStaticProperties();
} else if (pipelineElement instanceof DataSinkInvocation) {
updatedStaticProperties = dataSinkStorage
- .getFirstDataSinkByAppId(pipelineElement.getAppId())
- .getStaticProperties();
+ .getFirstDataSinkByAppId(pipelineElement.getAppId())
+ .getStaticProperties();
}
pipelineElement.setStaticProperties(updatedStaticProperties);
}
+
+ @Override
+ protected boolean isInstalled(SpServiceTagPrefix modelType, String appId) {
+ if (modelType == SpServiceTagPrefix.DATA_PROCESSOR) {
+ return !dataProcessorStorage.getDataProcessorsByAppId(appId).isEmpty();
+ } else if (modelType == SpServiceTagPrefix.DATA_SINK) {
+ return !dataSinkStorage.getDataSinksByAppId(appId).isEmpty();
+ } else {
+ throw new RuntimeException(String.format("Wrong service tag provided:
%s", modelType.asString()));
+ }
+ }
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
index a940e9d9b0..58baea3101 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java
@@ -63,6 +63,7 @@ public class MigrationResource extends
AbstractAuthGuardedRestResource {
private final CRUDStorage<SpServiceRegistration> extensionsServiceStorage =
getNoSqlStorage().getExtensionsServiceStorage();
+ private final IAdapterStorage adapterDescriptionStorage =
getNoSqlStorage().getAdapterDescriptionStorage();
private final IAdapterStorage adapterStorage =
getNoSqlStorage().getAdapterInstanceStorage();
private final IDataProcessorStorage dataProcessorStorage =
getNoSqlStorage().getDataProcessorStorage();
@@ -119,7 +120,8 @@ public class MigrationResource extends
AbstractAuthGuardedRestResource {
List.of(SpServiceTagPrefix.DATA_PROCESSOR,
SpServiceTagPrefix.DATA_SINK)
);
- new
AdapterMigrationManager(adapterStorage).handleMigrations(extensionsServiceConfig,
adapterMigrations);
+ new AdapterMigrationManager(adapterStorage,
adapterDescriptionStorage)
+ .handleMigrations(extensionsServiceConfig, adapterMigrations);
new PipelineElementMigrationManager(
pipelineStorage,
dataProcessorStorage,
diff --git
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
index 9b00f555b6..e0b055f978 100644
---
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
+++
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataProcessorStorage.java
@@ -19,7 +19,11 @@ package org.apache.streampipes.storage.api;
import org.apache.streampipes.model.graph.DataProcessorDescription;
+import java.util.List;
+
public interface IDataProcessorStorage extends
CRUDStorage<DataProcessorDescription> {
DataProcessorDescription getFirstDataProcessorByAppId(String appId);
+
+ List<DataProcessorDescription> getDataProcessorsByAppId(String appId);
}
diff --git
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
index 1890d1e0a2..d82c02ba6c 100644
---
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
+++
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataSinkStorage.java
@@ -19,8 +19,11 @@ package org.apache.streampipes.storage.api;
import org.apache.streampipes.model.graph.DataSinkDescription;
+import java.util.List;
+
public interface IDataSinkStorage extends CRUDStorage<DataSinkDescription> {
DataSinkDescription getFirstDataSinkByAppId(String appId);
+ List<DataSinkDescription> getDataSinksByAppId(String appId);
}
diff --git
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
index 3c02040549..447c174c26 100644
---
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
+++
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataProcessorStorageImpl.java
@@ -21,6 +21,7 @@ import
org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.storage.api.IDataProcessorStorage;
import org.apache.streampipes.storage.couchdb.utils.Utils;
+import java.util.List;
import java.util.NoSuchElementException;
public class DataProcessorStorageImpl extends
DefaultCrudStorage<DataProcessorDescription>
@@ -40,13 +41,20 @@ public class DataProcessorStorageImpl extends
DefaultCrudStorage<DataProcessorDe
@Override
public DataProcessorDescription getFirstDataProcessorByAppId(String appId) {
- return this.findAll()
+ return getDataProcessorsByAppId(appId)
.stream()
- .filter(p -> p.getAppId().equals(appId))
.findFirst()
.orElseThrow(NoSuchElementException::new);
}
+ @Override
+ public List<DataProcessorDescription> getDataProcessorsByAppId(String appId)
{
+ return this.findAll()
+ .stream()
+ .filter(p -> p.getAppId().equals(appId))
+ .toList();
+ }
+
private String getCurrentRev(String elementId) {
return find(elementId).get().getRev();
}
diff --git
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
index 1e3587fd2f..b53644c9e4 100644
---
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
+++
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataSinkStorageImpl.java
@@ -21,6 +21,8 @@ import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.storage.api.IDataSinkStorage;
import org.apache.streampipes.storage.couchdb.utils.Utils;
+import java.util.List;
+
public class DataSinkStorageImpl extends
DefaultCrudStorage<DataSinkDescription> implements IDataSinkStorage {
@@ -37,13 +39,20 @@ public class DataSinkStorageImpl extends
DefaultCrudStorage<DataSinkDescription>
@Override
public DataSinkDescription getFirstDataSinkByAppId(String appId) {
- return this.findAll()
+ return getDataSinksByAppId(appId)
.stream()
- .filter(s -> s.getAppId().equals(appId))
.findFirst()
.orElseThrow(IllegalArgumentException::new);
}
+ @Override
+ public List<DataSinkDescription> getDataSinksByAppId(String appId) {
+ return findAll()
+ .stream()
+ .filter(s -> s.getAppId().equals(appId))
+ .toList();
+ }
+
private String getCurrentRev(String elementId) {
return find(elementId).get().getRev();
}