Hi Faizan,

there is a method appendEnvironmentConfig which you can overwrite in case 
additional configurations are needed!
This method is called in line 184, after the environment has been created. 
But if you need a protected member, I don't think this would break anything and 
we could merge it easily.

Best,
Dominik

-----Original Message-----
From: Muhammad Faizan <[email protected]> 
Sent: Thursday, March 25, 2021 4:15 PM
To: [email protected]; Patrick Wiener <[email protected]>
Cc: Christian Prehofer <[email protected]>
Subject: RE: Streampipes für Automotive


Hi Patrick,

Thanks for the resolution. It works fine now for stopping/starting individual 
flink PE.

Btw, is there any reason why "StreamExecutionEnvironment env;" is marked as 
private member in line 64, should it be protected member so that we can access 
it in our streampipes- extension codebase and configure other things like flink 
checkpoints etc.?

[1] 
https://github.com/apache/incubator-streampipes/blob/f65b27c65b9f7be3189a51a48c3c3d684617c467/streampipes-wrapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java#L64

Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu

i.A. Muhammad Faizan
Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] / 
www.denso.com



-----Original Message-----
From: Patrick Wiener <[email protected]>
Sent: Tuesday, March 23, 2021 4:07 PM
To: [email protected]
Subject: Re: Streampipes für Automotive

Hi Faizan,

I re-triggered the build - so your changes should be their. 

Patrick

> Am 23.03.2021 um 14:07 schrieb Muhammad Faizan <[email protected]>:
> 
> Hi Patrick,
> 
> Thanks for merging the PR. Will it also trigger the rebuild of 
> streampipes extensions (flink ones)? Especially 
> pipeline-elements-all-flink? #docker-containers
> 
> 
> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
> 
> i.A. Muhammad Faizan
> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] / 
> www.denso.com
> 
> 
> 
> -----Original Message-----
> From: Muhammad Faizan <[email protected]>
> Sent: Tuesday, March 23, 2021 11:15 AM
> To: [email protected]; Patrick Wiener <[email protected]>
> Cc: Christian Prehofer <[email protected]>
> Subject: RE: Streampipes für Automotive
> 
> Hi Patrick,
> 
> Please review the PR: 
> https://github.com/apache/incubator-streampipes/pull/33
> 
> Let me know if any comments!
> 
> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
> 
> i.A. Muhammad Faizan
> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] / 
> www.denso.com
> 
> 
> 
> -----Original Message-----
> From: Patrick Wiener <[email protected]>
> Sent: Monday, March 22, 2021 6:23 PM
> To: [email protected]
> Cc: Christian Prehofer <[email protected]>
> Subject: Re: Streampipes für Automotive
> 
> Hi Faizan,
> 
> would be nice if you could provide a PR :)
> 
> Best
> Patrick
> 
>> Am 22.03.2021 um 17:18 schrieb Muhammad Faizan <[email protected]>:
>> 
>> Hi Patrick,
>> 
>> Exactly, it should also check the flink status in bindRuntime() method.
>> And also ideally, the getJobStatus() method (line 295[1]) should not have 
>> the filter for “RUNNING”, it should be returning job with any status, and 
>> rest should be handled by the caller methods.
>> 
>> Will you resolve the issue? Or should I come up with a PR?
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.A. Muhammad Faizan
>> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Patrick Wiener <[email protected] <mailto:[email protected]>>
>> Sent: Monday, March 22, 2021 3:50 PM
>> To: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]>>; [email protected] 
>> <mailto:[email protected]>
>> Cc: Christian Prehofer <[email protected] 
>> <mailto:[email protected]>>
>> Subject: Re: Streampipes für Automotive
>> 
>> Hi Faizan,
>> 
>> looking at this part of the codebase I would suggest to handle this as part 
>> of the FlinkRuntime class (so option 2).
>> 
>> What I stumbled upon is that in the case of starting a Flink PE we 
>> also currently not explicitly  check the Flink state as part of the 
>> bindRuntime() method (Line 218 in [1]).
>> 
>> Maybe this can be combined when resolving this issue. What do you think?
>> 
>> Patrick
>> 
>> [1]
>> https://github.com/apache/incubator-streampipes/blob/dev/streampipes-
>> w
>> rapper-flink/src/main/java/org/apache/streampipes/wrapper/flink/Flink
>> R
>> untime.java
>> 
>> 
>> Am 22.03.2021 um 15:13 schrieb Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] 
>> <mailto:[email protected]>>>:
>> 
>> Hi,
>> 
>> I have already reported the issue on Jira 
>> (https://issues.apache.org/jira/projects/STREAMPIPES/issues/STREAMPIPES-321?filter=allopenissues
>>  
>> <https://issues.apache.org/jira/projects/STREAMPIPES/issues/STREAMPIPES-321?filter=allopenissues>).
>>  Please give it a read before continuing.
>> 
>> I am currently implementing a service management layer on-top of StreamPipes 
>> and Flink for better controllability over the individual pieces of the 
>> pipelines.
>> This issue is a blocking point for me right now. So I am trying to figure 
>> out a way around.
>> 
>> As the code is inside the streampipes scope, so we need to change it there 
>> to fix the issue. Right now, I see two approaches:
>> 1.     Put the line 141 
>> (https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf[…]streampipes/container/api/InvocablePipelineElementResource.java<https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf7e4b144d6fe/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java#L141>
>>  
>> <https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf[%E2%80%A6]streampipes/container/api/InvocablePipelineElementResource.java%3Chttps://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf7e4b144d6fe/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java#L141>>)
>>  inside a try/catch block, and execute line 144 in any case.
>> 
>> 
>> 1.  Or, make a change in FlinkRuntime.java 
>> (https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf[…]ain/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java<https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf%5b%E2%80%A6%5dain/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java>
>>  
>> <https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf[%E2%80%A6]ain/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java%3Chttps://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf%5b%E2%80%A6%5dain/java/org/apache/streampipes/wrapper/flink/FlinkRuntime.java%3E>)
>>  in method discardRuntime() to also check if its present and its state is 
>> running then cancel the job, else check if its status is CANCELLED or 
>> FAILED, then return and don't throw exception. Otherwise throw exception.
>> 
>> Let me know what do you think?
>> 
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.A. Muhammad Faizan
>> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Sent: Monday, March 22, 2021 2:13 PM
>> To: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Cc: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Philipp Zehnder <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: Re: Streampipes für Automotive
>> 
>> btw: the JobStatusMessage class of the Flink runtime contains a status enum 
>> with the following options:
>> 
>> 
>> CREATED(JobStatus.TerminalState.NON_TERMINAL),
>> RUNNING(JobStatus.TerminalState.NON_TERMINAL),
>> FAILING(JobStatus.TerminalState.NON_TERMINAL),
>> FAILED(JobStatus.TerminalState.GLOBALLY),
>> CANCELLING(JobStatus.TerminalState.NON_TERMINAL),
>> CANCELED(JobStatus.TerminalState.GLOBALLY),
>> FINISHED(JobStatus.TerminalState.GLOBALLY),
>> RESTARTING(JobStatus.TerminalState.NON_TERMINAL),
>> SUSPENDED(JobStatus.TerminalState.LOCALLY),
>> RECONCILING(JobStatus.TerminalState.NON_TERMINAL);
>> So as mentioned in the previous mail you could check for any of the 
>> required states and execute the necessary logic accordingly.
>> 
>> 
>> 
>> 
>> 
>> Am 22.03.2021 um 14:10 schrieb Patrick Wiener <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>:
>> 
>> Hi Faizan,
>> 
>> at best you’d be given a list of all jobs by the flink rest api including 
>> the failed/stopped one (see Line 298 in FlinkRuntime.java):
>> 
>> 
>> RestClusterClient<String> restClient = getRestClient(); 
>> CompletableFuture<Collection<JobStatusMessage>> jobs = 
>> restClient.listJobs(); Here, we filter based on the condition 
>> „RUNNING“. My hope would be that you could similarly check for 
>> „FAILED/CANCELLED“ or whatever state the flink jobmanager API returns.
>> 
>> Did you already debug this part of the code?
>> 
>> Patrick
>> 
>> 
>> 
>> 
>> Am 22.03.2021 um 13:54 schrieb Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] 
>> <mailto:[email protected]>>>:
>> 
>> Hi Patrick,
>> Yes, I am on SNAPSHOT.
>> 
>> So, as a workaround I am trying to delete the PE by sending a DELETE Rest 
>> request as in line 136 
>> (https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf7e4b144d6fe/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java#L136
>>  
>> <https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf7e4b144d6fe/streampipes-container/src/main/java/org/apache/streampipes/container/api/InvocablePipelineElementResource.java#L136>).
>> 
>> It works fine in normal case when the PE is running, I can use this delete 
>> call to stop the PE and start again using my last mentioned approach.
>> The problem is when I manually cancel the Flink job, then this delete 
>> request doesn’t find the Flink job and neither deletes the running instance 
>> Id from its in-memory data.
>> <image002.jpg>
>> 
>> This is very similar to another problem, for which I have opened this 
>> Jira ticket:
>> https://issues.apache.org/jira/projects/STREAMPIPES/issues/STREAMPIPE
>> S
>> -321?filter=allopenissues
>> <https://issues.apache.org/jira/projects/STREAMPIPES/issues/STREAMPIP
>> E
>> S-321?filter=allopenissues>
>> 
>> Is it possible to resolve this issue? Also, if it seems small fix, 
>> then you can also guide me to provide you a PR 😊
>> 
>> Thanks again for your support!
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.A. Muhammad Faizan
>> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Sent: Friday, March 19, 2021 4:12 PM
>> To: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Cc: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Philipp Zehnder <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: Re: Streampipes für Automotive
>> 
>> I assume you use the SNAPSHOT version?
>> 
>> We store a running instance id in-memory for every processor/sink 
>> that has been deployed - if you cancel the job and re-sent the graph via the 
>> GraphSubmitter it is checked upon receiving HTTP request at the element 
>> service itself wether that instance id is still there, because, well it 
>> never got terminated (see line 71 onwards) [1].
>> 
>> Maybe you can find a workaround here. To date, we don’t have such a check.
>> 
>> 
>> Patrick
>> 
>> 
>> [1]
>> https://github.com/apache/incubator-streampipes/blob/dev/streampipes-
>> c
>> ontainer/src/main/java/org/apache/streampipes/container/api/Invocable
>> P
>> ipelineElementResource.java
>> <https://github.com/apache/incubator-streampipes/blob/dev/streampipes
>> -
>> container/src/main/java/org/apache/streampipes/container/api/Invocabl
>> e
>> PipelineElementResource.java>
>> 
>> 
>> 
>> 
>> 
>> Am 19.03.2021 um 15:53 schrieb Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] 
>> <mailto:[email protected]>>>:
>> 
>> Thanks Patrick and Dominik.
>> 
>> Now, I have tried this approach, and the invocation request is received by 
>> “pipeline-elements-all-flink”, but its somehow detecting that the job is 
>> still running and skipping the invocation. See the following log:
>> _______________
>> 14:39:11.137 SP [XNIO-1 task-1] INFO  
>> o.a.s.c.a.InvocablePipelineElementResource - Pipeline element Aggregation 
>> with id 
>> 309829d0-3cec-4038-88a7-05bbe43962e8-org.apache.streampipes.examples.waterlevel-aggregation-0
>>  seems to be already running, skipping invocation request.
>> ______________
>> 
>> Any suggestions, How or from where I need to reset the job status? Although 
>> the job status on flink is CANCELLED.
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.A. Muhammad Faizan
>> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Sent: Friday, March 19, 2021 12:38 PM
>> To: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Cc: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Philipp Zehnder <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: RE: Streampipes für Automotive
>> 
>> Hi,
>> that’s correct – I think the updated groupId could also be removed right now 
>> – we have an open issue to let the backend decide on Kafka settings [1], the 
>> idea here is to allow also for “pausing” of pipelines so that consumers can 
>> also start at an earlier offset. But so far, the groupId is newly created so 
>> that the pipeline always picks up the latest offset. This doesn’t have any 
>> effect on topic creation, the topic remains the same.
>> 
>> Hope this clarifies the current approach!
>> 
>> Dominik
>> 
>> 
>> [1] https://issues.apache.org/jira/browse/STREAMPIPES-15
>> <https://issues.apache.org/jira/browse/STREAMPIPES-15>
>> 
>> From: Patrick Wiener
>> Sent: Friday, March 19, 2021 10:20 AM
>> To: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Cc: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Philipp Zehnder <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: Re: Streampipes für Automotive
>> 
>> Topics are auto-created only once at initial start and reused - 
>> consumer and producer threads are created/stopped with pipeline start/stop.
>> 
>> 
>> .........................................................
>> M.Sc. Patrick Wiener
>> Wissenschaftlicher Mitarbeiter | Research Scientist Information 
>> Process Engineering (IPE)
>> 
>> FZI Forschungszentrum Informatik
>> Haid-und-Neu-Str. 10–14
>> 76131 Karlsruhe, Germany
>> Tel.: +49 721 9654-822
>> 
>> [email protected] <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> www.fzi.de/mitarbeiter/wiener 
>> <http://www.fzi.de/mitarbeiter/wiener><http://www.fzi.de/mitarbeiter/
>> w iener <http://www.fzi.de/mitarbeiter/wiener>>
>> 
>> .........................................................
>> FZI Forschungszentrum Informatik
>> Stiftung des bürgerlichen Rechts
>> Stiftung Az: 14-0563.1 Regierungspräsidium Karlsruhe
>> Vorstand: Prof. Dr. Andreas Oberweis, Jan Wiesenberger, Prof. Dr.-Ing. 
>> J. Marius Zöllner Vorsitzender des Kuratoriums: Ministerialdirigent 
>> Günther Leßnerkraus .........................................................
>> 
>> Am 19.03.2021 um 09:57 schrieb Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] 
>> <mailto:[email protected]>>>:
>> 
>> Thanks Patrick for the information.
>> 
>> @Dominik Riemer<mailto:[email protected] <mailto:[email protected]>> Please let me 
>> know about the updatedGroupIds? Also, does it mean that every time we stop 
>> and start a pipeline, all the communication channels (i.e. kafka topics) are 
>> created newly again?
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.A. Muhammad Faizan
>> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Sent: Thursday, March 18, 2021 7:41 PM
>> To: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Cc: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Philipp Zehnder <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: Re: Streampipes für Automotive
>> 
>> Hi Faizan,
>> 
>> your case sounds interesting and you’re already in the right module/package.
>> 
>> As you correctly pointed out, the PipelineExecutor handles the pipeline life 
>> cycle, i.e., start and stop individual pipeline elements.
>> Regarding the notation, we refer to all elements that are used in a 
>> pipeline as pipeline elements. Thus data streams originating from adapters 
>> are logically also denoted as pipeline elements.
>> 
>> While streams are already existing at modeling time, neither of the so 
>> called InvocableStreamPipesEntity types are. That are:
>> 
>> 
>> *   Data Processors
>> *   Data Sinks
>> 
>> We refer to the notation of Opher Etzion:
>> All data processors are considered event processing agents (EPA), sometimes 
>> semantic EPA (S-EPA).
>> All data sinks are considered event consumers (EC) or semantic EC (S-EC).
>> 
>> You will often find this in the code base.
>> 
>> Thus each pipeline may carry a List<DataProcessorInvokation>, 
>> List<DataSinkInvokation>. These entities are often referred to 
>> „graphs“, short for invokation graphs, as they carry user-defined static 
>> properties selected in the UI. These graphs are build in the 
>> InvokationGraphBuilder class.
>> 
>> These very graph descriptions are handed over to the GraphSubmitter 
>> class, that actually sends the descriptions to the corresponding pipeline 
>> element service endpoint, in your case the Flink service PE that you defined.
>> 
>> After all its basically sending a post request to the corresponding API 
>> endpoint of the PE service.
>> 
>> Hope this helps to get going :)
>> 
>> Maybe @Dominik can tell you about the updatedGroupIds for Kafka 
>> because I’m currently not quite sure about this
>> 
>> Cheers
>> Patrick
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> Am 18.03.2021 um 18:55 schrieb Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] 
>> <mailto:[email protected]>>>:
>> 
>> Hi Patrick & Dominik,
>> 
>> I have been taking a look into DataProcessorInvocation and StreamPipes 
>> source code. I have few questions.
>> 
>> My use case is: [Steps]
>> 
>> 1.  Start a pipeline (with two flink PE’s)  2.  Go to Flink 
>> dashboard, and cancel a single PE from this pipeline.
>> 3.  All other PE’s are running except the cancelled PE.
>> 4.  Now, I am trying to start this single PE separately.
>> 
>> To do this I found 
>> PipelineExecuter<https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf7e4b144d6fe/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java#L57
>>  
>> <https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf7e4b144d6fe/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java#L57>>
>>  class which has the logic to start a pipeline. What I am trying to do is to 
>> create a pipeline object with only single PE (i.e. sepa) and no actions, no 
>> streams. And then start this pipeline to start the single PE.
>> Do you think it’s a correct approach? Please take a look at below code:
>> 
>> <image002.jpg>
>> 
>> 
>> Also, what is the purpose of this line 
>> “pipeline.getSepas().forEach(this::updateGroupIds);<https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf7e4b144d6fe/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java#L59
>>  
>> <https://github.com/apache/incubator-streampipes/blob/2f10a17b77a93eee48527e4e88baf7e4b144d6fe/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java#L59>>”?
>>  Does updateGroupIds method changes the kafka topics and stuff? Will it 
>> effect in my use case?
>> 
>> Thank you for your time & help!
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.A. Muhammad Faizan
>> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Muhammad Faizan
>> Sent: Saturday, March 13, 2021 4:25 PM
>> To: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Patrick Wiener <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Cc: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Philipp Zehnder <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: RE: Streampipes für Automotive
>> 
>> Hi Dominik,
>> 
>> Thanks very much for your detailed information.
>> 
>> I have now upgraded my extensions and deployment to 0.68.0-SNAPSHOT. I will 
>> now try out the StreamPipes-client, swagger API’s and especially 
>> theDataProcessorInvocation. I will let you know if any questions.
>> 
>> Regarding the health checks of PE’s. I am wondering if I get some kind of 
>> status of the each PE of a running pipeline like (#flink) whether it is 
>> Running properly, or is it crashed and yet to be re-scheduled by jobmanager, 
>> or is it failed. In these cases I can then add some logic in my service 
>> management layer to fix the issue either manually (by user input) or auto. 
>> Also, in case of failure on flink, how to handle the checkpoints/savepoints 
>> through the management layer.
>> 
>> As we can have different runtime for PE (Flink / JVM), it would be good if 
>> StreamPipes is providing an abstraction over getting the health status of 
>> PE’s.
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.A. Muhammad Faizan
>> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Sent: Wednesday, March 10, 2021 9:14 PM
>> To: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Patrick Wiener <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Cc: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Philipp Zehnder <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: RE: Streampipes für Automotive
>> 
>> Hi Muhammad,
>> 
>> sounds cool, having a management layer API would be great!
>> 
>> The client and REST API are not yet officially released, but if you are 
>> running the latest snapshot version from dev, you can already play around 
>> with it.
>> Although there is not yet a documentation, you can find some examples here 
>> [1] and there is a (yet incomplete) swagger documentation that can be opened 
>> from the login page. In order to use the API, you need to create an API key 
>> from the UI by clicking the user icon in the top-right corner and then 
>> navigate to “Profile->API keys”. In general, all endpoints (also the 
>> undocumented) are accessible with a valid API key.
>> 
>> Some of your requirements are already supported by the API, so you can get 
>> all pipelines which also includes the PEs and their configuration. Also, 
>> pipeline element templates can be fetched and there is a feature in the 
>> client that instantiates a DataProcessorInvocation or a DataSinkInvocation 
>> with a provided pipeline element template config. As there is not yet a 
>> convenient way to create a pipeline element template, you can create one in 
>> the UI and inspect the config in the database (the DB is called 
>> pipelineelementtemplate and you can access the CouchDB instance on port 
>> 5984, there is a UI at path /_utils.
>> 
>> There might be some things that do not yet fully work, so if you find any 
>> bugs, we’ll be glad to fix them.
>> 
>> Regarding the health status, there is not yet anything in the UI, but it’s 
>> some of the next things on my roadmap, so it would be great to discuss your 
>> ideas and requirements.
>> What information would you expect from the health status message? I guess 
>> information on the general node health, but probably also on errors and 
>> exception that occur at runtime. The conceptual question behind I’m 
>> currently thinking about is whether the Pes should provide an endpoint that 
>> holds health information and is called by the core if health status is 
>> requested from anywhere, or, as an alternative, if there should be some push 
>> communication from the PEs to the core that provides health information.
>> I tend to prefer option 1, but what is your opinion?
>> 
>> Best,
>> Dominik
>> 
>> [1]
>> https://github.com/apache/incubator-streampipes-examples/blob/dev/str
>> e
>> ampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/a
>> p ache/streampipes/client/example/StreamPipesClientExample.java
>> <https://github.com/apache/incubator-streampipes-examples/blob/dev/st
>> r
>> eampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/
>> a pache/streampipes/client/example/StreamPipesClientExample.java>
>> 
>> 
>> From: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Sent: Wednesday, March 10, 2021 10:55 AM
>> To: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Dominik Riemer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Cc: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: RE: Streampipes für Automotive
>> 
>> Hi Patrick & Dominik,
>> 
>> Hope you are you doing good!
>> 
>> I was thinking about implementing a small management layer API’s for better 
>> application specific controllability.
>> I have the following list of operations in mind which I am thinking of 
>> achieving:
>> 
>> *   Get list of running pipelines.
>> *   Get list of pipeline elements (PE) of a certain pipeline.
>> *   Get health status of each PE.
>> *   Option to restart an individual PE of a running pipeline.
>> *   See configurations of individual PE.
>> *   Export a pipeline template.
>> *   Create new pipeline from template.
>> 
>> Do you think, is it possible right now to access these controllability 
>> through StreamPipes using either StreamPipes Client or REST API’s. If yes, 
>> can you share any documentation or info?
>> 
>> Thank you very much for your time!
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.A. Muhammad Faizan
>> Student / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 201 / NiceNet: 5033-201 / [email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Sent: Monday, February 1, 2021 3:17 PM
>> To: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Cc: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: RE: Streampipes für Automotive
>> 
>> Hi All, here you go with the paper.
>> 
>> And thanks a lot for this open discussion – was really helpful for us!
>> 
>> Christian
>> 
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.V. Dr. Christian Prehofer
>> Director / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 776 / Mobile: +49 152 04962246 / NiceNet: 
>> 5033-776 / [email protected]
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Sent: Monday, 1 February 2021 15:14
>> To: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Cc: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: Re: Streampipes für Automotive
>> 
>> Hi all,
>> 
>> @Christian: Unfortunately I was not able to download the paper in Teams :/ 
>> could you send it to me via mail?
>> 
>> @Faizan: Please see the latest changes to the k8s installation here: 
>> https://github.com/apache/incubator-streampipes-installer/tree/dev/k8
>> s
>> <https://github.com/apache/incubator-streampipes-installer/tree/dev/k
>> 8
>> s> Here we mount ${HOME}/streampipes-k8s:/streampipes-k8s as the host
>> volume to persist the configuration + db’s.
>> 
>> Best
>> Patrick
>> 
>> 
>> .........................................................
>> M.Sc. Patrick Wiener
>> Wissenschaftlicher Mitarbeiter | Research Scientist Information 
>> Process Engineering (IPE)
>> 
>> FZI Forschungszentrum Informatik
>> Haid-und-Neu-Str. 10–14
>> 76131 Karlsruhe, Germany
>> Tel.: +49 721 9654-822
>> 
>> [email protected] <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> www.fzi.de/mitarbeiter/wiener 
>> <http://www.fzi.de/mitarbeiter/wiener><http://www.fzi.de/mitarbeiter/
>> w iener <http://www.fzi.de/mitarbeiter/wiener>>
>> 
>> .........................................................
>> FZI Forschungszentrum Informatik
>> Stiftung des bürgerlichen Rechts
>> Stiftung Az: 14-0563.1 Regierungspräsidium Karlsruhe
>> Vorstand: Prof. Dr. Andreas Oberweis, Jan Wiesenberger, Prof. Dr.-Ing. 
>> J. Marius Zöllner Vorsitzender des Kuratoriums: Ministerialdirigent 
>> Günther Leßnerkraus .........................................................
>> 
>> Am 28.01.2021 um 11:31 schrieb Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] 
>> <mailto:[email protected]>>>:
>> 
>> Dear Dominik,
>> 
>> thanks for quick reply, and sorry for delay.  From our side, Monday 14:00 or 
>> Tuesday at 15:00 would work.
>> 
>> I will just send an invite for Monday
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.V. Dr. Christian Prehofer
>> Director / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 776 / Mobile: +49 152 04962246 / NiceNet: 
>> 5033-776 / [email protected]
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Sent: Wednesday, 27 January 2021 06:58
>> To: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>; Patrick Wiener <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Cc: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: RE: Streampipes für Automotive
>> 
>> Dear Christian,
>> 
>> sure we can have a talk and discuss your questions!
>> What about early next week, e.g., Monday 14:00, Tuesday 14:00 (or later) or 
>> Wednesday morning?
>> 
>> Best,
>> Dominik
>> 
>> 
>> .........................................................
>> Dr.-Ing. Dominik Riemer
>> Bereichsleiter | Division Manager
>> Information Process Engineering (IPE)
>> 
>> FZI Forschungszentrum Informatik
>> Haid-und-Neu-Str. 10–14
>> 76131 Karlsruhe, Germany
>> Tel.: +49 721 9654-724
>> 
>> [email protected] <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> www.fzi.de
>> <http://www.fzi.de/><http://www.fzi.de/ <http://www.fzi.de/>>
>> 
>> 
>> .........................................................
>> FZI Forschungszentrum Informatik
>> Stiftung des bürgerlichen Rechts
>> Stiftung Az: 14-0563.1 Regierungspräsidium Karlsruhe
>> Vorstand: Prof. Dr. Andreas Oberweis, Jan Wiesenberger, Prof. Dr.-Ing. 
>> J. Marius Zöllner Vorsitzender des Kuratoriums: Ministerialdirigent 
>> Günther Leßnerkraus .........................................................
>> 
>> 
>> 
>> From: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Sent: Monday, January 25, 2021 3:17 PM
>> To: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>; 
>> Dominik Riemer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Cc: Muhammad Faizan <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Subject: RE: Streampipes für Automotive
>> 
>> Dear Patricl, Dominik,
>> 
>> Nice to talk to you in October.
>> 
>> Just heard that you were in contact with Muhammad (see CC) discussing some 
>> issues with Streampipes. He is one of the MS thesis students currently 
>> working with me, and has already setup a nice case study in Streampipes.
>> 
>> We are currently discussing how to proceed – mainly to evaluate and possibly 
>> extend Streampipes to more automotive use cases. We have some questions for 
>> some points on your agenda. See slide below.
>> A main point of interest would be the management of pipeline elements and 
>> also fault tolerance – the second and third last items in your list. He had 
>> also some issues with Kubernetes setup.
>> 
>> Would you have a minute to talk about these items? We would like to find a 
>> topic and plan such that he can complete a nice thesis in the remaining 
>> months.
>> 
>> Best regards, Christian Prehofer
>> 
>> 
>> <image001.jpg>
>> 
>> Mit freundlichen Grüßen / Best regards / Yoroshiku Onegai Shimasu
>> 
>> i.V. Dr. Christian Prehofer
>> Director / Corporate R&D / DENSO AUTOMOTIVE Deutschland GmbH
>> Phone: +49 8165 944 776 / Mobile: +49 152 04962246 / NiceNet: 
>> 5033-776 / [email protected]
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>> / www.denso.com 
>> <http://www.denso.com/><http://www.denso.com/
>> <http://www.denso.com/>>
>> 
>> 
>> 
>> From: Patrick Wiener <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Sent: Thursday, 22 October 2020 10:49
>> To: Christian Prehofer <[email protected] 
>> <mailto:[email protected]><mailto:[email protected]
>> <mailto:[email protected]>>>
>> Cc: Dominik Riemer <[email protected]
>> <mailto:[email protected]><mailto:[email protected] <mailto:[email protected]>>>
>> Subject: Re: Streampipes für Automotive
>> 
>> Hallo Herr Prehofer,
>> 
>> Danke für den netten Austausch.
>> 
>> Vllt für sie auch interessant, falls noch nicht bekannt: „Musketeer“
>> 
>> http://people.csail.mit.edu/malte/pub/papers/2015-eurosys-musketeer.p
>> d
>> f
>> 
>> Viele Grüße
>> Patrick Wiener
>> 
>> .........................................................
>> M.Sc. Patrick Wiener
>> Wissenschaftlicher Mitarbeiter | Research Scientist Information 
>> Process Engineering (IPE)
>> 
>> FZI Forschungszentrum Informatik
>> Haid-und-Neu-Str. 10–14
>> 76131 Karlsruhe, Germany
>> Tel.: +49 721 9654-822
>> 
>> [email protected]<mailto:[email protected]>
>> www.fzi.de/mitarbeiter/wiener<http://www.fzi.de/mitarbeiter/wiener>
>> 
>> .........................................................
>> FZI Forschungszentrum Informatik
>> Stiftung des bürgerlichen Rechts
>> Stiftung Az: 14-0563.1 Regierungspräsidium Karlsruhe
>> Vorstand: Prof. Dr. Andreas Oberweis, Jan Wiesenberger, Prof. Dr.-Ing. 
>> J. Marius Zöllner Vorsitzender des Kuratoriums: Ministerialdirigent 
>> Günther Leßnerkraus .........................................................
>> 
>> Am 16.10.2020 um 11:19 schrieb Patrick Wiener 
>> <[email protected]<mailto:[email protected]>>:
>> 
>> Hallo Herr Prehofer,
>> 
>> das ist kein Problem.
>> 
>> Viele Grüße
>> Patrick Wiener
>> 
>> .........................................................
>> M.Sc. Patrick Wiener
>> Wissenschaftlicher Mitarbeiter | Research Scientist Information 
>> Process Engineering (IPE)
>> 
>> FZI Forschungszentrum Informatik
>> Haid-und-Neu-Str. 10–14
>> 76131 Karlsruhe, Germany
>> Tel.: +49 721 9654-822
>> 
>> [email protected]<mailto:[email protected]>
>> www.fzi.de/mitarbeiter/wiener<http://www.fzi.de/mitarbeiter/wiener>
>> 
>> .........................................................
>> FZI Forschungszentrum Informatik
>> Stiftung des bürgerlichen Rechts
>> Stiftung Az: 14-0563.1 Regierungspräsidium Karlsruhe
>> Vorstand: Prof. Dr. Andreas Oberweis, Jan Wiesenberger, Prof. Dr.-Ing. 
>> J. Marius Zöllner Vorsitzender des Kuratoriums: Ministerialdirigent 
>> Günther Leßnerkraus .........................................................
>> 
>> Am 16.10.2020 um 11:18 schrieb Christian Prehofer 
>> <[email protected]<mailto:[email protected]>>:
>> 
>> Hallo, Termin wie besprochen, hoffe MS Teams ist ok.
>> 
>> Viele Grüsse, Christian Prehofer
>> _____________________________________________________________________
>> _
>> __________ Join Microsoft Teams
>> Meeting<https://teams.microsoft.com/l/meetup-join/19%3ameeting_NGUyNG
>> N
>> kZmEtNTZhOS00OWJhLWI3MGYtYTJmZjhmNmUwMWNh%40thread.v2/0?context=%7b%2
>> 2
>> Tid%22%3a%2269405920-b673-4f7c-8845-e124e9d08af2%22%2c%22Oid%22%3a%22
>> 2 b1355a3-3eac-4fac-a83a-b2569192f6d4%22%7d>
>> +49 69 365057908<tel:+49%2069%20365057908,,535068127>   Germany, Frankfurt 
>> am Main (Toll)
>> Conference ID: 535 068 127#
>> Local
>> numbers<https://dialin.teams.microsoft.com/86a44213-e985-4164-9704-d9
>> c
>> c945f5413?id=535068127> | Reset
>> PIN<https://mysettings.lync.com/pstnconferencing> | Learn more about 
>> Teams<https://aka.ms/JoinTeamsMeeting> | Meeting
>> options<https://teams.microsoft.com/meetingOptions/?organizerId=2b135
>> 5
>> a3-3eac-4fac-a83a-b2569192f6d4&tenantId=69405920-b673-4f7c-8845-e124e
>> 9
>> d08af2&threadId=19_meeting_NGUyNGNkZmEtNTZhOS00OWJhLWI3MGYtYTJmZjhmNm
>> U [email protected]&messageId=0&language=en-US>
>> _____________________________________________________________________
>> _
>> __________
> 

Reply via email to