Re: Enhance batch support - batch demarcation
Hi All,

Starting with the implementation, we are planning to take care of a single batch job first; we will take up the scheduling aspect later. The first requirement is the following: a batch job is an Apex application which picks up data from the source and processes it. Once the data is completely processed, it should detect the end of the batch and shut down the application. This will help separate the scheduling aspect from the Apex batch job.

We have the following options to shut down the application:

- First, throw a shutdown exception from the input operator.
  - The end of the batch can be detected by the input operator based on source-specific details.
  - Once the end of the batch is detected, the operator itself can shut down the application by throwing the shutdown signal.
  - The problem with this approach is that even though the batch has ended, the downstream (output) operators might still have finalization pending, which is usually done in calls like committed(). Waiting for the committed() call in the input operator may also not help, since that call may happen for the output operator in a different window.
  - Another issue is multiple input operators, where each may send the shutdown signal independently.
- Second, allow the engine to identify whether the application is a batch application and, if so, poll ```isBatchDone()``` on the input operator until it returns true. Once it returns true, we can wait for the committed() call and end the application via the input operator. We can have an interface BatchInput which would be implemented by the input operator of a batch application (a sketch of a possible implementation follows this thread):

```
public interface BatchInput extends InputOperator
{
  boolean isBatchDone();
}
```

  The isBatchDone() method would be implemented by the developer to identify when the batch has ended. The engine could call it to detect the end of the batch and shut down the application.
- Third, use a shared StatsListener to identify the end of the batch using some metric from the input operator and remove operators via a DAG change request. The DAG change request can be enabled by APEXCORE-408, which is in progress.

Please suggest.

~ Bhupesh

On Fri, Sep 16, 2016 at 2:10 PM, Bhupesh Chawda wrote: > Hi All, > > Resuming the discussion. > > After some discussion, I have created a document which captures the > requirements and a high level design for supporting batch applications. The > document consolidates different threads of discussion and aspects which are > relevant to batch support. > > This is in no way a design document, but just captures the high level > steps. I have tried to keep it very brief and to the point. I will keep > refining the document depending on the comments to ultimately convert it to > a design document. > > Here is the link to the document: https://docs.google.com/document/d/ > 1qlyQJP80dOlWZeHwICMFA3D3jGG_T2NLhMfzScbuTwQ/edit?usp=sharing > > Please provide your valuable feedback. > > ~ Bhupesh > > On Tue, Feb 23, 2016 at 7:24 AM, David Yan wrote: > >> For batch applications without checkpointing or iteration loops, what >> would >> be the significance of streaming windows and application windows?
>> >> >> On Sun, Feb 14, 2016 at 10:33 PM, Thomas Weise >> wrote: >> >> > Time to resume this discussion. I think it makes sense to look at the >> batch >> > as execution of a DAG from setup to teardown for all its operators, as >> > suggested by Bhupesh and Sandeep. The DAG comes into existence when the >> > batch begins and terminates when it is done. >> > >> > We have also seen from customers that there is demand for having the >> > scheduler function built in, when there is no external component already >> > present. For example, a file or set of files could be identified as >> > "batch". As the application is idle, there is only a scheduler operator >> > which polls for files. Once work is ready, that operator would launch >> the >> > DAG for processing (within same application, but not connected through >> > stream). When processing is complete, that DAG terminates and returns >> the >> > resources. >> > >> > As discussed, there is the need to be able to turn off checkpointing, >> which >> > is different from setting a large checkpoint window. No checkpointing >> means >> > no incremental recovery and hence no need to keep data in buffers. >> > >> > There is also the need to relay begin/end signal through the entire DAG. >> > This is different from setup/shutdown. It is more like begin/endWindow, >> but >> >
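The second option above defines a contract that an input operator would implement. Below is a minimal sketch of what such an operator might look like, assuming a source whose contents have already been loaded. The BatchInput interface is copied from the proposal; the FileBatchInput class, its fields, and the engine-side polling are illustrative assumptions, not an agreed design.

```java
// Illustrative sketch only, not an agreed design. BatchInput is copied from the
// proposal above; FileBatchInput and its fields are hypothetical.
import java.util.Iterator;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.BaseOperator;

// Proposed interface from the second option above.
interface BatchInput extends InputOperator
{
  boolean isBatchDone();
}

// Hypothetical input operator that emits records from an already-loaded source
// and reports end-of-batch instead of shutting the application down itself.
class FileBatchInput extends BaseOperator implements BatchInput
{
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  private transient Iterator<String> pendingLines;  // would be populated in setup(); omitted here
  private boolean batchDone;

  @Override
  public void emitTuples()
  {
    if (pendingLines != null && pendingLines.hasNext()) {
      output.emit(pendingLines.next());
    } else {
      // Source exhausted: flag end of batch so the engine can wait for committed()
      // on downstream operators before ending the application (option 2 above).
      batchDone = true;
    }
  }

  @Override
  public boolean isBatchDone()
  {
    return batchDone;
  }
}
```

The point of reporting a flag instead of throwing a shutdown exception is that the engine, not the operator, decides when to end the application, after the final committed() call has reached the downstream operators.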
Re: example applications in malhar
Thanks for the suggestions and I am working on the process to migrate the examples with the guidelines you mentioned. I will send out a list of examples and the destination modules very soon. On Thu, Oct 27, 2016 at 1:43 PM, Thomas Weise wrote: > Maybe a good first step would be to identify which examples to bring over > and where appropriate how to structure them in Malhar (for example, I see > multiple hdfs related apps that could go into the same Maven module). > > > On Tue, Oct 25, 2016 at 1:00 PM, Thomas Weise wrote: > > > That would be great. There are a few things to consider when working on > it: > > > > * preserve attribtion > > * ensure there is a test that runs the application in the CI > > * check that dependencies are compatible license > > * maybe extract common boilerplate code from pom.xml > > > > etc. > > > > Existing examples are under https://github.com/apache/ > > apex-malhar/tree/master/demos > > > > Perhaps we should rename it to "examples" > > > > I also propose that each app has a README and we add those for existing > > apps as well. > > > > Thanks, > > Thomas > > > > > > > > > > On Tue, Oct 25, 2016 at 12:49 PM, Lakshmi Velineni < > > laks...@datatorrent.com> wrote: > > > >> Can i work on this? > >> > >> Thanks > >> Lakshmi Prasanna > >> > >> On Mon, Sep 12, 2016 at 9:41 PM, Ashwin Chandra Putta < > >> ashwinchand...@gmail.com> wrote: > >> > >> > Here is the JIRA: https://issues.apache.org/ > jira/browse/APEXMALHAR-2233 > >> > > >> > On Tue, Sep 6, 2016 at 10:20 PM, Amol Kekre > >> wrote: > >> > > >> > > Good idea to consolidate them into Malhar. We should bring in as > many > >> > from > >> > > this gitHub as possible. > >> > > > >> > > Thks > >> > > Amol > >> > > > >> > > > >> > > On Tue, Sep 6, 2016 at 6:02 PM, Thomas Weise < > tho...@datatorrent.com> > >> > > wrote: > >> > > > >> > > > I'm also for consolidating these different example locations. We > >> should > >> > > > also look if all of it is still relevant. > >> > > > > >> > > > The stuff from the DT repository needs to be brought into shape > wrt > >> > > > licensing, checkstyle, CI support etc. > >> > > > > >> > > > > >> > > > On Tue, Sep 6, 2016 at 4:34 PM, Pramod Immaneni < > >> > pra...@datatorrent.com> > >> > > > wrote: > >> > > > > >> > > > > Sounds like a good idea. How about merging demos with apps as > >> well? > >> > > > > > >> > > > > On Tue, Sep 6, 2016 at 4:30 PM, Ashwin Chandra Putta < > >> > > > > ashwinchand...@gmail.com> wrote: > >> > > > > > >> > > > > > Hi All, > >> > > > > > > >> > > > > > We have a lot of examples for apex malhar operators in the > >> > following > >> > > > > > repository which resides outside of malhar. > >> > > > > > https://github.com/DataTorrent/examples/tree/master/tutorials > >> > > > > > > >> > > > > > Now that it has grown quite a bit, does it make sense to bring > >> some > >> > > of > >> > > > > the > >> > > > > > most common examples to malhar repository? Probably under apps > >> > > > directory? > >> > > > > > > >> > > > > > That way folks looking at malhar repository will have some > >> samples > >> > to > >> > > > > look > >> > > > > > at without having to search elsewhere. > >> > > > > > > >> > > > > > -- > >> > > > > > > >> > > > > > Regards, > >> > > > > > Ashwin. > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > > >> > > >> > -- > >> > > >> > Regards, > >> > Ashwin. > >> > > >> > > > > >
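For the "test that runs the application in the CI" item mentioned in the thread above, one common pattern is a JUnit test that launches the example in local mode. A rough sketch follows; the Application class name and the properties file path are placeholders for whatever the migrated example actually uses.

```java
// Sketch of a CI-friendly test for an example application; Application and the
// properties path are placeholders for the specific example being migrated.
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

import com.datatorrent.api.LocalMode;

public class ApplicationTest
{
  @Test
  public void testApplication() throws Exception
  {
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));

    LocalMode lma = LocalMode.newInstance();
    lma.prepareDAG(new Application(), conf);  // Application = the example's StreamingApplication
    LocalMode.Controller lc = lma.getController();
    lc.run(10000);                            // run the DAG locally for about 10 seconds
  }
}
```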
[jira] [Commented] (APEXCORE-526) Publish javadoc for releases on ASF infrastructure
[ https://issues.apache.org/jira/browse/APEXCORE-526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15631520#comment-15631520 ] Munagala V. Ramanath commented on APEXCORE-526: --- https://ci.apache.org/projects/apex-malhar/apex-malhar-javadoc-release-3.5/ The Malhar javadocs are now available there. > Publish javadoc for releases on ASF infrastructure > --- > > Key: APEXCORE-526 > URL: https://issues.apache.org/jira/browse/APEXCORE-526 > Project: Apache Apex Core > Issue Type: Documentation >Reporter: Thomas Weise > > Every release should have the javadocs published and we should have it linked > from the download page, as is the case with user docs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630848#comment-15630848 ] David Yan edited comment on APEXCORE-570 at 11/2/16 11:11 PM: -- [~PramodSSImmaneni] Maybe we can't block it but can't we suspend the call to emitTuples and also suspend the window from being progressed until a commit happens if the spool reaches capacity? was (Author: davidyan): [~PramodSSImmaneni] Maybe we can't block it but can't we suspend the call to emitTuples and also suspend the window from being progressed until a commit happens if the spool reaches the capacity? > Prevent upstream operators from getting too far ahead when downstream > operators are slow > > > Key: APEXCORE-570 > URL: https://issues.apache.org/jira/browse/APEXCORE-570 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Pramod Immaneni >Assignee: Pramod Immaneni > > If the downstream operators are slower than upstream operators then the > upstream operators will get ahead and the gap can continue to increase. > Provide an option to slow down or temporarily pause the upstream operators > when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
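For readers following this thread, here is a purely illustrative sketch of the throttling idea being discussed: pause emitTuples() and window advancement while spooled, not-yet-committed data exceeds a limit, and resume when a commit frees space. This is not Apex engine code and all names are hypothetical; tying the resume condition to commits is exactly where the deadlock concern raised by Pramod comes in.

```java
// Purely illustrative pseudocode of the proposed throttling, not Apex engine code.
// All names are hypothetical.
class UpstreamThrottleSketch
{
  private final long spoolCapacityBytes;  // configured limit for unacknowledged data
  private long spooledBytes;              // grows on emit, shrinks when a window is committed

  UpstreamThrottleSketch(long spoolCapacityBytes)
  {
    this.spoolCapacityBytes = spoolCapacityBytes;
  }

  // Checked by the (hypothetical) execution loop before calling emitTuples()
  // or advancing to the next streaming window.
  synchronized boolean mayProceed()
  {
    return spooledBytes < spoolCapacityBytes;
  }

  synchronized void onEmit(long bytes)
  {
    spooledBytes += bytes;
  }

  // Called when a checkpoint is committed and the corresponding spooled data can be dropped.
  synchronized void onCommitted(long bytesFreed)
  {
    spooledBytes = Math.max(0, spooledBytes - bytesFreed);
    notifyAll();  // wake a loop blocked in awaitCapacity()
  }

  synchronized void awaitCapacity() throws InterruptedException
  {
    while (!mayProceed()) {
      wait();
    }
  }
}
```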
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630848#comment-15630848 ] David Yan commented on APEXCORE-570: [~PramodSSImmaneni] Maybe we can't block it but can't we suspend the call to emitTuples and also suspend the window from being progressed until a commit happens if the spool reaches the capacity? > Prevent upstream operators from getting too far ahead when downstream > operators are slow > > > Key: APEXCORE-570 > URL: https://issues.apache.org/jira/browse/APEXCORE-570 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Pramod Immaneni >Assignee: Pramod Immaneni > > If the downstream operators are slower than upstream operators then the > upstream operators will get ahead and the gap can continue to increase. > Provide an option to slow down or temporarily pause the upstream operators > when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630806#comment-15630806 ] Pramod Immaneni commented on APEXCORE-570: -- [~davidyan] that would lead to deadlock because old data cannot be deleted till committed. > Prevent upstream operators from getting too far ahead when downstream > operators are slow > > > Key: APEXCORE-570 > URL: https://issues.apache.org/jira/browse/APEXCORE-570 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Pramod Immaneni >Assignee: Pramod Immaneni > > If the downstream operators are slower than upstream operators then the > upstream operators will get ahead and the gap can continue to increase. > Provide an option to slow down or temporarily pause the upstream operators > when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630749#comment-15630749 ] Munagala V. Ramanath commented on APEXCORE-570: --- I think part of the problem is that applications are sometimes written so that upstream operators maintain state which stays a reasonable size as long as data flows through the pipeline normally. If the downstream slows down, that state grows until the operator is deemed hung and killed, perhaps because checkpointing takes too long, or because it exceeds memory limits when spilling to disk cannot keep up. In such cases, this sort of throttling can be a useful coping mechanism. > Prevent upstream operators from getting too far ahead when downstream > operators are slow > > > Key: APEXCORE-570 > URL: https://issues.apache.org/jira/browse/APEXCORE-570 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Pramod Immaneni >Assignee: Pramod Immaneni > > If the downstream operators are slower than upstream operators then the > upstream operators will get ahead and the gap can continue to increase. > Provide an option to slow down or temporarily pause the upstream operators > when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630694#comment-15630694 ] David Yan commented on APEXCORE-570: If the spooling capacity limit is reached, would the operator just get blocked? If so, wouldn't that solve the back pressure problem at least partially? > Prevent upstream operators from getting too far ahead when downstream > operators are slow > > > Key: APEXCORE-570 > URL: https://issues.apache.org/jira/browse/APEXCORE-570 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Pramod Immaneni >Assignee: Pramod Immaneni > > If the downstream operators are slower than upstream operators then the > upstream operators will get ahead and the gap can continue to increase. > Provide an option to slow down or temporarily pause the upstream operators > when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] Custom Control Tuples
Suppose I am processing data in a file and I want to do something at the end of a file at the output operator, I would send an end file control tuple and act on it when I receive it at the output. In a single window I may end up processing multiple files and if I don't have multiple ports and logical paths through the DAG (multiple partitions are ok). I can process end of each file immediately and also know what file was closed without sending extra identification information in the end file which I would need if I am collecting all of them and processing at the end of the window. On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise wrote: > The use cases listed in the original discussion don't call for option 2. It > seems to come with additional complexity and implementation cost. > > Can those in favor of option 2 please also provide the use case for it. > > Thanks, > Thomas > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua > wrote: > > > I will vote for approach 1. > > > > First of all that one sounds easier to do to me. And I think idempotency > is > > important. It may run at the cost of higher latency but I think it is ok > > > > And in addition, when in the future if users do need realtime control > tuple > > processing, we can always add the option on top of it. > > > > So I vote for 1 > > > > Thanks, > > Siyuan > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi > wrote: > > > > > As a rule of thumb in any real time operating system, control tuples > > should > > > always be handled using Priority Queues. > > > > > > We may try to control priorities by defining levels. And shall not > > > be delivered at window boundaries. > > > > > > In short, control tuples shall never be treated as any other tuples in > > real > > > time systems. > > > > > > On Thursday, November 3, 2016, David Yan > wrote: > > > > > > > Hi all, > > > > > > > > I would like to renew the discussion of control tuples. > > > > > > > > Last time, we were in a debate about whether: > > > > > > > > 1) the platform should enforce that control tuples are delivered at > > > window > > > > boundaries only > > > > > > > > or: > > > > > > > > 2) the platform should deliver control tuples just as other tuples > and > > > it's > > > > the operator developers' choice whether to handle the control tuples > as > > > > they arrive or delay the processing till the next window boundary. > > > > > > > > To summarize the pros and cons: > > > > > > > > Approach 1: If processing control tuples results in changes of the > > > behavior > > > > of the operator, if idempotency needs to be preserved, the processing > > > must > > > > be done at window boundaries. This approach will save the operator > > > > developers headache to ensure that. However, this will take away the > > > > choices from the operator developer if they just need to process the > > > > control tuples as soon as possible. > > > > > > > > Approach 2: The operator has a chance to immediately process control > > > > tuples. This would be useful if latency is more valued than > > correctness. > > > > However, if this would open the possibility for operator developers > to > > > > shoot themselves in the foot. This is especially true if there are > > > multiple > > > > input ports. as there is no easy way to guarantee processing order > for > > > > multiple input ports. > > > > > > > > We would like to arrive to a consensus and close this discussion soon > > > this > > > > time so we can start the work on this important feature. > > > > > > > > Thanks! 
> > > > > > > > David > > > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov < > v.ro...@datatorrent.com > > > > > > > > > wrote: > > > > > > > > > It is not clear how operator will emit custom control tuple at > window > > > > > boundaries. One way is to cache/accumulate control tuples in the > > > operator > > > > > output port till window closes (END_WINDOW is inserted into the > > output > > > > > sink) or only allow an operator to emit control tuples inside the > > > > > endWindow(). The later is a slight variation of the operator output > > > port > > > > > caching behavior with the only difference that now the operator > > itself > > > is > > > > > responsible for caching/accumulating control tuples. Note that in > > many > > > > > cases it will be necessary to postpone emitting payload tuples that > > > > > logically come after the custom control tuple till the next window > > > > begins. > > > > > > > > > > IMO, that too restrictive and in a case where input operator uses a > > > push > > > > > instead of a poll (for example, it provides an end point where > remote > > > > > agents may connect and publish/push data), control tuples may be > used > > > for > > > > > connect/disconnect/watermark broadcast to (partitioned) downstream > > > > > operators. In this case the platform just need to guarantee order > > > barrier > > > > > (any tuple emitted prior to a control tuple needs to be delivered > > prior > > > > to > > > >
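To make the end-of-file use case above concrete, here is a sketch of an output operator acting on a control tuple immediately (approach 2). The custom control tuple API is still under discussion, so the EndOfFileControl type and the processControl() callback are hypothetical placeholders, not an existing Apex API.

```java
// Hypothetical sketch: the control-tuple delivery API does not exist yet.
// EndOfFileControl and processControl() stand in for whatever is eventually agreed on.
class EndOfFileControl
{
  final String fileName;

  EndOfFileControl(String fileName)
  {
    this.fileName = fileName;
  }
}

class FileWritingOutputSketch
{
  // Regular data path: append the tuple to the file currently being written.
  void processTuple(String line)
  {
    // append to the open file for this stream (omitted)
  }

  // Approach 2: handle the control tuple as soon as it arrives, mid-window.
  // Because it arrives after the data tuples that preceded it, the operator
  // knows which file just ended and can finalize it immediately, without
  // buffering file identifiers until endWindow().
  void processControl(EndOfFileControl control)
  {
    finalizeFile(control.fileName);
  }

  private void finalizeFile(String fileName)
  {
    // close/rename/commit the finished file (omitted)
  }
}
```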
Re: [DISCUSSION] Custom Control Tuples
With option 2, users can still do idempotent processing by delaying their processing of the control tuples to end window. They have the flexibility with this option. In the usual scenarios, you will have one port and given that control tuples will be sent to all partitions, all the data sent before the control tuple will arrive before the control tuple for all downstream partitions and users can still do idempotent processing. For the case with multiple ports you can delay processing till end window. On Wed, Nov 2, 2016 at 2:33 PM, Amol Kekre wrote: > A feature that incurs risk with processing order, and more so with > idempotency is a big enough reason to worry about with option 2. Is there > is a critical use case that needs this feature? > > Thks > Amol > > > On Wed, Nov 2, 2016 at 1:25 PM, Pramod Immaneni > wrote: > > > I like approach 2 as it gives more flexibility and also allows for > > low-latency options. I think the following are important as well. > > > > 1. Delivering control tuples to all downstream partitions. > > 2. What mechanism will the operator developer use to send the control > > tuple? Will it be an additional mehod on the output port? > > > > Thanks > > > > On Wed, Nov 2, 2016 at 1:16 PM, David Yan wrote: > > > > > Hi all, > > > > > > I would like to renew the discussion of control tuples. > > > > > > Last time, we were in a debate about whether: > > > > > > 1) the platform should enforce that control tuples are delivered at > > window > > > boundaries only > > > > > > or: > > > > > > 2) the platform should deliver control tuples just as other tuples and > > it's > > > the operator developers' choice whether to handle the control tuples as > > > they arrive or delay the processing till the next window boundary. > > > > > > To summarize the pros and cons: > > > > > > Approach 1: If processing control tuples results in changes of the > > behavior > > > of the operator, if idempotency needs to be preserved, the processing > > must > > > be done at window boundaries. This approach will save the operator > > > developers headache to ensure that. However, this will take away the > > > choices from the operator developer if they just need to process the > > > control tuples as soon as possible. > > > > > > Approach 2: The operator has a chance to immediately process control > > > tuples. This would be useful if latency is more valued than > correctness. > > > However, if this would open the possibility for operator developers to > > > shoot themselves in the foot. This is especially true if there are > > multiple > > > input ports. as there is no easy way to guarantee processing order for > > > multiple input ports. > > > > > > We would like to arrive to a consensus and close this discussion soon > > this > > > time so we can start the work on this important feature. > > > > > > Thanks! > > > > > > David > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov > > > wrote: > > > > > > > It is not clear how operator will emit custom control tuple at window > > > > boundaries. One way is to cache/accumulate control tuples in the > > operator > > > > output port till window closes (END_WINDOW is inserted into the > output > > > > sink) or only allow an operator to emit control tuples inside the > > > > endWindow(). The later is a slight variation of the operator output > > port > > > > caching behavior with the only difference that now the operator > itself > > is > > > > responsible for caching/accumulating control tuples. 
Note that in > many > > > > cases it will be necessary to postpone emitting payload tuples that > > > > logically come after the custom control tuple till the next window > > > begins. > > > > > > > > IMO, that too restrictive and in a case where input operator uses a > > push > > > > instead of a poll (for example, it provides an end point where remote > > > > agents may connect and publish/push data), control tuples may be used > > for > > > > connect/disconnect/watermark broadcast to (partitioned) downstream > > > > operators. In this case the platform just need to guarantee order > > barrier > > > > (any tuple emitted prior to a control tuple needs to be delivered > prior > > > to > > > > the control tuple). > > > > > > > > Thank you, > > > > > > > > Vlad > > > > > > > > > > > > > > > > On 6/27/16 19:36, Amol Kekre wrote: > > > > > > > >> I agree with David. Allowing control tuples within a window (along > > with > > > >> data tuples) creates very dangerous situation where guarantees are > > > >> impacted. It is much safer to enable control tuples (send/receive) > at > > > >> window boundaries (after END_WINDOW of window N, and before > > BEGIN_WINDOW > > > >> for window N+1). My take on David's list is > > > >> > > > >> 1. -> window boundaries -> Strong +1; there will be a big issue with > > > >> guarantees for operators with multiple ports. (see Thomas's > response) > > > >> 2. -> All downstream windows -> +1, but there are situations; a > caveat > > > >> could
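As a counterpart, here is a sketch of the deferral pattern described above: under approach 2, an operator that needs idempotency can buffer incoming control tuples and act on them only at the window boundary. The processControl() callback is again a hypothetical placeholder.

```java
// Hypothetical sketch: buffer control tuples during the window and process them
// only at the window boundary, preserving idempotency under approach 2.
import java.util.ArrayList;
import java.util.List;

class DeferredControlHandlingSketch
{
  private final List<Object> pendingControls = new ArrayList<>();

  // Placeholder for the yet-to-be-defined control tuple callback.
  void processControl(Object controlTuple)
  {
    pendingControls.add(controlTuple);  // do not act mid-window
  }

  // Called at the window boundary (e.g. from Operator.endWindow()).
  void endWindow()
  {
    for (Object controlTuple : pendingControls) {
      applyControl(controlTuple);       // state change happens only here
    }
    pendingControls.clear();
  }

  private void applyControl(Object controlTuple)
  {
    // operator-specific handling (omitted)
  }
}
```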
Re: [DISCUSSION] Custom Control Tuples
The use cases listed in the original discussion don't call for option 2. It seems to come with additional complexity and implementation cost. Can those in favor of option 2 please also provide the use case for it. Thanks, Thomas On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua wrote: > I will vote for approach 1. > > First of all that one sounds easier to do to me. And I think idempotency is > important. It may run at the cost of higher latency but I think it is ok > > And in addition, when in the future if users do need realtime control tuple > processing, we can always add the option on top of it. > > So I vote for 1 > > Thanks, > Siyuan > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi wrote: > > > As a rule of thumb in any real time operating system, control tuples > should > > always be handled using Priority Queues. > > > > We may try to control priorities by defining levels. And shall not > > be delivered at window boundaries. > > > > In short, control tuples shall never be treated as any other tuples in > real > > time systems. > > > > On Thursday, November 3, 2016, David Yan wrote: > > > > > Hi all, > > > > > > I would like to renew the discussion of control tuples. > > > > > > Last time, we were in a debate about whether: > > > > > > 1) the platform should enforce that control tuples are delivered at > > window > > > boundaries only > > > > > > or: > > > > > > 2) the platform should deliver control tuples just as other tuples and > > it's > > > the operator developers' choice whether to handle the control tuples as > > > they arrive or delay the processing till the next window boundary. > > > > > > To summarize the pros and cons: > > > > > > Approach 1: If processing control tuples results in changes of the > > behavior > > > of the operator, if idempotency needs to be preserved, the processing > > must > > > be done at window boundaries. This approach will save the operator > > > developers headache to ensure that. However, this will take away the > > > choices from the operator developer if they just need to process the > > > control tuples as soon as possible. > > > > > > Approach 2: The operator has a chance to immediately process control > > > tuples. This would be useful if latency is more valued than > correctness. > > > However, if this would open the possibility for operator developers to > > > shoot themselves in the foot. This is especially true if there are > > multiple > > > input ports. as there is no easy way to guarantee processing order for > > > multiple input ports. > > > > > > We would like to arrive to a consensus and close this discussion soon > > this > > > time so we can start the work on this important feature. > > > > > > Thanks! > > > > > > David > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov > > > > > > wrote: > > > > > > > It is not clear how operator will emit custom control tuple at window > > > > boundaries. One way is to cache/accumulate control tuples in the > > operator > > > > output port till window closes (END_WINDOW is inserted into the > output > > > > sink) or only allow an operator to emit control tuples inside the > > > > endWindow(). The later is a slight variation of the operator output > > port > > > > caching behavior with the only difference that now the operator > itself > > is > > > > responsible for caching/accumulating control tuples. Note that in > many > > > > cases it will be necessary to postpone emitting payload tuples that > > > > logically come after the custom control tuple till the next window > > > begins. 
> > > > > > > > IMO, that too restrictive and in a case where input operator uses a > > push > > > > instead of a poll (for example, it provides an end point where remote > > > > agents may connect and publish/push data), control tuples may be used > > for > > > > connect/disconnect/watermark broadcast to (partitioned) downstream > > > > operators. In this case the platform just need to guarantee order > > barrier > > > > (any tuple emitted prior to a control tuple needs to be delivered > prior > > > to > > > > the control tuple). > > > > > > > > Thank you, > > > > > > > > Vlad > > > > > > > > > > > > > > > > On 6/27/16 19:36, Amol Kekre wrote: > > > > > > > >> I agree with David. Allowing control tuples within a window (along > > with > > > >> data tuples) creates very dangerous situation where guarantees are > > > >> impacted. It is much safer to enable control tuples (send/receive) > at > > > >> window boundaries (after END_WINDOW of window N, and before > > BEGIN_WINDOW > > > >> for window N+1). My take on David's list is > > > >> > > > >> 1. -> window boundaries -> Strong +1; there will be a big issue with > > > >> guarantees for operators with multiple ports. (see Thomas's > response) > > > >> 2. -> All downstream windows -> +1, but there are situations; a > caveat > > > >> could be "only to operators that implement control tuple > > > >> interface/listeners", which could effectively translates to "all > > > >>
Re: [DISCUSSION] Custom Control Tuples
I will vote for approach 1. First of all that one sounds easier to do to me. And I think idempotency is important. It may run at the cost of higher latency but I think it is ok And in addition, when in the future if users do need realtime control tuple processing, we can always add the option on top of it. So I vote for 1 Thanks, Siyuan On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi wrote: > As a rule of thumb in any real time operating system, control tuples should > always be handled using Priority Queues. > > We may try to control priorities by defining levels. And shall not > be delivered at window boundaries. > > In short, control tuples shall never be treated as any other tuples in real > time systems. > > On Thursday, November 3, 2016, David Yan wrote: > > > Hi all, > > > > I would like to renew the discussion of control tuples. > > > > Last time, we were in a debate about whether: > > > > 1) the platform should enforce that control tuples are delivered at > window > > boundaries only > > > > or: > > > > 2) the platform should deliver control tuples just as other tuples and > it's > > the operator developers' choice whether to handle the control tuples as > > they arrive or delay the processing till the next window boundary. > > > > To summarize the pros and cons: > > > > Approach 1: If processing control tuples results in changes of the > behavior > > of the operator, if idempotency needs to be preserved, the processing > must > > be done at window boundaries. This approach will save the operator > > developers headache to ensure that. However, this will take away the > > choices from the operator developer if they just need to process the > > control tuples as soon as possible. > > > > Approach 2: The operator has a chance to immediately process control > > tuples. This would be useful if latency is more valued than correctness. > > However, if this would open the possibility for operator developers to > > shoot themselves in the foot. This is especially true if there are > multiple > > input ports. as there is no easy way to guarantee processing order for > > multiple input ports. > > > > We would like to arrive to a consensus and close this discussion soon > this > > time so we can start the work on this important feature. > > > > Thanks! > > > > David > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov > > > > wrote: > > > > > It is not clear how operator will emit custom control tuple at window > > > boundaries. One way is to cache/accumulate control tuples in the > operator > > > output port till window closes (END_WINDOW is inserted into the output > > > sink) or only allow an operator to emit control tuples inside the > > > endWindow(). The later is a slight variation of the operator output > port > > > caching behavior with the only difference that now the operator itself > is > > > responsible for caching/accumulating control tuples. Note that in many > > > cases it will be necessary to postpone emitting payload tuples that > > > logically come after the custom control tuple till the next window > > begins. > > > > > > IMO, that too restrictive and in a case where input operator uses a > push > > > instead of a poll (for example, it provides an end point where remote > > > agents may connect and publish/push data), control tuples may be used > for > > > connect/disconnect/watermark broadcast to (partitioned) downstream > > > operators. 
In this case the platform just need to guarantee order > barrier > > > (any tuple emitted prior to a control tuple needs to be delivered prior > > to > > > the control tuple). > > > > > > Thank you, > > > > > > Vlad > > > > > > > > > > > > On 6/27/16 19:36, Amol Kekre wrote: > > > > > >> I agree with David. Allowing control tuples within a window (along > with > > >> data tuples) creates very dangerous situation where guarantees are > > >> impacted. It is much safer to enable control tuples (send/receive) at > > >> window boundaries (after END_WINDOW of window N, and before > BEGIN_WINDOW > > >> for window N+1). My take on David's list is > > >> > > >> 1. -> window boundaries -> Strong +1; there will be a big issue with > > >> guarantees for operators with multiple ports. (see Thomas's response) > > >> 2. -> All downstream windows -> +1, but there are situations; a caveat > > >> could be "only to operators that implement control tuple > > >> interface/listeners", which could effectively translates to "all > > >> interested > > >> downstream operators" > > >> 3. Only Input operator can create control tuples -> -1; is restrictive > > >> even > > >> though most likely 95% of the time it will be input operators > > >> > > >> Thks, > > >> Amol > > >> > > >> > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise > > > > >> wrote: > > >> > > >> The windowing we discuss here is in general event time based, arrival > > time > > >>> is a special case of it. > > >>> > > >>> I don't think state changes can be made independent of the streaming > > >>> window
Re: [DISCUSSION] Custom Control Tuples
Pramod, To answer your questions, the control tuples will be delivered to all downstream partitions, and an additional emitControl method (actual name TBD) can be added to DefaultOutputPort without breaking backward compatibility. Also, to clarify, each operator should have the ability to block further propagation of incoming control tuples, and create control tuples on its own (by calling emitControl). David On Wed, Nov 2, 2016 at 1:25 PM, Pramod Immaneni wrote: > I like approach 2 as it gives more flexibility and also allows for > low-latency options. I think the following are important as well. > > 1. Delivering control tuples to all downstream partitions. > 2. What mechanism will the operator developer use to send the control > tuple? Will it be an additional mehod on the output port? > > Thanks > > On Wed, Nov 2, 2016 at 1:16 PM, David Yan wrote: > > > Hi all, > > > > I would like to renew the discussion of control tuples. > > > > Last time, we were in a debate about whether: > > > > 1) the platform should enforce that control tuples are delivered at > window > > boundaries only > > > > or: > > > > 2) the platform should deliver control tuples just as other tuples and > it's > > the operator developers' choice whether to handle the control tuples as > > they arrive or delay the processing till the next window boundary. > > > > To summarize the pros and cons: > > > > Approach 1: If processing control tuples results in changes of the > behavior > > of the operator, if idempotency needs to be preserved, the processing > must > > be done at window boundaries. This approach will save the operator > > developers headache to ensure that. However, this will take away the > > choices from the operator developer if they just need to process the > > control tuples as soon as possible. > > > > Approach 2: The operator has a chance to immediately process control > > tuples. This would be useful if latency is more valued than correctness. > > However, if this would open the possibility for operator developers to > > shoot themselves in the foot. This is especially true if there are > multiple > > input ports. as there is no easy way to guarantee processing order for > > multiple input ports. > > > > We would like to arrive to a consensus and close this discussion soon > this > > time so we can start the work on this important feature. > > > > Thanks! > > > > David > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov > > wrote: > > > > > It is not clear how operator will emit custom control tuple at window > > > boundaries. One way is to cache/accumulate control tuples in the > operator > > > output port till window closes (END_WINDOW is inserted into the output > > > sink) or only allow an operator to emit control tuples inside the > > > endWindow(). The later is a slight variation of the operator output > port > > > caching behavior with the only difference that now the operator itself > is > > > responsible for caching/accumulating control tuples. Note that in many > > > cases it will be necessary to postpone emitting payload tuples that > > > logically come after the custom control tuple till the next window > > begins. > > > > > > IMO, that too restrictive and in a case where input operator uses a > push > > > instead of a poll (for example, it provides an end point where remote > > > agents may connect and publish/push data), control tuples may be used > for > > > connect/disconnect/watermark broadcast to (partitioned) downstream > > > operators. 
In this case the platform just need to guarantee order > barrier > > > (any tuple emitted prior to a control tuple needs to be delivered prior > > to > > > the control tuple). > > > > > > Thank you, > > > > > > Vlad > > > > > > > > > > > > On 6/27/16 19:36, Amol Kekre wrote: > > > > > >> I agree with David. Allowing control tuples within a window (along > with > > >> data tuples) creates very dangerous situation where guarantees are > > >> impacted. It is much safer to enable control tuples (send/receive) at > > >> window boundaries (after END_WINDOW of window N, and before > BEGIN_WINDOW > > >> for window N+1). My take on David's list is > > >> > > >> 1. -> window boundaries -> Strong +1; there will be a big issue with > > >> guarantees for operators with multiple ports. (see Thomas's response) > > >> 2. -> All downstream windows -> +1, but there are situations; a caveat > > >> could be "only to operators that implement control tuple > > >> interface/listeners", which could effectively translates to "all > > >> interested > > >> downstream operators" > > >> 3. Only Input operator can create control tuples -> -1; is restrictive > > >> even > > >> though most likely 95% of the time it will be input operators > > >> > > >> Thks, > > >> Amol > > >> > > >> > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise > > > >> wrote: > > >> > > >> The windowing we discuss here is in general event time based, arrival > > time > > >>> is a special case of it. > > >>> > > >>
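For reference, here is a sketch of what emitting a control tuple might look like under the description above. The method name is explicitly TBD in the message, so emitControl() and the port interface below are assumptions only, not the actual API.

```java
// Hypothetical sketch: emitControl() does not exist on DefaultOutputPort today and
// its name is TBD per the discussion. Shown only to illustrate the proposal.
class ControlEmittingSketch
{
  // Stand-in for the proposed addition to the output port API.
  interface ControlAwareOutputPort<T>
  {
    void emit(T tuple);
    void emitControl(Object controlTuple);
  }

  // Hypothetical control tuple payload.
  static class EndOfFile
  {
    final String fileName;

    EndOfFile(String fileName)
    {
      this.fileName = fileName;
    }
  }

  private final ControlAwareOutputPort<String> output;

  ControlEmittingSketch(ControlAwareOutputPort<String> output)
  {
    this.output = output;
  }

  void onFileFinished(String fileName)
  {
    // Data tuples emitted before this call would be delivered before the control
    // tuple, and the control tuple would go to all downstream partitions.
    output.emitControl(new EndOfFile(fileName));
  }
}
```

An operator in the middle of the DAG could likewise consume an incoming control tuple without forwarding it and emit its own instead, which is the propagation-blocking behavior described above.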
Re: [DISCUSSION] Custom Control Tuples
A feature that incurs risk with processing order, and more so with idempotency is a big enough reason to worry about with option 2. Is there is a critical use case that needs this feature? Thks Amol On Wed, Nov 2, 2016 at 1:25 PM, Pramod Immaneni wrote: > I like approach 2 as it gives more flexibility and also allows for > low-latency options. I think the following are important as well. > > 1. Delivering control tuples to all downstream partitions. > 2. What mechanism will the operator developer use to send the control > tuple? Will it be an additional mehod on the output port? > > Thanks > > On Wed, Nov 2, 2016 at 1:16 PM, David Yan wrote: > > > Hi all, > > > > I would like to renew the discussion of control tuples. > > > > Last time, we were in a debate about whether: > > > > 1) the platform should enforce that control tuples are delivered at > window > > boundaries only > > > > or: > > > > 2) the platform should deliver control tuples just as other tuples and > it's > > the operator developers' choice whether to handle the control tuples as > > they arrive or delay the processing till the next window boundary. > > > > To summarize the pros and cons: > > > > Approach 1: If processing control tuples results in changes of the > behavior > > of the operator, if idempotency needs to be preserved, the processing > must > > be done at window boundaries. This approach will save the operator > > developers headache to ensure that. However, this will take away the > > choices from the operator developer if they just need to process the > > control tuples as soon as possible. > > > > Approach 2: The operator has a chance to immediately process control > > tuples. This would be useful if latency is more valued than correctness. > > However, if this would open the possibility for operator developers to > > shoot themselves in the foot. This is especially true if there are > multiple > > input ports. as there is no easy way to guarantee processing order for > > multiple input ports. > > > > We would like to arrive to a consensus and close this discussion soon > this > > time so we can start the work on this important feature. > > > > Thanks! > > > > David > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov > > wrote: > > > > > It is not clear how operator will emit custom control tuple at window > > > boundaries. One way is to cache/accumulate control tuples in the > operator > > > output port till window closes (END_WINDOW is inserted into the output > > > sink) or only allow an operator to emit control tuples inside the > > > endWindow(). The later is a slight variation of the operator output > port > > > caching behavior with the only difference that now the operator itself > is > > > responsible for caching/accumulating control tuples. Note that in many > > > cases it will be necessary to postpone emitting payload tuples that > > > logically come after the custom control tuple till the next window > > begins. > > > > > > IMO, that too restrictive and in a case where input operator uses a > push > > > instead of a poll (for example, it provides an end point where remote > > > agents may connect and publish/push data), control tuples may be used > for > > > connect/disconnect/watermark broadcast to (partitioned) downstream > > > operators. In this case the platform just need to guarantee order > barrier > > > (any tuple emitted prior to a control tuple needs to be delivered prior > > to > > > the control tuple). 
> > > > > > Thank you, > > > > > > Vlad > > > > > > > > > > > > On 6/27/16 19:36, Amol Kekre wrote: > > > > > >> I agree with David. Allowing control tuples within a window (along > with > > >> data tuples) creates very dangerous situation where guarantees are > > >> impacted. It is much safer to enable control tuples (send/receive) at > > >> window boundaries (after END_WINDOW of window N, and before > BEGIN_WINDOW > > >> for window N+1). My take on David's list is > > >> > > >> 1. -> window boundaries -> Strong +1; there will be a big issue with > > >> guarantees for operators with multiple ports. (see Thomas's response) > > >> 2. -> All downstream windows -> +1, but there are situations; a caveat > > >> could be "only to operators that implement control tuple > > >> interface/listeners", which could effectively translates to "all > > >> interested > > >> downstream operators" > > >> 3. Only Input operator can create control tuples -> -1; is restrictive > > >> even > > >> though most likely 95% of the time it will be input operators > > >> > > >> Thks, > > >> Amol > > >> > > >> > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise > > > >> wrote: > > >> > > >> The windowing we discuss here is in general event time based, arrival > > time > > >>> is a special case of it. > > >>> > > >>> I don't think state changes can be made independent of the streaming > > >>> window > > >>> boundary as it would prevent idempotent processing and transitively > > >>> exactly > > >>> once. For that to work, tuples
[jira] [Commented] (APEXMALHAR-2321) Improve Buckets memory management
[ https://issues.apache.org/jira/browse/APEXMALHAR-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630594#comment-15630594 ] ASF GitHub Bot commented on APEXMALHAR-2321: GitHub user brightchen opened a pull request: https://github.com/apache/apex-malhar/pull/481 APEXMALHAR-2321 #resolve #comment Improve Buckets memory management You can merge this pull request into a Git repository by running: $ git pull https://github.com/brightchen/apex-malhar APEXMALHAR-2321 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/481.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #481 commit d16ce8b5f6364a3562a9e5c94dfb7763597ca8a0 Author: brightchen Date: 2016-11-01T21:30:55Z APEXMALHAR-2321 #resolve #comment Improve Buckets memory management > Improve Buckets memory management > - > > Key: APEXMALHAR-2321 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2321 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: bright chen >Assignee: bright chen > > Currently buckets are managed as an array. Each bucket has a memory > limit, and freeing of memory is triggered if the bucket's memory usage exceeds > that limit. > - For ManagedTimeUnifiedStateImpl, the default bucket count is 345600, which is > probably too large. It can be changed by setting > Context.OperatorContext.APPLICATION_WINDOW_COUNT. > - The default maxMemorySize is zero. It would be better to give a reasonable default > value to avoid too much garbage collection. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
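For context on the bucket-count remark in the description: APPLICATION_WINDOW_COUNT is a per-operator attribute that can be set when populating the DAG. A sketch follows, assuming Apex 3.x APIs; the StoreOperator class and the value 120 are placeholders, not recommendations.

```java
// Sketch of setting APPLICATION_WINDOW_COUNT for the operator that owns the managed
// state, which also bounds the default time-bucket count discussed in the JIRA.
// StoreOperator and the value 120 are placeholders.
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;

public class ManagedStateApp implements StreamingApplication
{
  // Placeholder for an operator that embeds ManagedTimeUnifiedStateImpl.
  public static class StoreOperator extends BaseOperator
  {
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    StoreOperator store = dag.addOperator("store", new StoreOperator());
    // One application window = 120 streaming windows for this operator.
    dag.setAttribute(store, OperatorContext.APPLICATION_WINDOW_COUNT, 120);
  }
}
```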
[GitHub] apex-malhar pull request #481: APEXMALHAR-2321 #resolve #comment Improve Buc...
GitHub user brightchen opened a pull request: https://github.com/apache/apex-malhar/pull/481 APEXMALHAR-2321 #resolve #comment Improve Buckets memory management You can merge this pull request into a Git repository by running: $ git pull https://github.com/brightchen/apex-malhar APEXMALHAR-2321 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/481.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #481 commit d16ce8b5f6364a3562a9e5c94dfb7763597ca8a0 Author: brightchen Date: 2016-11-01T21:30:55Z APEXMALHAR-2321 #resolve #comment Improve Buckets memory management --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: [DISCUSSION] Custom Control Tuples
As a rule of thumb in any real time operating system, control tuples should always be handled using Priority Queues. We may try to control priorities by defining levels. And shall not be delivered at window boundaries. In short, control tuples shall never be treated as any other tuples in real time systems. On Thursday, November 3, 2016, David Yan wrote: > Hi all, > > I would like to renew the discussion of control tuples. > > Last time, we were in a debate about whether: > > 1) the platform should enforce that control tuples are delivered at window > boundaries only > > or: > > 2) the platform should deliver control tuples just as other tuples and it's > the operator developers' choice whether to handle the control tuples as > they arrive or delay the processing till the next window boundary. > > To summarize the pros and cons: > > Approach 1: If processing control tuples results in changes of the behavior > of the operator, if idempotency needs to be preserved, the processing must > be done at window boundaries. This approach will save the operator > developers headache to ensure that. However, this will take away the > choices from the operator developer if they just need to process the > control tuples as soon as possible. > > Approach 2: The operator has a chance to immediately process control > tuples. This would be useful if latency is more valued than correctness. > However, if this would open the possibility for operator developers to > shoot themselves in the foot. This is especially true if there are multiple > input ports. as there is no easy way to guarantee processing order for > multiple input ports. > > We would like to arrive to a consensus and close this discussion soon this > time so we can start the work on this important feature. > > Thanks! > > David > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov > > wrote: > > > It is not clear how operator will emit custom control tuple at window > > boundaries. One way is to cache/accumulate control tuples in the operator > > output port till window closes (END_WINDOW is inserted into the output > > sink) or only allow an operator to emit control tuples inside the > > endWindow(). The later is a slight variation of the operator output port > > caching behavior with the only difference that now the operator itself is > > responsible for caching/accumulating control tuples. Note that in many > > cases it will be necessary to postpone emitting payload tuples that > > logically come after the custom control tuple till the next window > begins. > > > > IMO, that too restrictive and in a case where input operator uses a push > > instead of a poll (for example, it provides an end point where remote > > agents may connect and publish/push data), control tuples may be used for > > connect/disconnect/watermark broadcast to (partitioned) downstream > > operators. In this case the platform just need to guarantee order barrier > > (any tuple emitted prior to a control tuple needs to be delivered prior > to > > the control tuple). > > > > Thank you, > > > > Vlad > > > > > > > > On 6/27/16 19:36, Amol Kekre wrote: > > > >> I agree with David. Allowing control tuples within a window (along with > >> data tuples) creates very dangerous situation where guarantees are > >> impacted. It is much safer to enable control tuples (send/receive) at > >> window boundaries (after END_WINDOW of window N, and before BEGIN_WINDOW > >> for window N+1). My take on David's list is > >> > >> 1. 
-> window boundaries -> Strong +1; there will be a big issue with > >> guarantees for operators with multiple ports. (see Thomas's response) > >> 2. -> All downstream windows -> +1, but there are situations; a caveat > >> could be "only to operators that implement control tuple > >> interface/listeners", which could effectively translates to "all > >> interested > >> downstream operators" > >> 3. Only Input operator can create control tuples -> -1; is restrictive > >> even > >> though most likely 95% of the time it will be input operators > >> > >> Thks, > >> Amol > >> > >> > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise > > >> wrote: > >> > >> The windowing we discuss here is in general event time based, arrival > time > >>> is a special case of it. > >>> > >>> I don't think state changes can be made independent of the streaming > >>> window > >>> boundary as it would prevent idempotent processing and transitively > >>> exactly > >>> once. For that to work, tuples need to be presented to the operator in > a > >>> guaranteed order *within* the streaming window, which is not possible > >>> with > >>> multiple ports (and partitions). > >>> > >>> Thomas > >>> > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan > > >>> wrote: > >>> > >>> I think for session tracking, if the session boundaries are allowed to > be > not aligned with the streaming window boundaries, the user will have a > > >>> much > >>> > bigger problem with idempotency. And in most cases, session tracking
Re: [DISCUSSION] Custom Control Tuples
I like approach 2 as it gives more flexibility and also allows for low-latency options. I think the following are important as well. 1. Delivering control tuples to all downstream partitions. 2. What mechanism will the operator developer use to send the control tuple? Will it be an additional mehod on the output port? Thanks On Wed, Nov 2, 2016 at 1:16 PM, David Yan wrote: > Hi all, > > I would like to renew the discussion of control tuples. > > Last time, we were in a debate about whether: > > 1) the platform should enforce that control tuples are delivered at window > boundaries only > > or: > > 2) the platform should deliver control tuples just as other tuples and it's > the operator developers' choice whether to handle the control tuples as > they arrive or delay the processing till the next window boundary. > > To summarize the pros and cons: > > Approach 1: If processing control tuples results in changes of the behavior > of the operator, if idempotency needs to be preserved, the processing must > be done at window boundaries. This approach will save the operator > developers headache to ensure that. However, this will take away the > choices from the operator developer if they just need to process the > control tuples as soon as possible. > > Approach 2: The operator has a chance to immediately process control > tuples. This would be useful if latency is more valued than correctness. > However, if this would open the possibility for operator developers to > shoot themselves in the foot. This is especially true if there are multiple > input ports. as there is no easy way to guarantee processing order for > multiple input ports. > > We would like to arrive to a consensus and close this discussion soon this > time so we can start the work on this important feature. > > Thanks! > > David > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov > wrote: > > > It is not clear how operator will emit custom control tuple at window > > boundaries. One way is to cache/accumulate control tuples in the operator > > output port till window closes (END_WINDOW is inserted into the output > > sink) or only allow an operator to emit control tuples inside the > > endWindow(). The later is a slight variation of the operator output port > > caching behavior with the only difference that now the operator itself is > > responsible for caching/accumulating control tuples. Note that in many > > cases it will be necessary to postpone emitting payload tuples that > > logically come after the custom control tuple till the next window > begins. > > > > IMO, that too restrictive and in a case where input operator uses a push > > instead of a poll (for example, it provides an end point where remote > > agents may connect and publish/push data), control tuples may be used for > > connect/disconnect/watermark broadcast to (partitioned) downstream > > operators. In this case the platform just need to guarantee order barrier > > (any tuple emitted prior to a control tuple needs to be delivered prior > to > > the control tuple). > > > > Thank you, > > > > Vlad > > > > > > > > On 6/27/16 19:36, Amol Kekre wrote: > > > >> I agree with David. Allowing control tuples within a window (along with > >> data tuples) creates very dangerous situation where guarantees are > >> impacted. It is much safer to enable control tuples (send/receive) at > >> window boundaries (after END_WINDOW of window N, and before BEGIN_WINDOW > >> for window N+1). My take on David's list is > >> > >> 1. 
-> window boundaries -> Strong +1; there will be a big issue with > >> guarantees for operators with multiple ports. (see Thomas's response) > >> 2. -> All downstream windows -> +1, but there are situations; a caveat > >> could be "only to operators that implement control tuple > >> interface/listeners", which could effectively translates to "all > >> interested > >> downstream operators" > >> 3. Only Input operator can create control tuples -> -1; is restrictive > >> even > >> though most likely 95% of the time it will be input operators > >> > >> Thks, > >> Amol > >> > >> > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise > >> wrote: > >> > >> The windowing we discuss here is in general event time based, arrival > time > >>> is a special case of it. > >>> > >>> I don't think state changes can be made independent of the streaming > >>> window > >>> boundary as it would prevent idempotent processing and transitively > >>> exactly > >>> once. For that to work, tuples need to be presented to the operator in > a > >>> guaranteed order *within* the streaming window, which is not possible > >>> with > >>> multiple ports (and partitions). > >>> > >>> Thomas > >>> > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan > >>> wrote: > >>> > >>> I think for session tracking, if the session boundaries are allowed to > be > not aligned with the streaming window boundaries, the user will have a > > >>> much > >>> > bigger problem with idempotency. And in most cases, ses
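As a reference point for the question above about "an additional method on the output port", here is a minimal Java sketch of one possible shape for such an API. Every name in it (ControlTuple, ControlAwareOutputPort, emitControl, WatermarkTuple) is an assumption made for illustration; this is not the existing Apex API and not a decided design.
```
// Hypothetical sketch only, not the actual Apex API: one possible shape for
// an output port that can emit control tuples in addition to data tuples.
public class ControlTupleSketch
{
  /** Marker for tuples that carry control information rather than data. */
  public interface ControlTuple
  {
    /** If true, the platform holds the tuple until the current window ends (approach 1). */
    boolean deliverAtWindowBoundary();
  }

  /** An output port with an additional method for control tuples. */
  public interface ControlAwareOutputPort<T>
  {
    void emit(T payload);

    void emitControl(ControlTuple control);
  }

  /** Example control tuple: an event-time watermark. */
  public static class WatermarkTuple implements ControlTuple
  {
    private final long timestamp;

    public WatermarkTuple(long timestamp)
    {
      this.timestamp = timestamp;
    }

    @Override
    public boolean deliverAtWindowBoundary()
    {
      return true; // this particular tuple opts into window-boundary delivery
    }

    public long getTimestamp()
    {
      return timestamp;
    }
  }
}
```
A per-tuple flag such as deliverAtWindowBoundary() is one possible way to reconcile the two approaches: each control tuple declares whether it must wait for the window boundary or may be delivered immediately.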
Re: [DISCUSSION] Custom Control Tuples
Hi all, I would like to renew the discussion of control tuples. Last time, we were in a debate about whether: 1) the platform should enforce that control tuples are delivered at window boundaries only or: 2) the platform should deliver control tuples just as other tuples and it's the operator developers' choice whether to handle the control tuples as they arrive or delay the processing till the next window boundary. To summarize the pros and cons: Approach 1: If processing control tuples results in changes to the behavior of the operator and idempotency needs to be preserved, the processing must be done at window boundaries. This approach will save the operator developers the headache of ensuring that. However, this will take away the choices from the operator developer if they just need to process the control tuples as soon as possible. Approach 2: The operator has a chance to immediately process control tuples. This would be useful if latency is more valued than correctness. However, this would open the possibility for operator developers to shoot themselves in the foot. This is especially true if there are multiple input ports, as there is no easy way to guarantee processing order for multiple input ports. We would like to arrive at a consensus and close this discussion soon this time so we can start the work on this important feature. Thanks! David On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov wrote: > It is not clear how an operator will emit a custom control tuple at window > boundaries. One way is to cache/accumulate control tuples in the operator > output port till the window closes (END_WINDOW is inserted into the output > sink) or only allow an operator to emit control tuples inside the > endWindow(). The latter is a slight variation of the operator output port > caching behavior with the only difference that now the operator itself is > responsible for caching/accumulating control tuples. Note that in many > cases it will be necessary to postpone emitting payload tuples that > logically come after the custom control tuple till the next window begins. > > IMO, that is too restrictive, and in a case where the input operator uses a push > instead of a poll (for example, it provides an end point where remote > agents may connect and publish/push data), control tuples may be used for > connect/disconnect/watermark broadcast to (partitioned) downstream > operators. In this case the platform just needs to guarantee an order barrier > (any tuple emitted prior to a control tuple needs to be delivered prior to > the control tuple). > > Thank you, > > Vlad > > > > On 6/27/16 19:36, Amol Kekre wrote: > >> I agree with David. Allowing control tuples within a window (along with >> data tuples) creates a very dangerous situation where guarantees are >> impacted. It is much safer to enable control tuples (send/receive) at >> window boundaries (after END_WINDOW of window N, and before BEGIN_WINDOW >> for window N+1). My take on David's list is >> >> 1. -> window boundaries -> Strong +1; there will be a big issue with >> guarantees for operators with multiple ports. (see Thomas's response) >> 2. -> All downstream windows -> +1, but there are situations; a caveat >> could be "only to operators that implement control tuple >> interface/listeners", which could effectively translate to "all >> interested >> downstream operators" >> 3.
Only Input operator can create control tuples -> -1; it is restrictive >> even >> though most likely 95% of the time it will be input operators >> >> Thks, >> Amol >> >> >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise >> wrote: >> >> The windowing we discuss here is in general event time based, arrival time >>> is a special case of it. >>> >>> I don't think state changes can be made independent of the streaming >>> window >>> boundary as it would prevent idempotent processing and transitively >>> exactly >>> once. For that to work, tuples need to be presented to the operator in a >>> guaranteed order *within* the streaming window, which is not possible >>> with >>> multiple ports (and partitions). >>> >>> Thomas >>> >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan >>> wrote: >>> >>> I think for session tracking, if the session boundaries are allowed to be not aligned with the streaming window boundaries, the user will have a >>> much >>> bigger problem with idempotency. And in most cases, session tracking is event time based, not ingression time or processing time based, so this >>> may >>> never be a problem. But if that ever happens, the user can always alter >>> the >>> default 500ms width. David On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov wrote: The ability to send custom control tuples within a window may be useful, for > example, for session tracking, where session boundaries are not > aligned >>> with window boundaries and 500 ms latency is not acceptable for an > application. > > Thank you, >>>
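For the "cache/accumulate control tuples until the window closes" idea quoted in this thread, the following is a minimal, self-contained sketch (hypothetical names, not actual Apex code): control tuples produced mid-window are queued and flushed from endWindow(), so no data tuple emitted within the window is reordered across a control tuple.
```
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of buffering control tuples inside the operator and emitting them
// only at the window boundary. ControlSink stands in for the real output port.
public class WindowBoundaryControlBuffer<C>
{
  public interface ControlSink<T>
  {
    void put(T controlTuple);
  }

  private final Queue<C> pending = new ArrayDeque<C>();
  private final ControlSink<C> controlSink;

  public WindowBoundaryControlBuffer(ControlSink<C> controlSink)
  {
    this.controlSink = controlSink;
  }

  /** Called whenever the operator wants to emit a control tuple mid-window. */
  public void emitControl(C controlTuple)
  {
    pending.add(controlTuple);
  }

  /** Called from the operator's endWindow(): flush after all data of the window. */
  public void endWindow()
  {
    C c;
    while ((c = pending.poll()) != null) {
      controlSink.put(c);
    }
  }
}
```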
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630269#comment-15630269 ] Pramod Immaneni commented on APEXCORE-570: -- For the inter-process case, if the downstream operator is slower than the speed of spooling, then it causes the upstream to pull ahead. This is the scenario I am facing and working on addressing. It cannot be controlled with an absolute capacity limit (memory+spool) because, for fault tolerance, older windows are needed until they are committed. It would have to be a window difference: how far ahead the upstream operator is allowed to go from the minimum of all downstream operators before brakes are applied to its publishing. > Prevent upstream operators from getting too far ahead when downstream > operators are slow > > > Key: APEXCORE-570 > URL: https://issues.apache.org/jira/browse/APEXCORE-570 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Pramod Immaneni >Assignee: Pramod Immaneni > > If the downstream operators are slower than upstream operators then the > upstream operators will get ahead and the gap can continue to increase. > Provide an option to slow down or temporarily pause the upstream operators > when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
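The window-difference rule described in this comment can be written down as a small check; a minimal sketch with hypothetical names (maxWindowsAhead, shouldBlock) follows. The real change would live in the buffer server, and actual Apex window ids encode a base timestamp in their upper bits, so an implementation would compare window counts derived from the ids rather than raw ids.
```
// Illustrative sketch, not the APEXCORE-570 implementation: block the upstream
// publisher when it is more than a configured number of windows ahead of the
// slowest (minimum) downstream operator, and release it once downstream catches up.
public class WindowDifferenceBrake
{
  private final long maxWindowsAhead; // configurable limit, e.g. 100 windows

  public WindowDifferenceBrake(long maxWindowsAhead)
  {
    this.maxWindowsAhead = maxWindowsAhead;
  }

  /**
   * @param upstreamWindowCount window count currently being published upstream
   * @param minDownstreamWindowCount minimum window count across all downstream operators
   * @return true if publishing should be paused until downstream catches up
   */
  public boolean shouldBlock(long upstreamWindowCount, long minDownstreamWindowCount)
  {
    return upstreamWindowCount - minDownstreamWindowCount > maxWindowsAhead;
  }
}
```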
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630072#comment-15630072 ] Thomas Weise commented on APEXCORE-570: --- From the JIRA description, this sounds like a combination of backpressure with configurable limit on the queue. For the intra-process case the queue capacity can be set. For buffer server, it will spill over to disk, hence the upstream operator will produce at the speed of spooling. Are you thinking of a capacity limit here? > Prevent upstream operators from getting too far ahead when downstream > operators are slow > > > Key: APEXCORE-570 > URL: https://issues.apache.org/jira/browse/APEXCORE-570 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Pramod Immaneni >Assignee: Pramod Immaneni > > If the downstream operators are slower than upstream operators then the > upstream operators will get ahead and the gap can continue to increase. > Provide an option to slow down or temporarily pause the upstream operators > when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2274) AbstractFileInputOperator gets killed when there are a large number of files.
[ https://issues.apache.org/jira/browse/APEXMALHAR-2274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15629893#comment-15629893 ] Matt Zhang commented on APEXMALHAR-2274: The scanner in FileSplitterInput is more complicated. It retrieves the file status and supports regex. In our case we only need a lightweight process to get the paths for all the files in the directory. So from a performance point of view it's better to use a dedicated lightweight process. > AbstractFileInputOperator gets killed when there are a large number of files. > - > > Key: APEXMALHAR-2274 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2274 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Munagala V. Ramanath >Assignee: Matt Zhang > > When there are a large number of files in the monitored directory, the call > to DirectoryScanner.scan() can take a long time since it calls > FileSystem.listStatus() which returns the entire list. Meanwhile, the > AppMaster deems this operator hung and restarts it which again results in the > same problem. > It should use FileSystem.listStatusIterator() [in Hadoop 2.7.X] or > FileSystem.listFiles() [in 2.6.X] or other similar calls that return > a remote iterator to limit the number files processed in a single call. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
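For reference, here is a small sketch of the remote-iterator approach named in the issue description, using FileSystem.listFiles(); the class name and the maxFilesPerScan cap are assumptions for illustration, not the actual operator code.
```
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// Sketch: each scan() call consumes at most maxFilesPerScan entries from a
// remote iterator, so a huge directory cannot stall the operator in a single
// listStatus() call.
public class BoundedDirectoryScanner
{
  private final FileSystem fs;
  private final Path directory;
  private final int maxFilesPerScan;
  private RemoteIterator<LocatedFileStatus> iterator; // resumed across scans

  public BoundedDirectoryScanner(Configuration conf, String dir, int maxFilesPerScan) throws IOException
  {
    this.fs = FileSystem.get(conf);
    this.directory = new Path(dir);
    this.maxFilesPerScan = maxFilesPerScan;
  }

  /** Returns up to maxFilesPerScan paths; call repeatedly to walk the directory. */
  public List<Path> scan() throws IOException
  {
    if (iterator == null || !iterator.hasNext()) {
      iterator = fs.listFiles(directory, false); // non-recursive listing
    }
    List<Path> batch = new ArrayList<Path>();
    while (iterator.hasNext() && batch.size() < maxFilesPerScan) {
      batch.add(iterator.next().getPath());
    }
    return batch;
  }
}
```
A real scanner would additionally remember already-processed paths across scans so that re-listing the directory does not emit duplicates.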
[jira] [Created] (APEXMALHAR-2326) Failures in SQS unit tests
Sanjay M Pujare created APEXMALHAR-2326: --- Summary: Failures in SQS unit tests Key: APEXMALHAR-2326 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2326 Project: Apache Apex Malhar Issue Type: Bug Components: adapters message bus Reporter: Sanjay M Pujare Assignee: Sanjay M Pujare Priority: Minor Currently SQS unit tests fail due to invalid AWS/SQS credentials. See com.datatorrent.lib.io.jms.SQSStringInputOperatorTest -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #480: [APEXMALHAR-2220] Move the FunctionOperator t...
GitHub user d9liang opened a pull request: https://github.com/apache/apex-malhar/pull/480 [APEXMALHAR-2220] Move the FunctionOperator to Malhar library Merge function operators under org.apache.apex.malhar.stream.api.operator and function interface under org.apache.apex.malhar.stream.api.function into org.apache.apex.malhar.lib.function. You can merge this pull request into a Git repository by running: $ git pull https://github.com/d9liang/apex-malhar APEXMALHAR-2220 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/480.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #480 commit 59d5d701fb9acc2d2b03f0338d5b7a22beee5764 Author: Dongming Liang Date: 2016-11-02T16:48:46Z Move the FunctionOperator to Malhar library --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (APEXMALHAR-2220) Move the FunctionOperator to Malhar library
[ https://issues.apache.org/jira/browse/APEXMALHAR-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15629562#comment-15629562 ] ASF GitHub Bot commented on APEXMALHAR-2220: GitHub user d9liang opened a pull request: https://github.com/apache/apex-malhar/pull/480 [APEXMALHAR-2220] Move the FunctionOperator to Malhar library Merge function operators under org.apache.apex.malhar.stream.api.operator and function interface under org.apache.apex.malhar.stream.api.function into org.apache.apex.malhar.lib.function. You can merge this pull request into a Git repository by running: $ git pull https://github.com/d9liang/apex-malhar APEXMALHAR-2220 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/480.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #480 commit 59d5d701fb9acc2d2b03f0338d5b7a22beee5764 Author: Dongming Liang Date: 2016-11-02T16:48:46Z Move the FunctionOperator to Malhar library > Move the FunctionOperator to Malhar library > --- > > Key: APEXMALHAR-2220 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2220 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Siyuan Hua >Assignee: Dongming Liang > > FunctionOperator initially is just designed for high-level API and we think > it can also useful if people want to build stateless transformation and work > with other operator directly. FunctionOperator can be reused. Thus we should > move FO to malhar library -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (APEXMALHAR-2302) Exposing few properties of FSSplitter and BlockReader operators to FSRecordReaderModule to tune Application
[ https://issues.apache.org/jira/browse/APEXMALHAR-2302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yogi Devendra resolved APEXMALHAR-2302. --- Resolution: Fixed Fix Version/s: 3.6.0 > Exposing few properties of FSSplitter and BlockReader operators to > FSRecordReaderModule to tune Application > --- > > Key: APEXMALHAR-2302 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2302 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Deepak Narkhede >Assignee: Deepak Narkhede >Priority: Minor > Fix For: 3.6.0 > > > Exposing the blockSize property of FSSplitter operator to > FSRecordReaderModule. This will help end users to tune the blockSize value > based on application needs -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #457: APEXMALHAR-2302 Exposing few properties of FS...
Github user asfgit closed the pull request at: https://github.com/apache/apex-malhar/pull/457 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (APEXMALHAR-2302) Exposing few properties of FSSplitter and BlockReader operators to FSRecordReaderModule to tune Application
[ https://issues.apache.org/jira/browse/APEXMALHAR-2302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15629394#comment-15629394 ] ASF GitHub Bot commented on APEXMALHAR-2302: Github user asfgit closed the pull request at: https://github.com/apache/apex-malhar/pull/457 > Exposing few properties of FSSplitter and BlockReader operators to > FSRecordReaderModule to tune Application > --- > > Key: APEXMALHAR-2302 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2302 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Deepak Narkhede >Assignee: Deepak Narkhede >Priority: Minor > Fix For: 3.6.0 > > > Exposing the blockSize property of FSSplitter operator to > FSRecordReaderModule. This will help end users to tune the blockSize value > based on application needs -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2325) Same block id is emitting from FSInputModule
[ https://issues.apache.org/jira/browse/APEXMALHAR-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628804#comment-15628804 ] ASF GitHub Bot commented on APEXMALHAR-2325: GitHub user chaithu14 opened a pull request: https://github.com/apache/apex-malhar/pull/479 APEXMALHAR-2325 1) Set the file system default block size to the reader. 2) Set the block size to the reader context You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2325-SameBlockID Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/479.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #479 commit f06e3d8b79edbcdcd6c084e989a6bcbc87e210f9 Author: chaitanya Date: 2016-11-02T12:27:43Z APEXMALHAR-2325 1) Set the file system default block size to the reader. 2) Set the block size to the reader context > Same block id is emitting from FSInputModule > > > Key: APEXMALHAR-2325 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2325 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Chaitanya >Assignee: Chaitanya >Priority: Minor > > Observation: Mismatch the block size between the filesplitter and block > reader in FSInput Module. > Default block size in block reader = conf.getLong("fs.local.block.size" ) > i.e, Local file system block size. > Default block size in filesplitter = fs.getDefaultBlockSize(File Path) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #479: APEXMALHAR-2325 1) Set the file system defaul...
GitHub user chaithu14 opened a pull request: https://github.com/apache/apex-malhar/pull/479 APEXMALHAR-2325 1) Set the file system default block size to the reader. 2) Set the block size to the reader context You can merge this pull request into a Git repository by running: $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2325-SameBlockID Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/479.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #479 commit f06e3d8b79edbcdcd6c084e989a6bcbc87e210f9 Author: chaitanya Date: 2016-11-02T12:27:43Z APEXMALHAR-2325 1) Set the file system default block size to the reader. 2) Set the block size to the reader context --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (APEXMALHAR-2325) Same block id is emitting from FSInputModule
Chaitanya created APEXMALHAR-2325: - Summary: Same block id is emitting from FSInputModule Key: APEXMALHAR-2325 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2325 Project: Apache Apex Malhar Issue Type: Bug Reporter: Chaitanya Assignee: Chaitanya Priority: Minor Observation: Mismatch of the block size between the file splitter and block reader in FSInputModule. Default block size in the block reader = conf.getLong("fs.local.block.size"), i.e., the local file system block size. Default block size in the file splitter = fs.getDefaultBlockSize(File Path) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
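A small illustrative helper for the mismatch described here and the direction of the fix in the pull request (hypothetical method names, not the actual FSInputModule code): the reader should use the same block size as the splitter, taken from the target file system, instead of falling back to fs.local.block.size.
```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the two block-size sources and the aligned value the reader should use.
public class BlockSizeAlignment
{
  /** Block size the file splitter uses: the default block size of the target file system. */
  public static long splitterBlockSize(FileSystem fs, Path file)
  {
    return fs.getDefaultBlockSize(file);
  }

  /** What the block reader used before the fix: the local file system block size. */
  public static long readerBlockSizeBeforeFix(Configuration conf)
  {
    // 32 MB is shown here only as an illustrative fallback value.
    return conf.getLong("fs.local.block.size", 32L * 1024 * 1024);
  }

  /** After the fix, the reader uses the same value as the splitter, so block ids line up. */
  public static long alignedReaderBlockSize(FileSystem fs, Path file)
  {
    return splitterBlockSize(fs, file);
  }
}
```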
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628460#comment-15628460 ] Pramod Immaneni commented on APEXCORE-570: -- Here is an example of how to do this from within the application: https://github.com/PramodSSImmaneni/throttle In this JIRA the idea is to do this within the engine, in the buffer server itself, to allow a configurable option to limit how far the upstream operator can get ahead of the downstream operator, in terms of number of windows, before it is blocked. Once the downstream catches up, the upstream operator's data will be unblocked. > Prevent upstream operators from getting too far ahead when downstream > operators are slow > > > Key: APEXCORE-570 > URL: https://issues.apache.org/jira/browse/APEXCORE-570 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Pramod Immaneni >Assignee: Pramod Immaneni > > If the downstream operators are slower than upstream operators then the > upstream operators will get ahead and the gap can continue to increase. > Provide an option to slow down or temporarily pause the upstream operators > when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
Pramod Immaneni created APEXCORE-570: Summary: Prevent upstream operators from getting too far ahead when downstream operators are slow Key: APEXCORE-570 URL: https://issues.apache.org/jira/browse/APEXCORE-570 Project: Apache Apex Core Issue Type: Improvement Reporter: Pramod Immaneni Assignee: Pramod Immaneni If the downstream operators are slower than upstream operators then the upstream operators will get ahead and the gap can continue to increase. Provide an option to slow down or temporarily pause the upstream operators when they get too far ahead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [jira] [Commented] (APEXMALHAR-2303) S3 Line By Line Module
Hi Apex Dev Community, For the Fixed Width S3 Record Reader, the input is the block metadata containing the block offset and the block length. The length of the block may not be a multiple of the length of the record. (For example, the block length can be 1 MB while the record length can be 23 bytes.) Hence, the first byte in the block may belong to a record starting in the previous block. Similarly, the last record may not have all its bytes in this block and may spill over to the next block. Since the record is fixed width, we can make some optimization in the way data is fetched from S3. We can change the start offset and end offset so that we fetch data from S3 such that records are also aligned and do not span multiple blocks. While retrieving the block, we will retrieve from X up to Y, where X is the start byte of a record whose first byte is in the current block and Y is the end byte of the last record which exists in the current block: startOffset = block.startOffset + (recordLength - block.startOffset % recordLength) % recordLength, and endOffset = block.endOffset + (recordLength - block.endOffset % recordLength) % recordLength - 1. This will ensure that we do not need multiple GET requests to fetch an entire record and also that no extra bytes are read from S3. Kindly let me know your views / alternative approaches for the same. Regards Ajay On Tue, Nov 1, 2016 at 2:16 PM, ASF GitHub Bot (JIRA) wrote: > > [ https://issues.apache.org/jira/browse/APEXMALHAR-2303? > page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel& > focusedCommentId=15624810#comment-15624810 ] > > ASF GitHub Bot commented on APEXMALHAR-2303: > > > GitHub user ajaygit158 opened a pull request: > > https://github.com/apache/apex-malhar/pull/478 > > APEXMALHAR-2303 Added S3RecordReaderModule for reading records line by > line > > @chaithu14 @yogidevendra Kindly review > > You can merge this pull request into a Git repository by running: > > $ git pull https://github.com/ajaygit158/apex-malhar APEXMALHAR-2303 > > Alternatively you can review and apply these changes as the patch at: > > https://github.com/apache/apex-malhar/pull/478.patch > > To close this pull request, make a commit to your master/trunk branch > with (at least) the following in the commit message: > > This closes #478 > > > commit b999cbd044b370a271ea8265f2b3e4b7be3935bc > Author: Ajay > Date: 2016-10-27T12:57:28Z > > Added S3 Record Reader module > > commit 426f8f6efc838ca754ad6070c3d0110537b1f222 > Author: Ajay > Date: 2016-10-28T13:42:51Z > > Changes to ensure compilation with jdk 1.7 > > commit a2e7d9892e00784b881c53e2d44cff12ceb6abb1 > Author: Ajay > Date: 2016-11-01T08:42:27Z > > Few corrections in S3RecordReader > > > > > > S3 Line By Line Module > > -- > > > > Key: APEXMALHAR-2303 > > URL: https://issues.apache.org/ > jira/browse/APEXMALHAR-2303 > > Project: Apache Apex Malhar > > Issue Type: Bug > >Reporter: Ajay Gupta > >Assignee: Ajay Gupta > > Original Estimate: 336h > > Remaining Estimate: 336h > > > > This is a new module which will consist of 2 operators > > 1) File Splitter -- Already existing in Malhar library > > 2) S3RecordReader -- Read a file from S3 and output the records > (delimited or fixed width) > > > > -- > This message was sent by Atlassian JIRA > (v6.3.4#6332) >
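The offset-alignment arithmetic proposed above, written out as a small self-contained helper for reference (the class and method names are assumptions; this is a sketch, not the actual S3RecordReader code).
```
// Sketch of record-aligned fetch offsets for fixed-width records, following
// the formulas in the mail above. Offsets are byte positions within the file.
public class FixedWidthS3OffsetAlignment
{
  /** X: start byte of the first record that begins in this block. */
  public static long alignedStartOffset(long blockStartOffset, long recordLength)
  {
    return blockStartOffset + (recordLength - blockStartOffset % recordLength) % recordLength;
  }

  /** Y: last byte (inclusive) of the last record covered by this block. */
  public static long alignedEndOffset(long blockEndOffset, long recordLength)
  {
    return blockEndOffset + (recordLength - blockEndOffset % recordLength) % recordLength - 1;
  }

  public static void main(String[] args)
  {
    long recordLength = 23;      // e.g. 23-byte fixed-width records
    long blockStart = 1048576L;  // second 1 MB block starts here
    long blockEnd = 2097152L;    // and ends here (exclusive)
    System.out.println(alignedStartOffset(blockStart, recordLength)); // 1048593 = 23 * 45591
    System.out.println(alignedEndOffset(blockEnd, recordLength));     // 2097162, last byte of record 91180
  }
}
```
With these offsets, consecutive blocks fetch disjoint, record-aligned byte ranges, so no record has to be assembled from two separate GET requests.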