One more option: we can keep track of windowId and batchId, i.e. we save the batchId together with the windowId whenever we commit a batch within a window. For example, if we are in window 10 and have just written the second batch of that window, we commit windowId=10, batchId=2 to the database. During recovery we skip the batches of the last window that are already marked as committed.

-Priyanka
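For illustration, that bookkeeping could look roughly like the sketch below. The dt_batch_meta table, the BatchMetaStore class and its methods are invented for this sketch; they are not existing Malhar APIs.

// Sketch only: persist (windowId, batchId) atomically with each batch commit,
// then skip already-committed batches of the replayed window during recovery.
// Table "dt_batch_meta" and this helper class are assumptions, not Malhar code.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class BatchMetaStore
{
  private final Connection con;

  public BatchMetaStore(Connection con)
  {
    this.con = con;
  }

  // Executed inside the same transaction that writes the batch, just before commit.
  public void markBatchCommitted(String appId, int operatorId, long windowId, int batchId) throws SQLException
  {
    try (PreparedStatement ps = con.prepareStatement(
        "INSERT INTO dt_batch_meta (app_id, operator_id, window_id, batch_id) VALUES (?, ?, ?, ?)")) {
      ps.setString(1, appId);
      ps.setInt(2, operatorId);
      ps.setLong(3, windowId);
      ps.setInt(4, batchId);
      ps.executeUpdate();
    }
  }

  // During recovery: batches of the replayed window with batchId <= the returned value are skipped.
  public int lastCommittedBatch(String appId, int operatorId, long windowId) throws SQLException
  {
    try (PreparedStatement ps = con.prepareStatement(
        "SELECT MAX(batch_id) FROM dt_batch_meta WHERE app_id = ? AND operator_id = ? AND window_id = ?")) {
      ps.setString(1, appId);
      ps.setInt(2, operatorId);
      ps.setLong(3, windowId);
      try (ResultSet rs = ps.executeQuery()) {
        return rs.next() ? rs.getInt(1) : 0;  // getInt() yields 0 when MAX is NULL, i.e. no batch committed yet
      }
    }
  }
}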
On Tue, Dec 29, 2015 at 3:54 PM, Sandeep Deshmukh <[email protected]> wrote:

Not sure if "at least once" is the right behavior for databases. We may not always have a primary key to update or insert on.

Regards,
Sandeep

On Tue, Dec 29, 2015 at 2:23 PM, Priyanka Gugale <[email protected]> wrote:

Hi,

Thanks for your inputs, Chandni. I guess what you are suggesting is similar to AbstractJdbcNonTransactionableBatchOutputOperator, which does batch non-transactional operations. That is one good option.

I am also thinking of the possibility of having "at least once" behavior with a transactional store. In this approach we keep committing batches within a window, and each batch commit is a transaction. On recovery we start processing from the last committed window (we don't exclude the last committed window, as it could be partially written). If the queries are updates or inserts using a primary key, replaying the insert/update commands shouldn't be a problem: they have the same effect on the database (of course this is not applicable for all use cases). Does this look better?

-Priyanka

On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh <[email protected]> wrote:

Yeah, I understand the problem: the app window size here is time based, not event-count based. However, I don't think having a max batch size in this class will help, because that causes problems with saving the tuples exactly once and with idempotency.

I was just trying to let you know why the batch transactional store is the way it is.

Check out the non-transactional store output operator (AbstractStoreOutputOperator) and its implementations, where the window id is saved with each update. I think a batch extension of that can achieve what is needed here in a way that keeps the operator fault-tolerant and idempotent.

Thanks,
Chandni

On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <[email protected]> wrote:

Hi Chandni,

I totally agree with you that the transactions should be idempotent, and that needs to be taken care of if the batch size is configurable.

Though, I have a question about the second part, where the batch size is controlled by controlling the app window size. I agree with you that the aggregation window is the unit of aggregation provided by the platform, but if I understand correctly, it is time based. If I want to aggregate based on the number of tuples, would this be suitable?

To give an example, let's say I have a store on which a transaction should never exceed 1000 operations. As a streaming application, it is difficult to guess the input rate, hence it is not possible to guess how many tuples will become part of a single application window. In such a case, how can the application window size be used to configure the transaction batch size? Wouldn't it make more sense to control it via an exact number of tuples?

Thanks,
~ Chinmay
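The primary-key replay argument above relies on the writes being idempotent upserts. A tiny illustration of the kind of statement that can safely be replayed; the account_balance table, its columns, and the MySQL-style syntax are invented for this example:

// Replaying this statement after a failure leaves the row in the same final state,
// which is why at-least-once delivery can be acceptable for primary-key upserts.
// Table "account_balance" and its columns are made up for the illustration.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class UpsertExample
{
  public static void upsert(Connection con, String accountId, double balance) throws SQLException
  {
    String sql = "INSERT INTO account_balance (account_id, balance) VALUES (?, ?) "
        + "ON DUPLICATE KEY UPDATE balance = VALUES(balance)";  // MySQL-style upsert
    try (PreparedStatement ps = con.prepareStatement(sql)) {
      ps.setString(1, accountId);
      ps.setDouble(2, balance);
      ps.executeUpdate();
    }
  }
}

This is also where Sandeep's objection applies: without a primary (or otherwise unique) key, a replayed insert simply produces a duplicate row.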
On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <[email protected]> wrote:

Hey Chinmay/Priyanka,

We need to write tuples exactly once to the store. Please address the failure scenarios and how to achieve exactly-once and idempotency. I mentioned in my previous mail why multiple batches in a window are a problem for exactly-once.

Control via the app window would mean tuning the functionality by controlling the platform parameters. I think it's best if one gets the option to separate the concerns of the platform from those of the app logic.

The application window is a unit of aggregation. Every operator in a DAG can have a different application window, which is the support the platform provides for application logic.

Chandni

On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <[email protected]> wrote:

Hi,

Just a thought on how it can possibly be done. The pseudo code might look like this:

processTuple()
{
  if (batchSize < configuredBatchSize) {
    // add to the batch
  } else {
    // process the batch as a transaction
    // empty the data structure of the batch
  }
}

endWindow()
{
  // process the batch as a transaction
  // empty the data structure of the batch
}

This way, the user gets better, more direct control over what a transaction means.

As Chandni rightly said, one can reduce the application window size for the operator, and that would reduce the batch size. But that's not something that looks intuitive from the user's perspective. Control via the app window would mean tuning the functionality by controlling the platform parameters. I think it's best if one gets the option to separate the concerns of the platform from those of the app logic.

If one wants to control the batch size, he or she should be able to do that by just setting a batch-size property (a number), and not by changing the app window size (an indirect time unit).

~ Chinmay

On 28 Dec 2015 22:53, "Chandni Singh" <[email protected]> wrote:

But you will not allow multiple batches in the same window? Can you please elaborate on the failure scenarios and how this affects idempotency?

Chandni

On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <[email protected]> wrote:

Hi,

Sorry if I was not clear, but I am trying to propose a MAX_SIZE per window which the operator could process. The size could be less than MAX_SIZE; there is no restriction about that.

-Priyanka
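A plain-Java sketch of Chinmay's pseudo code above: the class and method names are invented, and the Apex operator plumbing (ports, checkpointing) is deliberately left out. Unlike the pseudo code, it adds the tuple before checking the size, so the tuple that fills the batch is not dropped.

// SizedBatchWriter and processBatch() are hypothetical names for illustration only.
import java.util.ArrayList;
import java.util.List;

public abstract class SizedBatchWriter<T>
{
  private final int configuredBatchSize;            // e.g. 1000 for the Cassandra case mentioned later in the thread
  private final List<T> batch = new ArrayList<>();

  protected SizedBatchWriter(int configuredBatchSize)
  {
    this.configuredBatchSize = configuredBatchSize;
  }

  // Called once per tuple (processTuple in the pseudo code).
  public void processTuple(T tuple)
  {
    batch.add(tuple);
    if (batch.size() >= configuredBatchSize) {
      flush();
    }
  }

  // Called at the end of every application window (endWindow in the pseudo code),
  // so a batch never spans window boundaries.
  public void endWindow()
  {
    if (!batch.isEmpty()) {
      flush();
    }
  }

  private void flush()
  {
    processBatch(new ArrayList<>(batch));  // write the batch as one transaction
    batch.clear();                         // empty the data structure of the batch
  }

  // Store-specific transactional write, e.g. a JDBC batch executed inside one transaction.
  protected abstract void processBatch(List<T> batch);
}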
On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <[email protected]> wrote:

How do you propose to restrict the number of tuples processed in an application window to less than the batch size? I don't see a way to enforce that the batch size can never be less than the number of tuples processed in an application window.

On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <[email protected]> wrote:

Hi Chandni,

How about restricting the number of tuples which can be processed per window? If someone wants to process small and frequent batches, they can set the batch size to some small value and also reduce the window size. This would build some back pressure of course, but that could be acceptable if one really wants to restrict the batch size.

The thought was triggered while working on the Cassandra output operator: Cassandra has problems processing batches larger than some size (I don't recall the exact number right now). Other databases may want to restrict the batch size for similar or other reasons.

-Priyanka

On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <[email protected]> wrote:

Priyanka,

AbstractBatchTransactionableStore treats all tuples in one application window as a batch because it needs to store the tuples in the store exactly once.

If there is more than one batch in an application window, then to store the tuples exactly once the window id needs to be written with every tuple as well, which is not that efficient. Therefore we take advantage of the transaction support by saving just the window id once (not with every tuple), but this necessitates that all the tuples be considered as one batch.

Every operator in a DAG can have its own application window size. So to reduce the size per batch, the application window attribute needs to be modified.

Chandni

On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <[email protected]> wrote:

+1 for this.

~ Chinmay.
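For context, the pattern Chandni describes (write the whole window's batch and the window id in one transaction, then check the stored window id on recovery) amounts roughly to the following; the dt_window_meta table and this helper are illustrative only, not the actual Malhar implementation:

// Illustrative pattern: all tuples of a window and the window id are committed
// in a single transaction, so either both are visible or neither is (exactly-once).
// A row per (app_id, operator_id) is assumed to exist in dt_window_meta at setup.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

public class WindowedBatchWriter
{
  private final Connection con;

  public WindowedBatchWriter(Connection con) throws SQLException
  {
    this.con = con;
    con.setAutoCommit(false);  // the batch and the window id must commit together
  }

  // Returns the last fully written window; on recovery, windows <= this id are skipped.
  public long committedWindowId(String appId, int operatorId) throws SQLException
  {
    try (PreparedStatement ps = con.prepareStatement(
        "SELECT window_id FROM dt_window_meta WHERE app_id = ? AND operator_id = ?")) {
      ps.setString(1, appId);
      ps.setInt(2, operatorId);
      try (ResultSet rs = ps.executeQuery()) {
        return rs.next() ? rs.getLong(1) : -1L;
      }
    }
  }

  // Called once per application window with all tuples of that window as one batch.
  public void writeWindow(String appId, int operatorId, long windowId, List<String> upserts) throws SQLException
  {
    try (Statement stmt = con.createStatement()) {
      for (String sql : upserts) {
        stmt.executeUpdate(sql);
      }
    }
    try (PreparedStatement ps = con.prepareStatement(
        "UPDATE dt_window_meta SET window_id = ? WHERE app_id = ? AND operator_id = ?")) {
      ps.setLong(1, windowId);
      ps.setString(2, appId);
      ps.setInt(3, operatorId);
      ps.executeUpdate();
    }
    con.commit();  // tuples and window id become durable atomically
  }
}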
On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <[email protected]> wrote:

Hi,

In Malhar we have an operator, AbstractBatchTransactionableStoreOutputOperator, which creates batches from the tuples received in a window. At the end of the window these batches are sent to the database for processing. There is no way to configure a MAX_SIZE for these batches: depending on the input rate the batch sizes can grow very large, and we might want to restrict them.

Any operator can extend the class and do batch management on its own, but I see this as a generic requirement, and IMO we should change the base class, i.e. AbstractBatchTransactionableStoreOutputOperator, to accept a MAX_SIZE for the batch from outside.

Any opinion on this?

-Priyanka
