Thanks Priyanka, and Ashwin!

On Fri, Dec 18, 2015 at 2:19 AM, Priyanka Gugale wrote:
> Agree with Tim and Chandni that we should go to disk only when the output DB is not reachable or slow.
> As suggested, the best approach will be to use a combination of AbstractReconciler and WAL (spill to disk only when the in-memory queue size limit is reached).
>
> I can take it up to integrate the enhanced reconciler with the DB output operator. I can also help in using WAL with AbstractReconciler.
>
> -Priyanka
>
> On Fri, Dec 18, 2015 at 4:06 AM, Ashwin Chandra Putta wrote:
>
> > I will send a PR for my first implementation soon.
> >
> > On Thu, Dec 17, 2015 at 2:34 PM, Timothy Farkas wrote:
> >
> > > It looks like Ashwin has an initial implementation of a reconciler. Could we add that to Malhar and add WAL optimizations to it once the WAL is added to Malhar?
> > >
> > > On Thu, Dec 17, 2015 at 1:31 PM, Chandni Singh wrote:
> > >
> > > > Pramod,
> > > >
> > > > Agreed, it can be done using the reconciler and optimizing it, but that means there is some work to be done in Malhar/library. We have a ticket now to address that work.
> > > >
> > > > Using WAL to spool the tuples is all missing from Malhar/lib, which means the user needs to write more code.
> > > >
> > > > Thanks,
> > > > Chandni
> > > >
> > > > On Thu, Dec 17, 2015 at 1:07 PM, Ashwin Chandra Putta wrote:
> > > >
> > > > > Tim,
> > > > >
> > > > > I don't think there is an implementation in Malhar yet. I have an implementation in my fork that I sent you.
> > > > >
> > > > > Regards,
> > > > > Ashwin.
> > > > >
> > > > > On Thu, Dec 17, 2015 at 12:09 PM, Timothy Farkas wrote:
> > > > >
> > > > > > Ashwin, is there an implementation of that in Malhar? I could only find an in-memory-only version:
> > > > > >
> > > > > > https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
> > > > > >
> > > > > > This in-memory implementation won't work in this use case, since committed may not be called for hours or a day, so data will be held in memory for some time.
> > > > > >
> > > > > > On Thu, Dec 17, 2015 at 11:49 AM, Ashwin Chandra Putta wrote:
> > > > > >
> > > > > > > Tim,
> > > > > > >
> > > > > > > Are you saying HDFS is slower than a database? :)
> > > > > > >
> > > > > > > I think Reconciler is the best approach. The tuples need not be written to hdfs, they can be queued in memory. You can spool them to hdfs only when it reaches the limits of the queue. The reconciler solves a few major problems as you described above.
> > > > > > >
> > > > > > > 1. Graceful reconnection. When the external system we are writing to is down, the reconciler is spooling the messages to the queue and then to hdfs. The tuples are written to the external system only after it is back up again.
> > > > > > > 2. Handling surges. There will be cases when the throughput may get a sudden surge for some period and the external system may not be fast enough for the writes to it. In those cases, by using reconciler, we are spooling the incoming tuples to queue/hdfs and then writing at the pace of the external system.
> > > > > > > 3. Dag slowdown. Again, in case of external system failure or a slow connection, we do not want to block the windows moving forward. If the windows are blocked for a long time, then stram will unnecessarily kill the operator. Reconciler makes sure that the incoming messages are just queued/spooled to hdfs (external system is not blocking the dag), so the dag is not slowed down.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Ashwin.
> > > > > > >
> > > > > > > On Thu, Dec 17, 2015 at 11:29 AM, Timothy Farkas wrote:
> > > > > > >
> > > > > > > > Yes, that is true Chandni, and considering how slow HDFS is we should avoid writing to it if we can.
> > > > > > > >
> > > > > > > > It would be great if someone could pick up the ticket :).
> > > > > > > >
> > > > > > > > On Thu, Dec 17, 2015 at 11:17 AM, Chandni Singh wrote:
> > > > > > > >
> > > > > > > > > +1 for Tim's suggestion.
> > > > > > > > >
> > > > > > > > > Using the reconciler means always writing to HDFS and then reading from it. Tim's suggestion is that we only write to hdfs when the database connection is down. This is analogous to spooling.
> > > > > > > > >
> > > > > > > > > Chandni
> > > > > > > > >
> > > > > > > > > On Thu, Dec 17, 2015 at 11:13 AM, Pramod Immaneni wrote:
> > > > > > > > >
> > > > > > > > > > Tim, we have a pattern for this called Reconciler that Gaurav has also mentioned. There are some examples for it in Malhar.
> > > > > > > > > >
> > > > > > > > > > On Thu, Dec 17, 2015 at 9:47 AM, Timothy Farkas wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi All,
> > > > > > > > > > >
> > > > > > > > > > > One of our users is outputting to Cassandra, but they want to handle a Cassandra failure or Cassandra down time gracefully from an output operator. Currently a lot of our database operators will just fail and redeploy continually until the database comes back. This is a bad idea for a couple of reasons:
> > > > > > > > > > >
> > > > > > > > > > > 1 - We rely on buffer server spooling to prevent data loss. If the database is down for a long time (several hours or a day) we may run out of space to spool for buffer server since it spools to local disk, and data is purged only after a window is committed. Furthermore, this buffer server problem will exist for all the Streaming Containers in the dag, not just the one immediately upstream from the output operator, since data is spooled to disk for all operators and only removed for windows once a window is committed.
> > > > > > > > > > >
> > > > > > > > > > > 2 - If there is another failure further upstream in the dag, upstream operators will be redeployed to a checkpoint less than or equal to the checkpoint of the database operator in the at-least-once case. This could mean redoing several hours or a day worth of computation.
> > > > > > > > > > >
> > > > > > > > > > > We should support a mechanism to detect when the connection to a database is lost and then spool to hdfs using a WAL, and then write the contents of the WAL into the database once it comes back online. This will save the local disk space of all the nodes used in the dag and allow it to be used for only the data being output to the output operator.
> > > > > > > > > > >
> > > > > > > > > > > Ticket here if anyone is interested in working on it:
> > > > > > > > > > >
> > > > > > > > > > > https://malhar.atlassian.net/browse/MLHR-1951
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Tim
> > > > > > >
> > > > > > > --
> > > > > > > Regards,
> > > > > > > Ashwin.
> > > > >
> > > > > --
> > > > > Regards,
> > > > > Ashwin.
> >
> > --
> > Regards,
> > Ashwin.
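
For reference, the "queue in memory, spill only when the queue limit is reached" idea from the thread could be sketched roughly as below. This is only an illustration under stated assumptions, not the Malhar AbstractReconciler or Ashwin's implementation: the class name SpillingBuffer, its methods, and the in-process spill queue (a stand-in for a WAL on hdfs) are all hypothetical, and the external system is modeled as a Predicate that returns false while the database is unreachable.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical sketch: bounded in-memory queue with overflow to a spill area.
// The spill Deque stands in for a WAL on hdfs; nothing here is an Apex/Malhar API.
class SpillingBuffer<T> {
  private final int memoryLimit;
  private final Deque<T> memory = new ArrayDeque<>();
  private final Deque<T> spill = new ArrayDeque<>();

  SpillingBuffer(int memoryLimit) {
    if (memoryLimit < 1) {
      throw new IllegalArgumentException("memoryLimit must be at least 1");
    }
    this.memoryLimit = memoryLimit;
  }

  // Accept a tuple without blocking. Once anything has spilled, later tuples
  // also go to the spill area so that arrival order is preserved.
  void add(T tuple) {
    if (spill.isEmpty() && memory.size() < memoryLimit) {
      memory.addLast(tuple);
    } else {
      spill.addLast(tuple);
    }
  }

  // Try to write pending tuples in arrival order. The writer returns false
  // when the external system is down or too slow; draining then stops and
  // the unwritten tuple stays queued. Returns the number of tuples written.
  int drain(Predicate<T> writer) {
    int written = 0;
    T next;
    while ((next = memory.peekFirst()) != null) {
      if (!writer.test(next)) {
        break;
      }
      memory.pollFirst();
      written++;
      // Refill the freed in-memory slot from the oldest spilled tuple.
      T spilled = spill.pollFirst();
      if (spilled != null) {
        memory.addLast(spilled);
      }
    }
    return written;
  }

  int inMemoryCount() {
    return memory.size();
  }

  int spilledCount() {
    return spill.size();
  }
}
```

In an operator, add would be called from the tuple-processing path and drain from a separate writer thread or at window boundaries, so a slow or unavailable database never blocks the windows from moving forward.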
