This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch SP-1114 in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit b5abec08cd9e937f628740df327d24ec5e93949b Author: Philipp Zehnder <[email protected]> AuthorDate: Tue Jan 17 18:31:50 2023 +0100 [#1114] Adapters can now be added without starting them --- .../management/AdapterMasterManagement.java | 31 +---- .../iiot/protocol/stream/FileStreamProtocol.java | 5 +- ui/cypress/support/utils/connect/ConnectBtns.ts | 14 ++- ui/cypress/support/utils/connect/ConnectUtils.ts | 55 +++++++-- .../adapter/createAdapterWithoutStarting.spec.ts | 47 ++++++++ ui/cypress/tests/adapter/editAdapter.smoke.spec.ts | 18 +-- .../src/lib/apis/adapter.service.ts | 8 +- .../adapter-options-panel.component.html | 1 + .../adapter-options-panel.component.ts | 5 +- .../start-adapter-configuration.component.html | 12 +- .../start-adapter-configuration.component.ts | 3 + .../adapter-started-dialog.component.html | 3 +- .../adapter-started-dialog.component.ts | 129 +++++++++++++-------- 13 files changed, 221 insertions(+), 110 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index c66701e27..dea69c4c8 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -29,19 +29,18 @@ import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription; import org.apache.streampipes.model.grounding.EventGrounding; +import org.apache.streampipes.model.util.ElementIdGenerator; import org.apache.streampipes.resource.management.AdapterResourceManager; import org.apache.streampipes.resource.management.DataStreamResourceManager; import org.apache.streampipes.resource.management.SpResourceManager; import org.apache.streampipes.storage.api.IAdapterStorage; import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl; -import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.List; -import java.util.UUID; /** * This class is responsible for managing all the adapter instances which are executed on worker nodes @@ -74,10 +73,8 @@ public class AdapterMasterManagement { throws AdapterException { // Create elementId for adapter - // TODO centralized provisioning of element id - String dataStreamElementId = "urn:streampipes.apache.org:eventstream:" + RandomStringUtils.randomAlphabetic(6); - String uuid = UUID.randomUUID().toString(); - ad.setElementId(ad.getElementId() + ":" + uuid); + String dataStreamElementId = ElementIdGenerator.makeElementId(SpDataStream.class); + ad.setElementId(ElementIdGenerator.makeElementId(ad)); ad.setCreatedAt(System.currentTimeMillis()); ad.setCorrespondingDataStreamElementId(dataStreamElementId); @@ -87,18 +84,13 @@ public class AdapterMasterManagement { String elementId = this.adapterResourceManager.encryptAndCreate(ad); - // start when stream adapter - if (ad instanceof AdapterStreamDescription) { - startStreamAdapter(elementId); - } - // Create stream SpDataStream storedDescription = new SourcesManagement().createAdapterDataStream(ad, dataStreamElementId); storedDescription.setCorrespondingAdapterId(elementId); installDataSource(storedDescription, principalSid, true); LOG.info("Install source (source URL: {} in backend", ad.getElementId()); - return storedDescription.getElementId(); + return ad.getElementId(); } public void updateAdapter(AdapterDescription ad, @@ -164,17 +156,6 @@ public class AdapterMasterManagement { return allAdapters; } - public List<AdapterDescription> getAllAdapterDescriptions() throws AdapterException { - - List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters(); - - if (allAdapters == null) { - throw new AdapterException("Could not get all adapters"); - } - - return allAdapters; - } - public void stopStreamAdapter(String elementId) throws AdapterException { AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId); @@ -212,11 +193,11 @@ public class AdapterMasterManagement { } } - private void updateDataSource(AdapterDescription ad) throws AdapterException { + private void updateDataSource(AdapterDescription ad) { // get data source SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId()); - dataStream = SourcesManagement.updateDataStream(ad, dataStream); + SourcesManagement.updateDataStream(ad, dataStream); // Update data source in database this.dataStreamResourceManager.update(dataStream); diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java index 4b588f882..fef486990 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java @@ -86,8 +86,8 @@ public class FileStreamProtocol extends Protocol { String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema()); - executor = Executors.newScheduledThreadPool(1); var eventProcessor = new LocalEventProcessor(adapterPipeline, timestampKey); + executor = Executors.newScheduledThreadPool(1); executor.scheduleAtFixedRate(() -> { try (InputStream dataInputStream = getDataFromEndpoint()) { @@ -132,7 +132,8 @@ public class FileStreamProtocol extends Protocol { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { - e.printStackTrace(); + logger.info("File stream adapter was stopped, the current replay is interuppted", e); + return false; } } } diff --git a/ui/cypress/support/utils/connect/ConnectBtns.ts b/ui/cypress/support/utils/connect/ConnectBtns.ts index 17887ff2a..f5fbf1451 100644 --- a/ui/cypress/support/utils/connect/ConnectBtns.ts +++ b/ui/cypress/support/utils/connect/ConnectBtns.ts @@ -47,8 +47,7 @@ export class ConnectBtns { .parent(); } - // ================================================================= - // Format button options + // ===================== Format button options ========================== public static json() { return cy.dataCy('connect-select-json-formats'); } @@ -82,4 +81,15 @@ export class ConnectBtns { } // ================================================================= + + // ===================== Adapter settings btns ========================== + public static adapterSettingsStartAdapter() { + return cy.dataCy('adapter-settings-start-adapter-btn'); + } + + public static startAdapterNowCheckbox() { + return cy.dataCy('start-adapter-now-checkbox'); + } + + // ======================================================================== } diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts b/ui/cypress/support/utils/connect/ConnectUtils.ts index feca0e004..88ae97032 100644 --- a/ui/cypress/support/utils/connect/ConnectUtils.ts +++ b/ui/cypress/support/utils/connect/ConnectUtils.ts @@ -30,6 +30,8 @@ import { ConnectBtns } from './ConnectBtns'; export class ConnectUtils { public static testSpecificStreamAdapter( adapterConfiguration: SpecificAdapterInput, + starting = true, + successElement = 'sp-connect-adapter-live-preview', ) { ConnectUtils.goToConnect(); @@ -53,7 +55,11 @@ export class ConnectUtils { ConnectEventSchemaUtils.finishEventSchemaConfiguration(); - ConnectUtils.startStreamAdapter(adapterConfiguration); + ConnectUtils.startStreamAdapter( + adapterConfiguration, + starting, + successElement, + ); } public static testGenericStreamAdapter( @@ -188,21 +194,27 @@ export class ConnectUtils { cy.get('#event-schema-next-button').click(); } - public static startStreamAdapter(adapterInput: AdapterInput) { - ConnectUtils.startAdapter( - adapterInput, - 'sp-connect-adapter-live-preview', - ); + public static startStreamAdapter( + adapterInput: AdapterInput, + starting = true, + successElement = 'sp-connect-adapter-live-preview', + ) { + ConnectUtils.startAdapter(adapterInput, successElement, starting); } public static startSetAdapter(adapterInput: AdapterInput) { ConnectUtils.startAdapter( adapterInput, 'sp-connect-adapter-set-success', + true, ); } - public static startAdapter(adapterInput: AdapterInput, successElement) { + public static startAdapter( + adapterInput: AdapterInput, + successElement: string, + starting: boolean, + ) { // Set adapter name cy.dataCy('sp-adapter-name').type(adapterInput.adapterName); @@ -215,8 +227,13 @@ export class ConnectUtils { .click(); } - // Start adapter - cy.get('#button-startAdapter').click(); + // Deselect auto start of adapter + if (!starting) { + ConnectBtns.startAdapterNowCheckbox().parent().click(); + } + + ConnectBtns.adapterSettingsStartAdapter().click(); + cy.dataCy(successElement, { timeout: 60000 }).should('be.visible'); this.closeAdapterPreview(); @@ -269,6 +286,26 @@ export class ConnectUtils { return adapterConfiguration; } + public static startAndValidateAdapter(amountOfProperties: number) { + ConnectBtns.startAdapter().should('not.be.disabled'); + + ConnectBtns.startAdapter().click(); + + // View data + ConnectBtns.infoAdapter().click(); + cy.get('div').contains('Values').parent().click(); + + // Validate resulting event + cy.dataCy('sp-connect-adapter-live-preview', { timeout: 10000 }).should( + 'be.visible', + ); + + // validate that three event properties + cy.get('.preview-row', { timeout: 10000 }) + .its('length') + .should('eq', amountOfProperties); + } + public static tearDownPreprocessingRuleTest( adapterConfiguration: AdapterInput, expectedFile: string, diff --git a/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts b/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts new file mode 100644 index 000000000..12ba2cced --- /dev/null +++ b/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { ConnectUtils } from '../../support/utils/connect/ConnectUtils'; +import { SpecificAdapterBuilder } from '../../support/builder/SpecificAdapterBuilder'; + +describe('Creates a new adapter without starting it', () => { + beforeEach('Setup Test', () => { + cy.initStreamPipesTest(); + }); + + it('Perform Test', () => { + const adapterInput = SpecificAdapterBuilder.create( + 'Machine_Data_Simulator', + ) + .setName('Machine Data Simulator Test') + .addInput('input', 'wait-time-ms', '1000') + .build(); + + ConnectUtils.testSpecificStreamAdapter( + adapterInput, + false, + 'sp-connect-adapter-not-started-success', + ); + + ConnectUtils.startAndValidateAdapter(7); + + ConnectUtils.closeAdapterPreview(); + + ConnectUtils.deleteAdapter(); + }); +}); diff --git a/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts b/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts index 336464f76..a6db9b08b 100644 --- a/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts +++ b/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts @@ -68,23 +68,7 @@ describe('Test Edit Adapter', () => { ConnectUtils.closeAdapterPreview(); - // Start Adapter - ConnectBtns.startAdapter().should('not.be.disabled'); - ConnectBtns.startAdapter().click(); - - // View data - ConnectBtns.infoAdapter().click(); - cy.get('div').contains('Values').parent().click(); - - // Validate resulting event - cy.dataCy('sp-connect-adapter-live-preview', { timeout: 10000 }).should( - 'be.visible', - ); - - // validate that three event properties - cy.get('.preview-row', { timeout: 10000 }) - .its('length') - .should('eq', 3); + ConnectUtils.startAndValidateAdapter(3); // Validate that name of adapter and data source cy.dataCy('adapter-name').contains(newAdapterName); diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts index 217e7c06c..436d66177 100644 --- a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts +++ b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts @@ -82,8 +82,12 @@ export class AdapterService { } startAdapter(adapter: AdapterDescriptionUnion): Observable<Message> { + return this.startAdapterByElementId(adapter.elementId); + } + + startAdapterByElementId(elementId: string): Observable<Message> { return this.http - .post(this.adapterMasterUrl + adapter.elementId + '/start', {}) + .post(this.adapterMasterUrl + elementId + '/start', {}) .pipe(map(response => Message.fromData(response as any))); } @@ -119,7 +123,7 @@ export class AdapterService { return `${this.connectPath}/master/description/${appId}/assets`; } - private get baseUrl() { + private static get baseUrl() { return '/streampipes-backend'; } } diff --git a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html index fe4fc3030..c219c181f 100644 --- a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html +++ b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html @@ -26,6 +26,7 @@ <div fxLayoutAlign="start start" class="mr-15"> <mat-checkbox (change)="optionSelectedEmitter.emit($event.checked)" + [checked]="isChecked" [attr.data-cy]="dataCy" class="large-checkbox" > diff --git a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.ts b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.ts index e18e4e380..c3599e094 100644 --- a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.ts +++ b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.ts @@ -16,7 +16,7 @@ * */ -import { Component, EventEmitter, Input, OnInit, Output } from '@angular/core'; +import { Component, EventEmitter, Input, Output } from '@angular/core'; @Component({ selector: 'sp-adapter-options-panel', @@ -36,6 +36,9 @@ export class SpAdapterOptionsPanelComponent { @Input() dataCy: string; + @Input() + isChecked = false; + @Output() optionSelectedEmitter: EventEmitter<boolean> = new EventEmitter<boolean>(); } diff --git a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.html b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.html index 6c85414eb..579285f21 100644 --- a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.html +++ b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.html @@ -47,6 +47,16 @@ </sp-basic-inner-panel> <div fxFlex="100" fxLayout="column"> + <sp-adapter-options-panel + optionTitle="Run adapter" + optionDescription="Start adapter now" + optionIcon="play_arrow" + dataCy="start-adapter-now-checkbox" + [isChecked]="startAdapterNow" + (optionSelectedEmitter)="startAdapterNow = $event" + > + </sp-adapter-options-panel> + <sp-adapter-options-panel optionTitle="Remove Duplicates" optionDescription="Avoid duplicated events within a certain time interval" @@ -150,7 +160,7 @@ *ngIf="!isEditMode" [disabled]="!startAdapterSettingsFormValid" mat-raised-button - id="button-startAdapter" + data-cy="adapter-settings-start-adapter-btn" color="accent" (click)="startAdapter()" mat-button diff --git a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts index fbff3bcb0..8f9a6f46f 100644 --- a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts +++ b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts @@ -88,6 +88,8 @@ export class StartAdapterConfigurationComponent implements OnInit { saveInDataLake = false; dataLakeTimestampField: string; + startAdapterNow = true; + constructor( private dialogService: DialogService, private shepherdService: ShepherdService, @@ -172,6 +174,7 @@ export class StartAdapterConfigurationComponent implements OnInit { saveInDataLake: this.saveInDataLake, dataLakeTimestampField: this.dataLakeTimestampField, editMode: false, + startAdapterNow: this.startAdapterNow, }, }); diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html index 4d969d9ab..bccae1295 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html @@ -77,6 +77,7 @@ > <i class="material-icons">done</i> <span + data-cy="sp-connect-adapter-not-started-success" > Your new data stream is now available in the pipeline editor.</span > @@ -98,7 +99,7 @@ </div> </div> - <div *ngIf="!isSetAdapter" fxFlex="100"> + <div *ngIf="!isSetAdapter && startAdapterNow" fxFlex="100"> <sp-pipeline-element-runtime-info [streamDescription]="streamDescription" [pollingActive]="pollingActive" diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts index aaa4941cf..bd150af65 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts @@ -67,6 +67,11 @@ export class AdapterStartedDialog implements OnInit { */ @Input() editMode = false; + /** + * This option will immediately start the adapter, when false it the adapter is only created and not started + */ + @Input() startAdapterNow = true; + constructor( public dialogRef: DialogRef<AdapterStartedDialog>, private adapterService: AdapterService, @@ -91,73 +96,97 @@ export class AdapterStartedDialog implements OnInit { } startAdapter() { - // const newAdapter = this.adapter; this.adapterService.addAdapter(this.adapter).subscribe(status => { this.adapterStatus = status; if (status.success) { - // Start preview on streams and message for sets - if ( - this.adapter instanceof GenericAdapterSetDescription || - this.adapter instanceof SpecificAdapterSetDescription - ) { - this.isSetAdapter = true; + const adapterElementId = status.notifications[0].title; + if (this.startAdapterNow) { + this.adapterService + .startAdapterByElementId(adapterElementId) + .subscribe(startStatus => { + this.showAdapterPreview( + startStatus, + adapterElementId, + ); + }); } else { - this.getLiveViewPreview(status); - } - - if (this.saveInDataLake) { - this.startSaveInDataLakePipeline(status); - } else { - this.adapterInstalled = true; + this.showAdapterPreview(status, adapterElementId); } } }); } + showAdapterPreview(status: Message, adapterElementId: string) { + // Start preview on streams and message for sets + if (status.success) { + if ( + this.adapter instanceof GenericAdapterSetDescription || + this.adapter instanceof SpecificAdapterSetDescription + ) { + this.isSetAdapter = true; + } else { + this.getLiveViewPreview(adapterElementId); + } + + if (this.saveInDataLake) { + this.startSaveInDataLakePipeline(adapterElementId); + } else { + this.adapterInstalled = true; + } + } + } + onCloseConfirm() { this.pollingActive = false; this.dialogRef.close('Confirm'); this.shepherdService.trigger('confirm_adapter_started_button'); } - private getLiveViewPreview(status: Message) { - this.restService - .getSourceDetails(status.notifications[0].title) - .subscribe(st => { - this.streamDescription = st; - this.pollingActive = true; - }); + private getLiveViewPreview(adapterElementId: string) { + this.adapterService.getAdapter(adapterElementId).subscribe(adapter => { + this.restService + .getSourceDetails(adapter.correspondingDataStreamElementId) + .subscribe(st => { + this.streamDescription = st; + this.pollingActive = true; + }); + }); } - private startSaveInDataLakePipeline(status: Message) { - const pipelineId = - 'org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate'; - this.pipelineTemplateService - .getPipelineTemplateInvocation( - status.notifications[0].title, - pipelineId, - ) - .subscribe(res => { - const pipelineName = 'Persist ' + this.adapter.name; - - const indexName = this.adapter.name; - - const pipelineInvocation = PipelineInvocationBuilder.create(res) - .setName(pipelineName) - .setTemplateId(pipelineId) - .setFreeTextStaticProperty('db_measurement', indexName) - .setMappingPropertyUnary( - 'timestamp_mapping', - 's0::' + this.dataLakeTimestampField, + private startSaveInDataLakePipeline(adapterElementId: string) { + this.adapterService.getAdapter(adapterElementId).subscribe(adapter => { + const pipelineId = + 'org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate'; + this.pipelineTemplateService + .getPipelineTemplateInvocation( + adapter.correspondingDataStreamElementId, + pipelineId, + ) + .subscribe(res => { + const pipelineName = 'Persist ' + this.adapter.name; + + const indexName = this.adapter.name; + + const pipelineInvocation = PipelineInvocationBuilder.create( + res, ) - .build(); - - this.pipelineTemplateService - .createPipelineTemplateInvocation(pipelineInvocation) - .subscribe(pipelineOperationStatus => { - this.pipelineOperationStatus = pipelineOperationStatus; - this.adapterInstalled = true; - }); - }); + .setName(pipelineName) + .setTemplateId(pipelineId) + .setFreeTextStaticProperty('db_measurement', indexName) + .setMappingPropertyUnary( + 'timestamp_mapping', + 's0::' + this.dataLakeTimestampField, + ) + .build(); + + this.pipelineTemplateService + .createPipelineTemplateInvocation(pipelineInvocation) + .subscribe(pipelineOperationStatus => { + this.pipelineOperationStatus = + pipelineOperationStatus; + this.adapterInstalled = true; + }); + }); + }); } }
