This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1114
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit b5abec08cd9e937f628740df327d24ec5e93949b
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Jan 17 18:31:50 2023 +0100

    [#1114] Adapters can now be added without starting them
---
 .../management/AdapterMasterManagement.java        |  31 +----
 .../iiot/protocol/stream/FileStreamProtocol.java   |   5 +-
 ui/cypress/support/utils/connect/ConnectBtns.ts    |  14 ++-
 ui/cypress/support/utils/connect/ConnectUtils.ts   |  55 +++++++--
 .../adapter/createAdapterWithoutStarting.spec.ts   |  47 ++++++++
 ui/cypress/tests/adapter/editAdapter.smoke.spec.ts |  18 +--
 .../src/lib/apis/adapter.service.ts                |   8 +-
 .../adapter-options-panel.component.html           |   1 +
 .../adapter-options-panel.component.ts             |   5 +-
 .../start-adapter-configuration.component.html     |  12 +-
 .../start-adapter-configuration.component.ts       |   3 +
 .../adapter-started-dialog.component.html          |   3 +-
 .../adapter-started-dialog.component.ts            | 129 +++++++++++++--------
 13 files changed, 221 insertions(+), 110 deletions(-)

diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index c66701e27..dea69c4c8 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -29,19 +29,18 @@ import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
 import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.util.ElementIdGenerator;
 import org.apache.streampipes.resource.management.AdapterResourceManager;
 import org.apache.streampipes.resource.management.DataStreamResourceManager;
 import org.apache.streampipes.resource.management.SpResourceManager;
 import org.apache.streampipes.storage.api.IAdapterStorage;
 import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;
 
-import org.apache.commons.lang3.RandomStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 import java.util.List;
-import java.util.UUID;
 
 /**
  * This class is responsible for managing all the adapter instances which are 
executed on worker nodes
@@ -74,10 +73,8 @@ public class AdapterMasterManagement {
       throws AdapterException {
 
     // Create elementId for adapter
-    // TODO centralized provisioning of element id
-    String dataStreamElementId = "urn:streampipes.apache.org:eventstream:" + 
RandomStringUtils.randomAlphabetic(6);
-    String uuid = UUID.randomUUID().toString();
-    ad.setElementId(ad.getElementId() + ":" + uuid);
+    String dataStreamElementId = 
ElementIdGenerator.makeElementId(SpDataStream.class);
+    ad.setElementId(ElementIdGenerator.makeElementId(ad));
     ad.setCreatedAt(System.currentTimeMillis());
     ad.setCorrespondingDataStreamElementId(dataStreamElementId);
 
@@ -87,18 +84,13 @@ public class AdapterMasterManagement {
 
     String elementId = this.adapterResourceManager.encryptAndCreate(ad);
 
-    // start when stream adapter
-    if (ad instanceof AdapterStreamDescription) {
-      startStreamAdapter(elementId);
-    }
-
     // Create stream
     SpDataStream storedDescription = new 
SourcesManagement().createAdapterDataStream(ad, dataStreamElementId);
     storedDescription.setCorrespondingAdapterId(elementId);
     installDataSource(storedDescription, principalSid, true);
     LOG.info("Install source (source URL: {} in backend", ad.getElementId());
 
-    return storedDescription.getElementId();
+    return ad.getElementId();
   }
 
   public void updateAdapter(AdapterDescription ad,
@@ -164,17 +156,6 @@ public class AdapterMasterManagement {
     return allAdapters;
   }
 
-  public List<AdapterDescription> getAllAdapterDescriptions() throws 
AdapterException {
-
-    List<AdapterDescription> allAdapters = 
adapterInstanceStorage.getAllAdapters();
-
-    if (allAdapters == null) {
-      throw new AdapterException("Could not get all adapters");
-    }
-
-    return allAdapters;
-  }
-
   public void stopStreamAdapter(String elementId) throws AdapterException {
     AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);
 
@@ -212,11 +193,11 @@ public class AdapterMasterManagement {
     }
   }
 
-  private void updateDataSource(AdapterDescription ad) throws AdapterException 
{
+  private void updateDataSource(AdapterDescription ad) {
     // get data source
     SpDataStream dataStream = 
this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId());
 
-    dataStream = SourcesManagement.updateDataStream(ad, dataStream);
+    SourcesManagement.updateDataStream(ad, dataStream);
 
     // Update data source in database
     this.dataStreamResourceManager.update(dataStream);
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 4b588f882..fef486990 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -86,8 +86,8 @@ public class FileStreamProtocol extends Protocol {
     String timestampKey = 
getTimestampKey(adapterPipeline.getResultingEventSchema());
 
 
-    executor = Executors.newScheduledThreadPool(1);
     var eventProcessor = new LocalEventProcessor(adapterPipeline, 
timestampKey);
+    executor = Executors.newScheduledThreadPool(1);
 
     executor.scheduleAtFixedRate(() -> {
       try (InputStream dataInputStream = getDataFromEndpoint()) {
@@ -132,7 +132,8 @@ public class FileStreamProtocol extends Protocol {
             try {
               Thread.sleep(sleepTime);
             } catch (InterruptedException e) {
-              e.printStackTrace();
+              logger.info("File stream adapter was stopped, the current replay 
is interuppted", e);
+              return false;
             }
           }
         }
diff --git a/ui/cypress/support/utils/connect/ConnectBtns.ts 
b/ui/cypress/support/utils/connect/ConnectBtns.ts
index 17887ff2a..f5fbf1451 100644
--- a/ui/cypress/support/utils/connect/ConnectBtns.ts
+++ b/ui/cypress/support/utils/connect/ConnectBtns.ts
@@ -47,8 +47,7 @@ export class ConnectBtns {
             .parent();
     }
 
-    // =================================================================
-    // Format button options
+    // =====================  Format button options  ==========================
     public static json() {
         return cy.dataCy('connect-select-json-formats');
     }
@@ -82,4 +81,15 @@ export class ConnectBtns {
     }
 
     // =================================================================
+
+    // =====================  Adapter settings btns  ==========================
+    public static adapterSettingsStartAdapter() {
+        return cy.dataCy('adapter-settings-start-adapter-btn');
+    }
+
+    public static startAdapterNowCheckbox() {
+        return cy.dataCy('start-adapter-now-checkbox');
+    }
+
+    // ========================================================================
 }
diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts 
b/ui/cypress/support/utils/connect/ConnectUtils.ts
index feca0e004..88ae97032 100644
--- a/ui/cypress/support/utils/connect/ConnectUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectUtils.ts
@@ -30,6 +30,8 @@ import { ConnectBtns } from './ConnectBtns';
 export class ConnectUtils {
     public static testSpecificStreamAdapter(
         adapterConfiguration: SpecificAdapterInput,
+        starting = true,
+        successElement = 'sp-connect-adapter-live-preview',
     ) {
         ConnectUtils.goToConnect();
 
@@ -53,7 +55,11 @@ export class ConnectUtils {
 
         ConnectEventSchemaUtils.finishEventSchemaConfiguration();
 
-        ConnectUtils.startStreamAdapter(adapterConfiguration);
+        ConnectUtils.startStreamAdapter(
+            adapterConfiguration,
+            starting,
+            successElement,
+        );
     }
 
     public static testGenericStreamAdapter(
@@ -188,21 +194,27 @@ export class ConnectUtils {
         cy.get('#event-schema-next-button').click();
     }
 
-    public static startStreamAdapter(adapterInput: AdapterInput) {
-        ConnectUtils.startAdapter(
-            adapterInput,
-            'sp-connect-adapter-live-preview',
-        );
+    public static startStreamAdapter(
+        adapterInput: AdapterInput,
+        starting = true,
+        successElement = 'sp-connect-adapter-live-preview',
+    ) {
+        ConnectUtils.startAdapter(adapterInput, successElement, starting);
     }
 
     public static startSetAdapter(adapterInput: AdapterInput) {
         ConnectUtils.startAdapter(
             adapterInput,
             'sp-connect-adapter-set-success',
+            true,
         );
     }
 
-    public static startAdapter(adapterInput: AdapterInput, successElement) {
+    public static startAdapter(
+        adapterInput: AdapterInput,
+        successElement: string,
+        starting: boolean,
+    ) {
         // Set adapter name
         cy.dataCy('sp-adapter-name').type(adapterInput.adapterName);
 
@@ -215,8 +227,13 @@ export class ConnectUtils {
                 .click();
         }
 
-        // Start adapter
-        cy.get('#button-startAdapter').click();
+        // Deselect auto start of adapter
+        if (!starting) {
+            ConnectBtns.startAdapterNowCheckbox().parent().click();
+        }
+
+        ConnectBtns.adapterSettingsStartAdapter().click();
+
         cy.dataCy(successElement, { timeout: 60000 }).should('be.visible');
 
         this.closeAdapterPreview();
@@ -269,6 +286,26 @@ export class ConnectUtils {
         return adapterConfiguration;
     }
 
+    public static startAndValidateAdapter(amountOfProperties: number) {
+        ConnectBtns.startAdapter().should('not.be.disabled');
+
+        ConnectBtns.startAdapter().click();
+
+        // View data
+        ConnectBtns.infoAdapter().click();
+        cy.get('div').contains('Values').parent().click();
+
+        // Validate resulting event
+        cy.dataCy('sp-connect-adapter-live-preview', { timeout: 10000 
}).should(
+            'be.visible',
+        );
+
+        // validate that three event properties
+        cy.get('.preview-row', { timeout: 10000 })
+            .its('length')
+            .should('eq', amountOfProperties);
+    }
+
     public static tearDownPreprocessingRuleTest(
         adapterConfiguration: AdapterInput,
         expectedFile: string,
diff --git a/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts 
b/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts
new file mode 100644
index 000000000..12ba2cced
--- /dev/null
+++ b/ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { ConnectUtils } from '../../support/utils/connect/ConnectUtils';
+import { SpecificAdapterBuilder } from 
'../../support/builder/SpecificAdapterBuilder';
+
+describe('Creates a new adapter without starting it', () => {
+    beforeEach('Setup Test', () => {
+        cy.initStreamPipesTest();
+    });
+
+    it('Perform Test', () => {
+        const adapterInput = SpecificAdapterBuilder.create(
+            'Machine_Data_Simulator',
+        )
+            .setName('Machine Data Simulator Test')
+            .addInput('input', 'wait-time-ms', '1000')
+            .build();
+
+        ConnectUtils.testSpecificStreamAdapter(
+            adapterInput,
+            false,
+            'sp-connect-adapter-not-started-success',
+        );
+
+        ConnectUtils.startAndValidateAdapter(7);
+
+        ConnectUtils.closeAdapterPreview();
+
+        ConnectUtils.deleteAdapter();
+    });
+});
diff --git a/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts 
b/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts
index 336464f76..a6db9b08b 100644
--- a/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts
+++ b/ui/cypress/tests/adapter/editAdapter.smoke.spec.ts
@@ -68,23 +68,7 @@ describe('Test Edit Adapter', () => {
 
         ConnectUtils.closeAdapterPreview();
 
-        // Start Adapter
-        ConnectBtns.startAdapter().should('not.be.disabled');
-        ConnectBtns.startAdapter().click();
-
-        // View data
-        ConnectBtns.infoAdapter().click();
-        cy.get('div').contains('Values').parent().click();
-
-        // Validate resulting event
-        cy.dataCy('sp-connect-adapter-live-preview', { timeout: 10000 
}).should(
-            'be.visible',
-        );
-
-        // validate that three event properties
-        cy.get('.preview-row', { timeout: 10000 })
-            .its('length')
-            .should('eq', 3);
+        ConnectUtils.startAndValidateAdapter(3);
 
         // Validate that name of adapter and data source
         cy.dataCy('adapter-name').contains(newAdapterName);
diff --git 
a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts 
b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
index 217e7c06c..436d66177 100644
--- a/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/apis/adapter.service.ts
@@ -82,8 +82,12 @@ export class AdapterService {
     }
 
     startAdapter(adapter: AdapterDescriptionUnion): Observable<Message> {
+        return this.startAdapterByElementId(adapter.elementId);
+    }
+
+    startAdapterByElementId(elementId: string): Observable<Message> {
         return this.http
-            .post(this.adapterMasterUrl + adapter.elementId + '/start', {})
+            .post(this.adapterMasterUrl + elementId + '/start', {})
             .pipe(map(response => Message.fromData(response as any)));
     }
 
@@ -119,7 +123,7 @@ export class AdapterService {
         return `${this.connectPath}/master/description/${appId}/assets`;
     }
 
-    private get baseUrl() {
+    private static get baseUrl() {
         return '/streampipes-backend';
     }
 }
diff --git 
a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html
 
b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html
index fe4fc3030..c219c181f 100644
--- 
a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html
+++ 
b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.html
@@ -26,6 +26,7 @@
                 <div fxLayoutAlign="start start" class="mr-15">
                     <mat-checkbox
                         (change)="optionSelectedEmitter.emit($event.checked)"
+                        [checked]="isChecked"
                         [attr.data-cy]="dataCy"
                         class="large-checkbox"
                     >
diff --git 
a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.ts
 
b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.ts
index e18e4e380..c3599e094 100644
--- 
a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.ts
+++ 
b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/adapter-options-panel/adapter-options-panel.component.ts
@@ -16,7 +16,7 @@
  *
  */
 
-import { Component, EventEmitter, Input, OnInit, Output } from '@angular/core';
+import { Component, EventEmitter, Input, Output } from '@angular/core';
 
 @Component({
     selector: 'sp-adapter-options-panel',
@@ -36,6 +36,9 @@ export class SpAdapterOptionsPanelComponent {
     @Input()
     dataCy: string;
 
+    @Input()
+    isChecked = false;
+
     @Output()
     optionSelectedEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
 }
diff --git 
a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.html
 
b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.html
index 6c85414eb..579285f21 100644
--- 
a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.html
+++ 
b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.html
@@ -47,6 +47,16 @@
     </sp-basic-inner-panel>
 
     <div fxFlex="100" fxLayout="column">
+        <sp-adapter-options-panel
+            optionTitle="Run adapter"
+            optionDescription="Start adapter now"
+            optionIcon="play_arrow"
+            dataCy="start-adapter-now-checkbox"
+            [isChecked]="startAdapterNow"
+            (optionSelectedEmitter)="startAdapterNow = $event"
+        >
+        </sp-adapter-options-panel>
+
         <sp-adapter-options-panel
             optionTitle="Remove Duplicates"
             optionDescription="Avoid duplicated events within a certain time 
interval"
@@ -150,7 +160,7 @@
             *ngIf="!isEditMode"
             [disabled]="!startAdapterSettingsFormValid"
             mat-raised-button
-            id="button-startAdapter"
+            data-cy="adapter-settings-start-adapter-btn"
             color="accent"
             (click)="startAdapter()"
             mat-button
diff --git 
a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts
 
b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts
index fbff3bcb0..8f9a6f46f 100644
--- 
a/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts
+++ 
b/ui/src/app/connect/components/adapter-configuration/start-adapter-configuration/start-adapter-configuration.component.ts
@@ -88,6 +88,8 @@ export class StartAdapterConfigurationComponent implements 
OnInit {
     saveInDataLake = false;
     dataLakeTimestampField: string;
 
+    startAdapterNow = true;
+
     constructor(
         private dialogService: DialogService,
         private shepherdService: ShepherdService,
@@ -172,6 +174,7 @@ export class StartAdapterConfigurationComponent implements 
OnInit {
                 saveInDataLake: this.saveInDataLake,
                 dataLakeTimestampField: this.dataLakeTimestampField,
                 editMode: false,
+                startAdapterNow: this.startAdapterNow,
             },
         });
 
diff --git 
a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html
 
b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html
index 4d969d9ab..bccae1295 100644
--- 
a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html
+++ 
b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.html
@@ -77,6 +77,7 @@
                         >
                             <i class="material-icons">done</i>
                             <span
+                                
data-cy="sp-connect-adapter-not-started-success"
                                 >&nbsp;Your new data stream is now available in
                                 the pipeline editor.</span
                             >
@@ -98,7 +99,7 @@
                         </div>
                     </div>
 
-                    <div *ngIf="!isSetAdapter" fxFlex="100">
+                    <div *ngIf="!isSetAdapter && startAdapterNow" fxFlex="100">
                         <sp-pipeline-element-runtime-info
                             [streamDescription]="streamDescription"
                             [pollingActive]="pollingActive"
diff --git 
a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts 
b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
index aaa4941cf..bd150af65 100644
--- 
a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
+++ 
b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
@@ -67,6 +67,11 @@ export class AdapterStartedDialog implements OnInit {
      */
     @Input() editMode = false;
 
+    /**
+     * This option will immediately start the adapter, when false it the 
adapter is only created and not started
+     */
+    @Input() startAdapterNow = true;
+
     constructor(
         public dialogRef: DialogRef<AdapterStartedDialog>,
         private adapterService: AdapterService,
@@ -91,73 +96,97 @@ export class AdapterStartedDialog implements OnInit {
     }
 
     startAdapter() {
-        // const newAdapter = this.adapter;
         this.adapterService.addAdapter(this.adapter).subscribe(status => {
             this.adapterStatus = status;
             if (status.success) {
-                // Start preview on streams and message for sets
-                if (
-                    this.adapter instanceof GenericAdapterSetDescription ||
-                    this.adapter instanceof SpecificAdapterSetDescription
-                ) {
-                    this.isSetAdapter = true;
+                const adapterElementId = status.notifications[0].title;
+                if (this.startAdapterNow) {
+                    this.adapterService
+                        .startAdapterByElementId(adapterElementId)
+                        .subscribe(startStatus => {
+                            this.showAdapterPreview(
+                                startStatus,
+                                adapterElementId,
+                            );
+                        });
                 } else {
-                    this.getLiveViewPreview(status);
-                }
-
-                if (this.saveInDataLake) {
-                    this.startSaveInDataLakePipeline(status);
-                } else {
-                    this.adapterInstalled = true;
+                    this.showAdapterPreview(status, adapterElementId);
                 }
             }
         });
     }
 
+    showAdapterPreview(status: Message, adapterElementId: string) {
+        // Start preview on streams and message for sets
+        if (status.success) {
+            if (
+                this.adapter instanceof GenericAdapterSetDescription ||
+                this.adapter instanceof SpecificAdapterSetDescription
+            ) {
+                this.isSetAdapter = true;
+            } else {
+                this.getLiveViewPreview(adapterElementId);
+            }
+
+            if (this.saveInDataLake) {
+                this.startSaveInDataLakePipeline(adapterElementId);
+            } else {
+                this.adapterInstalled = true;
+            }
+        }
+    }
+
     onCloseConfirm() {
         this.pollingActive = false;
         this.dialogRef.close('Confirm');
         this.shepherdService.trigger('confirm_adapter_started_button');
     }
 
-    private getLiveViewPreview(status: Message) {
-        this.restService
-            .getSourceDetails(status.notifications[0].title)
-            .subscribe(st => {
-                this.streamDescription = st;
-                this.pollingActive = true;
-            });
+    private getLiveViewPreview(adapterElementId: string) {
+        this.adapterService.getAdapter(adapterElementId).subscribe(adapter => {
+            this.restService
+                .getSourceDetails(adapter.correspondingDataStreamElementId)
+                .subscribe(st => {
+                    this.streamDescription = st;
+                    this.pollingActive = true;
+                });
+        });
     }
 
-    private startSaveInDataLakePipeline(status: Message) {
-        const pipelineId =
-            
'org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate';
-        this.pipelineTemplateService
-            .getPipelineTemplateInvocation(
-                status.notifications[0].title,
-                pipelineId,
-            )
-            .subscribe(res => {
-                const pipelineName = 'Persist ' + this.adapter.name;
-
-                const indexName = this.adapter.name;
-
-                const pipelineInvocation = 
PipelineInvocationBuilder.create(res)
-                    .setName(pipelineName)
-                    .setTemplateId(pipelineId)
-                    .setFreeTextStaticProperty('db_measurement', indexName)
-                    .setMappingPropertyUnary(
-                        'timestamp_mapping',
-                        's0::' + this.dataLakeTimestampField,
+    private startSaveInDataLakePipeline(adapterElementId: string) {
+        this.adapterService.getAdapter(adapterElementId).subscribe(adapter => {
+            const pipelineId =
+                
'org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate';
+            this.pipelineTemplateService
+                .getPipelineTemplateInvocation(
+                    adapter.correspondingDataStreamElementId,
+                    pipelineId,
+                )
+                .subscribe(res => {
+                    const pipelineName = 'Persist ' + this.adapter.name;
+
+                    const indexName = this.adapter.name;
+
+                    const pipelineInvocation = 
PipelineInvocationBuilder.create(
+                        res,
                     )
-                    .build();
-
-                this.pipelineTemplateService
-                    .createPipelineTemplateInvocation(pipelineInvocation)
-                    .subscribe(pipelineOperationStatus => {
-                        this.pipelineOperationStatus = pipelineOperationStatus;
-                        this.adapterInstalled = true;
-                    });
-            });
+                        .setName(pipelineName)
+                        .setTemplateId(pipelineId)
+                        .setFreeTextStaticProperty('db_measurement', indexName)
+                        .setMappingPropertyUnary(
+                            'timestamp_mapping',
+                            's0::' + this.dataLakeTimestampField,
+                        )
+                        .build();
+
+                    this.pipelineTemplateService
+                        .createPipelineTemplateInvocation(pipelineInvocation)
+                        .subscribe(pipelineOperationStatus => {
+                            this.pipelineOperationStatus =
+                                pipelineOperationStatus;
+                            this.adapterInstalled = true;
+                        });
+                });
+        });
     }
 }

Reply via email to