Also, would it be possible for me to get wiki access, so that I can update it as well?
-Cameron

On Wed, Nov 16, 2016 at 11:59 AM, Cameron Hatfield <kin...@gmail.com> wrote:
> "A couple of questions" is what I originally wrote, and then the following happened. Sorry about the large swath of them; I want to make sure my understanding of the code base, as well as of the DL/BookKeeper/ZK ecosystem interaction, makes sense.
>
> ==General:
> What is an exclusive session? What does it provide over a regular session?
>
> ==Proxy side:
> Should a new StreamOp be added for the fencing operation, or does it make sense to piggyback on an existing one (such as write)?
>
> ====getLastDLSN:
> What should the return result be for:
> A new stream
> A new session, after successful fencing
> A new session, after a change in ownership / first starting up
>
> What is the main use case for getLastDLSN(<stream>, false)? Is it to guarantee that the recovery process has happened in case of an ownership failure (I don't have a good understanding of what triggers the recovery process, especially from the reader side)? Or is it to handle the lost-ack problem? Since everything else read-related goes through the read client, I'm not sure I see the use case, and there seems to be a large potential for confusion about which one to use. What about just a fenceSession op that always fences and returns the DLSN of the fence, leaving the normal getLastDLSN to the regular read client?
>
> ====Fencing:
> When a session fence occurs, what call needs to be made to make sure any outstanding writes are flushed and committed (so that we guarantee the client will be able to read anything that was in the write queue)?
> Is there a guaranteed ordering for things written to the future queue of AsyncLogWriter? (I'm not confident that I was able to follow the logic accurately, as there are many parts of the code that write, have queues, send heartbeats, etc.)
>
> ====SessionID:
> What is the default sessionid / transactionid for a new stream? I assume this would just be the first control record.
>
> ======Should all streams have a sessionid by default, regardless of whether it is ever used by a client (i.e., every time ownership changes, a new control record is generated and a sessionid is stored)?
> The main edge case to handle is a client writing with an old sessionid after the owner has changed but before the new owner has created a sessionid. This should be handled by the "non-matching sessionid" rule: the stream's not-yet-valid sessionid wouldn't match the sessionid the client passed, which should cause the client to get a new sessionid.
>
> ======Where in the code does it make sense to own the session: the stream interfaces / classes? Should they pass that information down to the ops, or do the sessionid check themselves?
> My first thought would be that Stream owns the sessionid and passes it into the ops (as either a nullable value or an invalid default value), which then do the sessionid check if they care. The main issue is that updating the sessionid is a bit backwards: either every op can update it through some type of return value / direct stream access / etc., or there is a special case in the stream for the fence operation / any other operation that can update the session.
>
> ======For "the owner of the log stream will first advance the transaction id generator to claim a new transaction id and write a control record to the log stream.":
> Should "DistributedLogConstants.CONTROL_RECORD_CONTENT" be the type of control record written?
> Should the "writeControlRecord" on BKAsyncLogWriter be exposed in the AsyncLogWriter interface? Or even in the one within the segment writer? Or should the code be duplicated / pulled out into a helper / etc.?
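The "Stream owns the sessionid, ops check it if they care" option above can be sketched as follows. This is a hypothetical, single-process illustration, not the DistributedLog proxy code: `SessionStreamSketch`, `Stream`, `StreamOp`, `write` and `SessionFencedException` are made-up names. It assumes a `null` session id means the client opted out of the check, and it takes the "special case in the stream for the fence operation" route, so only `fence()` can replace the session id.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "Stream owns the session id, ops check it if they care".
public class SessionStreamSketch {

    /** Unchecked so callers need not declare it; a real design might choose differently. */
    public static class SessionFencedException extends RuntimeException {
        public SessionFencedException(long current, long given) {
            super("session fenced: current=" + current + ", given=" + given);
        }
    }

    /** An op receives the stream's current session id and decides whether to check it. */
    interface StreamOp<T> {
        T execute(Stream stream, long currentSessionId);
    }

    static class Stream {
        final List<String> records = new ArrayList<>();
        private long sessionId;

        Stream(long initialSessionId) {
            this.sessionId = initialSessionId;
        }

        /** The stream owns the session id and hands it to every op it runs. */
        synchronized <T> T run(StreamOp<T> op) {
            return op.execute(this, sessionId);
        }

        /** Special case: only the fence path may replace the session id. */
        synchronized long fence() {
            sessionId += 1;
            return sessionId;
        }
    }

    /** A write that optionally carries a session id (null = client does not care). */
    static StreamOp<Integer> write(Long clientSessionId, String record) {
        return (stream, current) -> {
            if (clientSessionId != null && clientSessionId != current) {
                throw new SessionFencedException(current, clientSessionId);
            }
            stream.records.add(record);
            return stream.records.size() - 1;
        };
    }

    public static void main(String[] args) {
        Stream stream = new Stream(7L);
        long session = stream.run((s, current) -> current); // op that just reads the session
        stream.run(write(session, "a"));                    // accepted
        long newSession = stream.fence();                   // session 7 -> 8
        try {
            stream.run(write(session, "b"));                // stale session: rejected
        } catch (SessionFencedException e) {
            System.out.println(e.getMessage());
        }
        stream.run(write(newSession, "c"));                 // accepted
        stream.run(write(null, "d"));                       // opted out of the check
        System.out.println(stream.records);
    }
}
```

Running `main` prints the rejection message for the stale write and then `[a, c, d]`. Keeping the update inside `fence()` avoids giving every op a way to mutate the session, at the cost of the special case Cameron mentions.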
> (I'm not a big Java person, so any suggestions on the "Java Way", or at least the DL way, to do it would be appreciated.)
>
> ======Transaction ID:
> BKLogSegmentWriter ignores the transaction ids of control records when it records the "LastTxId". Would that be an issue for anything here? It looks like it does that because it assumes you're calling its local function for writing a control record, which uses the lastTxId.
>
> ==Thrift Interface:
> ====Should the write response be split out for different calls?
> It seems odd to have, for every RPC call, a single struct with many optional fields that are filled depending on the call made. This is mostly a curiosity question, since I assume it comes from general practice after using Thrift for a while. Would it at least make sense for the getLastDLSN/fence endpoint to have a new struct?
>
> ====Is there a particular error code that makes sense for session fenced? If we want to stay close to the HTTP errors, 412 (PRECONDITION FAILED) looks like it might make the most sense, if a bit generic.
>
> 412 definition:
> "The precondition given in one or more of the request-header fields evaluated to false when it was tested on the server. This response code allows the client to place preconditions on the current resource metainformation (header field data) and thus prevent the requested method from being applied to a resource other than the one intended."
>
> 412 excerpt from the If-Match documentation:
> "This behavior is most useful when the client wants to prevent an updating method, such as PUT, from modifying a resource that has changed since the client last retrieved it."
>
> ====Should we return the sessionid to the client in the "fenceSession" calls?
> It seems like it may be useful when you fence, especially if you have some type of custom sequencer where it would make sense for search, or for debugging.
> The main minus is that it would be easy for users to create an implicit requirement that the sessionid is forever a valid transactionid, which may not always be the case long term for the project.
>
> ==Client:
> ====What is the proposed process for the client retrieving the new sessionid?
> A full reconnect? No special-case code, but intrusive on the client side, and possibly expensive garbage/processing-wise (though this type of failure should hopefully be rare enough not to be a problem).
> A call to reset the sessionid? Less intrusive, but it comes with all the issues of mutable object methods that need to be called in a certain order, plus edge cases such as outstanding/buffered requests to the old stream, etc. The call could also return the new sessionid, making it a good call for storing or debugging the value.
>
> ====Session Fenced failure:
> Will this put the client into a failure state, stopping all future writes until fixed?
> Is it even possible to get this error when ownership changes? The connection to the new owner should get a new sessionid on connect, so I would expect not.
>
> Cheers,
> Cameron
>
> On Tue, Nov 15, 2016 at 2:01 AM, Xi Liu <xi.liu....@gmail.com> wrote:
>
>> Thank you, Cameron. Looking forward to your comments.
>>
>> - Xi
>>
>> On Sun, Nov 13, 2016 at 1:21 PM, Cameron Hatfield <kin...@gmail.com> wrote:
>>
>>> Sorry, I've been on vacation for the past week, and heads down on a release that is using DL at the end of Nov. I'll take a look at this over the next week and add any relevant comments. After we are finished with dev for this release, I am hoping to tackle this next.
>>>
>>> -Cameron
>>>
>>> On Fri, Nov 11, 2016 at 12:07 PM, Sijie Guo <si...@apache.org> wrote:
>>>
>>>> Xi,
>>>>
>>>> Thank you so much for your proposal. I took a look, and it looks fine to me. Cameron, do you have any comments?
>>>>
>>>> Looking forward to your pull requests.
>>>>
>>>> - Sijie
>>>>
>>>> On Wed, Nov 9, 2016 at 2:34 AM, Xi Liu <xi.liu....@gmail.com> wrote:
>>>>
>>>>> Cameron,
>>>>>
>>>>> Have you started any work on this? I just updated the proposal page: https://cwiki.apache.org/confluence/display/DL/DP-2+-+Epoch+Write+Support
>>>>> Maybe we can work on this together.
>>>>>
>>>>> Sijie, Leigh,
>>>>>
>>>>> can you help review this to make sure our proposal is heading in the right direction?
>>>>>
>>>>> - Xi
>>>>>
>>>>> On Tue, Nov 1, 2016 at 3:05 AM, Sijie Guo <si...@apache.org> wrote:
>>>>>
>>>>>> I created https://issues.apache.org/jira/browse/DL-63 to track the idea proposed here.
>>>>>>
>>>>>> On Wed, Oct 26, 2016 at 4:53 PM, Sijie Guo <sij...@twitter.com.invalid> wrote:
>>>>>>
>>>>>>> On Tue, Oct 25, 2016 at 11:30 AM, Cameron Hatfield <kin...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Yes, we are reading the HBase WAL (via their replication plugin support) and writing it into DL.
>>>>>>>
>>>>>>> Gotcha.
>>>>>>>
>>>>>>>> From the sounds of it, yes, it would. The only thing I would say is to make the epoch requirement optional, so that if a client doesn't care about dupes, it doesn't have to deal with the process of getting a new epoch.
>>>>>>>
>>>>>>> Yup. This should be optional. I can start a wiki page on how we want to implement this. Are you interested in contributing to this?
>>>>>>>>
>>>>>>>> -Cameron
>>>>>>>>
>>>>>>>> On Wed, Oct 19, 2016 at 7:43 PM, Sijie Guo <sij...@twitter.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Oct 19, 2016 at 7:17 PM, Sijie Guo <sij...@twitter.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Monday, October 17, 2016, Cameron Hatfield <kin...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Answer inline:
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 17, 2016 at 11:46 AM, Sijie Guo <si...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Cameron,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for your summary. I liked the discussion here. I also liked the summary of your requirement: 'single-writer-per-key, multiple-writers-per-log'. If I understand correctly, the core concern here is essentially an 'exactly-once' write (or a way to explicitly tell whether a write can be retried or not).
>>>>>>>>>>>>
>>>>>>>>>>>> Comments inline.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 14, 2016 at 11:17 AM, Cameron Hatfield <kin...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ah, yes, good point (to be clear, we're not using the proxy this way today).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Due to the source of the data (HBase Replication), we cannot guarantee that a single partition will be owned for writes by the same client.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Do you mean you *need* to support multiple writers issuing interleaved writes, or is it just that they might sometimes interleave writes and you don't care?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The way HBase partitions the keys being written wouldn't have a one-to-one mapping with the partitions (logs) we would have in DL. Even if we did have that alignment when the cluster first started, HBase rebalances which servers own which partitions, and splits and merges existing partitions, causing eventual drift from one log per partition. Because we want ordering guarantees per key (a row in HBase), we partition the logs by key. Since multiple writers are possible per range of keys (due to the aforementioned rebalancing / splitting / etc. in HBase), we cannot use the core library, as it requires a single writer for ordering.
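One way to read "we partition the logs by the key" is a deterministic key-to-stream mapping, sketched below. The stream prefix, the fixed stream count and the use of `Arrays.hashCode` are illustrative assumptions, not details from the thread, and `KeyPartitioner` is a hypothetical class rather than a DistributedLog or HBase API. The property that matters is that the mapping depends only on the row key, never on which region server is writing, so a region move or split changes the writer for a key but not its log.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical key-to-log routing: not part of DistributedLog or HBase.
public final class KeyPartitioner {
    private final String streamPrefix;
    private final int numStreams;

    public KeyPartitioner(String streamPrefix, int numStreams) {
        if (numStreams <= 0) {
            throw new IllegalArgumentException("numStreams must be positive");
        }
        this.streamPrefix = streamPrefix;
        this.numStreams = numStreams;
    }

    /** Maps a row key to a stream name; the result depends only on the key bytes. */
    public String streamFor(byte[] rowKey) {
        int hash = Arrays.hashCode(rowKey);
        // floorMod keeps the index non-negative even when the hash is negative.
        int index = Math.floorMod(hash, numStreams);
        return streamPrefix + index;
    }

    public static void main(String[] args) {
        KeyPartitioner partitioner = new KeyPartitioner("hbase-repl-", 16);
        byte[] row = "user#42".getBytes(StandardCharsets.UTF_8);
        // Every region server computes the same stream for the same row.
        System.out.println(partitioner.streamFor(row));
    }
}
```

Because keys are assigned by hash modulo the stream count, changing `numStreams` later would remap most keys and break per-key ordering across the change, so the count would need to stay fixed or be migrated deliberately.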
>>>>>>>>>>>>>
>>>>>>>>>>>>> But for a single log, we don't really care about ordering aside from at the per-key level. So all we really need to handle is preventing duplicates when a failure occurs, and ordering consistency across requests from a single client.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So our general requirements are:
>>>>>>>>>>>>> Write A, Write B
>>>>>>>>>>>>> Timeline: A -> B
>>>>>>>>>>>>> Request B is only made after A has successfully returned (possibly after retries).
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) If the write succeeds, it will be durably exposed to clients within some bounded time frame.
>>>>>>>>>>>>
>>>>>>>>>>>> Guaranteed.
>>>>>>>>>>>>
>>>>>>>>>>>>> 2) If A succeeds and B succeeds, the ordering in the log will be A and then B.
>>>>>>>>>>>>
>>>>>>>>>>>> If I understand correctly, B is only sent after A has returned, right? If that's the case, it is guaranteed.
>>>>>>>>>>>>
>>>>>>>>>>>>> 3) If A fails due to an error that can be relied on *not* to be a lost-ack problem, it will never be exposed to the client, so it may (depending on the error) be retried immediately.
>>>>>>>>>>>>
>>>>>>>>>>>> If it is not a lost-ack problem, the entry will be exposed. It is guaranteed.
>>>>>>>>>>>
>>>>>>>>>>> Let me try rephrasing the question to make sure I'm understanding correctly: if A fails with an error such as "Unable to create connection to bookkeeper server", that is the type of error we would expect to be able to retry immediately, as that result means no action was taken on any log, so no entry could have been created. This is different from a "Connection Timeout" exception, where we just might not have gotten a response in time.
>>>>>>>>>>
>>>>>>>>>> Gotcha.
>>>>>>>>>>
>>>>>>>>>> The response code returned from the proxy can tell whether a failure can be retried safely or not. (We might need to document them well.)
>>>>>>>>>>>
>>>>>>>>>>>>> 4) If A fails due to an error that could be a lost-ack problem (network connectivity, etc.), within a bounded time frame it should be possible to find out whether the write succeeded or failed, either by reading the log from some checkpoint for the changes that should have been made, or through some other server-side support.
>>>>>>>>>>>>
>>>>>>>>>>>> If I understand this correctly, it is a duplication issue, right?
>>>>>>>>>>>>
>>>>>>>>>>>> Can a de-duplication solution work here, with either DL or your client doing the de-duplication?
>>>>>>>>>>>
>>>>>>>>>>> The requirements I'm mentioning are the ones needed for client-side deduping. If I can guarantee that writes are exposed within some time frame, and that I can never get into an inconsistently ordered state when successes happen, then when an error occurs I can always wait for the maximum time frame, read the latest writes, and dedup locally against the request I just made.
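Cameron's client-side procedure (wait out the maximum exposure window, read the latest writes from the last checkpoint, then check locally whether the failed request landed) can be sketched as below. It is a minimal illustration under assumptions the thread leaves open: each record is assumed to carry a client-assigned request id, a plain `long` position stands in for a DLSN, and reading the log is abstracted as a list the caller has already fetched after waiting. `ClientDedup`, `LoggedRecord` and `resolveAfterTimeout` are hypothetical names, not DistributedLog APIs.

```java
import java.util.List;

// Hypothetical sketch: not a DistributedLog API. Positions stand in for DLSNs.
public final class ClientDedup {

    /** A record read back from the log: its position plus a client-assigned request id. */
    public static final class LoggedRecord {
        final long position;
        final String requestId;

        public LoggedRecord(long position, String requestId) {
            this.position = position;
            this.requestId = requestId;
        }
    }

    public enum Outcome { ALREADY_WRITTEN, SAFE_TO_RETRY }

    /**
     * Decides what to do with a request whose outcome is unknown.
     *
     * @param readSinceCheckpoint records read from the log after waiting out the
     *                            maximum exposure window for the failed write
     * @param checkpoint          last position known to precede the failed request
     * @param requestId           id that was attached to the failed request
     */
    public static Outcome resolveAfterTimeout(List<LoggedRecord> readSinceCheckpoint,
                                              long checkpoint, String requestId) {
        for (LoggedRecord r : readSinceCheckpoint) {
            if (r.position > checkpoint && r.requestId.equals(requestId)) {
                return Outcome.ALREADY_WRITTEN;  // it landed: resending would duplicate it
            }
        }
        // Not visible after the window: only safe if the window really bounds exposure.
        return Outcome.SAFE_TO_RETRY;
    }

    public static void main(String[] args) {
        List<LoggedRecord> tail = List.of(
                new LoggedRecord(11, "req-7"),
                new LoggedRecord(12, "req-8"));
        System.out.println(resolveAfterTimeout(tail, 10, "req-8"));  // ALREADY_WRITTEN
        System.out.println(resolveAfterTimeout(tail, 10, "req-9"));  // SAFE_TO_RETRY
    }
}
```

The weak point is the `SAFE_TO_RETRY` branch: it is only correct if the wait truly exceeds the longest time a write can stay pending, which is exactly the upper bound the thread goes on to question.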
>>>>>>>>>>>
>>>>>>>>>>> The main thing about that time frame is that it's basically the sum of every timeout all the way down the system, combined with whatever flushing / caching / etc. delays exist at the BookKeeper / client level before values are exposed.
>>>>>>>>>>
>>>>>>>>>> Gotcha.
>>>>>>>>>>>
>>>>>>>>>>>> Is there any way to identify your write?
>>>>>>>>>>>>
>>>>>>>>>>>> I can think of the following cases; I want to know what your expected behavior from the log is.
>>>>>>>>>>>>
>>>>>>>>>>>> a)
>>>>>>>>>>>> An HBase region server A writes a change of key K to the log, and the change is successfully made to the log, but server A goes down before receiving the response. Region server B takes over the region that contains K. What will B do?
>>>>>>>>>>>
>>>>>>>>>>> HBase writes in large chunks (WAL logs), which its replication system then handles by replaying in the case of failure. If I'm in the middle of a log and the whole region goes down and gets rescheduled elsewhere, I will start back up from the beginning of the log I was in the middle of. Using checkpointing + deduping, we should be able to find out where we left off in the log.
>>>>>>>>>>>
>>>>>>>>>>>> b) Same as a), but server A was just network partitioned. Will both A and B write the change of key K?
>>>>>>>>>>>
>>>>>>>>>>> HBase gives us some guarantees around network partitions (consistency over availability for HBase). HBase is a single-master, failover-recovery type of system, with ZooKeeper-based guarantees of single owners (writers) for a range of data.
>>>>>>>>>>>>
>>>>>>>>>>>>> 5) If A is turned into multiple batches (one large request gets split into multiple smaller ones to the BookKeeper backend, due to log rolling / size / etc.):
>>>>>>>>>>>>> a) The ordering of entries within a batch is consistent with the original request when exposed in the log (though they may be interleaved with other requests).
>>>>>>>>>>>>> b) The ordering across batches is consistent with the original request when exposed in the log (though they may be interleaved with other requests).
>>>>>>>>>>>>> c) If a batch fails and cannot be retried / is unsuccessfully retried, all batches after the failed batch should not be exposed in the log. Note: the batches before and including the failed batch that ended up succeeding can show up in the log, again within some bounded time range for reads by a client.
>>>>>>>>>>>>
>>>>>>>>>>>> There is a method 'writeBulk' in DistributedLogClient that can achieve this guarantee.
>>>>>>>>>>>>
>>>>>>>>>>>> However, I am not very sure how you will turn A into batches. If you are dividing A into batches yourself, you can simply control the application's write sequence to achieve the guarantee here.
>>>>>>>>>>>>
>>>>>>>>>>>> Can you explain more about this?
>>>>>>>>>>>
>>>>>>>>>>> In this case, by batches I mean what the proxy does with the single request that I send it. If the proxy decides it needs to turn my single request into multiple batches of requests, due to log rolling, size limitations, etc., those would be the guarantees I need to be able to deduplicate on the client side.
>>>>>>>>>>
>>>>>>>>>> A single record written by #write and a record set (set of records) written by #writeRecordSet are atomic: they will not be broken down into entries (batches). With the correct response code, you would be able to tell whether it is a lost-ack failure or not. However, there is a size limitation: it can't go beyond 1MB in the current implementation.
>>>>>>>>>>
>>>>>>>>>> What is your expected record size?
>>>>>>>>>>>>
>>>>>>>>>>>>> Since we can guarantee per-key ordering on the client side, we guarantee that there is a single writer per key, just not per log.
>>>>>>>>>>>>
>>>>>>>>>>>> Do you need a fencing guarantee in the case of a network partition causing two writers?
>>>>>>>>>>>>
>>>>>>>>>>>>> So if there were a way to guarantee whether a single write request was written or not within a certain time frame (since failures should be rare anyway, it is fine if this is expensive), we could then have the client guarantee the ordering it needs.
>>>>>>>>>>>>
>>>>>>>>>>>> This sounds like an 'exactly-once' write (regarding retries) requirement to me, right?
>>>>>>>>>>>
>>>>>>>>>>> Yes. I'm curious how this issue is handled by Manhattan, since, as you can imagine, a data store that ends up getting multiple writes for the same put / get / etc. would be harder to use, and we are basically trying to create a log like that for HBase.
>>>>>>>>>>
>>>>>>>>>> Are you guys replacing the HBase WAL?
>>>>>>>>>>
>>>>>>>>>> In the Manhattan case, the request is first written to DL streams by the Manhattan coordinator. The Manhattan replicas then read from the DL streams and apply the change. In the lost-ack case, the MH coordinator just fails the request to the client.
>>>>>>>>>>
>>>>>>>>>> My feeling here is that your usage for HBase is a bit different from how we use DL in Manhattan. It sounds like you read from a source (the HBase WAL) and write to DL. But I might be wrong.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cameron:
>>>>>>>>>>>>>> Another thing we've discussed but haven't really thought through: we might be able to support some kind of epoch write request, where the epoch is guaranteed to have changed if the writer has changed or the ledger was ever fenced off. Writes include an epoch and are rejected if the epoch has changed. With a mechanism like this, fencing the ledger off after a failure would ensure any pending writes had either been written or would be rejected.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The issue would be how I guarantee that the write I sent to the server was written. Since a network issue could happen on the send of the request or on the receipt of the success response, an epoch wouldn't tell me whether I can safely retry: the write could have succeeded while AWS dropped the connection carrying the success response. Since the epoch would be the same (same ledger), I could write duplicates.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> We are currently proposing adding a transaction semantic to DL to get rid of the size limitation and the unawareness in the proxy client. Here is our idea: http://mail-archives.apache.org/mod_mbox/incubator-distributedlog-dev/201609.mbox/%3cCAAC6BxP5YyEHwG0ZCF5soh42X=xuYwYml4nxsybyiofzxpv...@mail.gmail.com%3e
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am not sure if your idea is similar to ours, but we'd like to collaborate with the community if anyone has a similar idea.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Our use case would be covered by transaction support, but I'm unsure whether we would need something that heavyweight for the guarantees we need.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Basically, the high-level requirement here is "Support consistent write ordering for single-writer-per-key, multi-writer-per-log". My hunch is that, with some added guarantees in the proxy (if they aren't already supported) and some custom client code on our side for removing, from a failed request, the entries that actually succeeded in being written to DistributedLog, it should be a relatively easy thing to support.
>>>>>>>>>>>>
>>>>>>>>>>>> Yup. I think it should not be very difficult to support. There might be some changes on the server side. Let's figure out what the changes will be. Are you guys interested in contributing?
>>>>>>>>>>>
>>>>>>>>>>> Yes, we would be.
>>>>>>>>>>>
>>>>>>>>>>> As a note, the one thing we see as an issue with client-side deduping is how to bound the range of data that needs to be looked at for deduplication. As you can imagine, it is pretty easy to bound the bottom of the range, as that is just regular checkpointing of the DLSN that is returned. I'm still not sure whether there is any nice way to time-bound the top end of the range, especially since the proxy owns sequence numbers (which makes sense). I am curious whether more can be done if deduplication is on the server side. However, the main minus I see with server-side deduplication is that, instead of running only when there is a failed client request, it would have to run every time a write happens.
>
> For a reliable dedup, we probably need a fence-then-getLastDLSN
> operation. It would guarantee that any non-completed requests issued
> before this fence-then-getLastDLSN operation (lost-ack requests) will be
> failed and will never land in the log.
>
> The pseudocode would look like this:
>
> write(request) onFailure { t =>
>   if (t is timeout exception) {
>     DLSN lastDLSN = fenceThenGetLastDLSN()
>     DLSN lastCheckpointedDLSN = ...;
>     // find if the request landed between [lastCheckpointedDLSN, lastDLSN].
>     // if it exists, the write succeeded; otherwise retry.
>   }
> }

Just realized the idea is the same as what Leigh raised in the previous
email about 'epoch write'. Let me explain this idea in more detail (Leigh,
feel free to jump in and fill in your idea).

- when a log stream is owned, the proxy uses the last transaction id as
  the epoch.
- when a client connects (handshake with the proxy), it will get the epoch
  for the stream.
- the writes issued by this client will carry the epoch to the proxy.
- add a new rpc, fenceThenGetLastDLSN, which would force the proxy to bump
  the epoch.
- if fenceThenGetLastDLSN happened, all the outstanding writes with the
  old epoch will be rejected with exceptions (e.g. EpochFenced).
- the DLSN returned from fenceThenGetLastDLSN can be used as the bound for
  deduplication on failures.

Cameron, does this sound like a solution for your use case?

>>
>> Maybe something that could fit a similar need to what Kafka does (the
>> last stored value for a particular key in a log), so that on a per-key
>> basis there could be a sequence number that supports deduplication? The
>> cost seems like it would be high, however, and I'm not even sure if
>> BookKeeper supports it.
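The epoch scheme in the list above can be modelled in a few lines. This is a hypothetical single-stream, in-memory sketch, not the proxy's actual code:

- `EpochProxy` and `EpochFencedException` are illustrative names.
- A long log position stands in for a DLSN.
- `fenceThenGetLastDLSN` is the RPC proposed in the thread, not an existing API.

```java
import java.util.ArrayList;
import java.util.List;

// Thrown when a write carries an epoch other than the stream's current one.
class EpochFencedException extends RuntimeException {
    EpochFencedException(long writeEpoch, long currentEpoch) {
        super("write epoch " + writeEpoch + " rejected; current epoch is " + currentEpoch);
    }
}

// Hypothetical in-memory model of one stream owned by one proxy.
final class EpochProxy {
    private final List<String> log = new ArrayList<>();
    private long epoch;

    // On taking ownership, the proxy uses the stream's last transaction id as the epoch.
    EpochProxy(long lastTxId) { this.epoch = lastTxId; }

    // Handshake: a connecting client learns the current epoch.
    synchronized long handshake() { return epoch; }

    // Appends only if the write's epoch is current; returns the entry's position (DLSN stand-in).
    synchronized long write(long writeEpoch, String payload) {
        if (writeEpoch != epoch) throw new EpochFencedException(writeEpoch, epoch);
        log.add(payload);
        return log.size() - 1;
    }

    // Bumps the epoch so every outstanding old-epoch write fails, then returns the last
    // position (-1 if empty). It shares write()'s lock, so no old-epoch write slips in after.
    synchronized long fenceThenGetLastDLSN() {
        epoch++;
        return log.size() - 1;
    }
}
```

After a timeout, a client would call `fenceThenGetLastDLSN` and search only up to the returned position. The fence bumps the epoch under the same lock that guards appends. So any write carrying the old epoch either landed at or before that position, or will be rejected.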
>>
>> Cheers,
>> Cameron
>>>
>>>>
>>>> Thanks,
>>>> Cameron
>>>>
>>>> On Sat, Oct 8, 2016 at 7:35 AM, Leigh Stewart <lstew...@twitter.com.invalid> wrote:
>>>>
>>>>> Cameron:
>>>>> Another thing we've discussed but haven't really thought through:
>>>>> we might be able to support some kind of epoch write request, where the
>>>>> epoch is guaranteed to have changed if the writer has changed or the
>>>>> ledger was ever fenced off. Writes include an epoch and are rejected if
>>>>> the epoch has changed.
>>>>> With a mechanism like this, fencing the ledger off after a failure would
>>>>> ensure any pending writes had either been written or would be rejected.
>>>>>
>>>>> On Sat, Oct 8, 2016 at 7:10 AM, Sijie Guo <si...@apache.org> wrote:
>>>>>
>>>>>> Cameron,
>>>>>>
>>>>>> I think both Leigh and Xi have made a few good points about your question.
>>>>>>
>>>>>> To add one more point to your question: "but I am not 100% sure of how
>>>>>> all of the futures in the code handle failures. If not, where in the
>>>>>> code would be the relevant places to add the ability to do this, and
>>>>>> would the project be interested in a pull request?"
>>>>>>
>>>>>> The current proxy and client logic doesn't handle failures (duplicates)
>>>>>> perfectly. The current strategy is that the client retries as best it
>>>>>> can before throwing exceptions to users. The code you are looking for
>>>>>> is in BKLogSegmentWriter for the proxy handling writes, and in
>>>>>> DistributedLogClientImpl for the proxy client handling responses from
>>>>>> proxies. Does this help you?
>>>>>>
>>>>>> And also, you are welcome to contribute pull requests.
>>>>>>
>>>>>> - Sijie
>>>>>>
>>>>>> On Tue, Oct 4, 2016 at 3:39 PM, Cameron Hatfield <kin...@gmail.com> wrote:
>>>>>>
>>>>>>> I have a question about the Proxy Client.
>>>>>>> Basically, for our use cases, we want to guarantee ordering at the key
>>>>>>> level, irrespective of the ordering of the partition as a whole that
>>>>>>> the key may be assigned to. Due to the source of the data (HBase
>>>>>>> Replication), we cannot guarantee that a single partition will be owned
>>>>>>> for writes by the same client. This means the proxy client works well
>>>>>>> (since we don't care which proxy owns the partition we are writing to).
>>>>>>>
>>>>>>> However, the guarantees we need when writing a batch are as follows.
>>>>>>> Definition of a Batch: the set of records sent to the writeBatch
>>>>>>> endpoint on the proxy.
>>>>>>>
>>>>>>> 1. Batch success: If the client receives a success from the proxy,
>>>>>>> then that batch was successfully written.
>>>>>>>
>>>>>>> 2. Inter-Batch ordering: Once a batch has been written successfully by
>>>>>>> the client, any batch written afterwards is guaranteed to be ordered
>>>>>>> after it (if it is the same stream).
>>>>>>>
>>>>>>> 3. Intra-Batch ordering: Within a batch of writes, the records will be
>>>>>>> committed in order.
>>>>>>>
>>>>>>> 4. Intra-Batch failure ordering: If an individual record within a batch
>>>>>>> fails to write, none of the records after it will be written.
>>>>>>>
>>>>>>> 5. Batch Commit: If a batch returns a success, it is guaranteed to be
>>>>>>> written.
>>>>>>>
>>>>>>> 6. Read-after-write: Once a batch is committed, it will be readable
>>>>>>> within a limited time-frame. This is required in the case of failure,
>>>>>>> so that the client can see what actually got committed. I believe the
>>>>>>> time-frame part could be removed if the client could resend the same
>>>>>>> sequence number that was written previously. That write would then
>>>>>>> fail, and we would know that a read needs to occur.
>>>>>>>
>>>>>>> So, my basic question is: is this currently possible in the proxy? I
>>>>>>> don't believe it gives these guarantees as it stands today, but I am
>>>>>>> not 100% sure of how all of the futures in the code handle failures.
>>>>>>> If not, where in the code would be the relevant places to add the
>>>>>>> ability to do this, and would the project be interested in a pull
>>>>>>> request?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cameron
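Guarantees 3, 4 and 6 in the list above can be sketched together with the idea of resending a previously used sequence number. This is a hypothetical in-memory model, not part of DistributedLog's writeBatch endpoint. The following are all assumptions for illustration:

- the names `SequencedStream`, `SeqRecord`, `BatchResult` and `BatchStatus`;
- per-record, client-assigned, strictly increasing sequence numbers;
- an injected failure predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

enum BatchStatus { OK, DUPLICATE, FAILED }

// A record tagged with a client-assigned, strictly increasing sequence number (an assumption).
record SeqRecord(long seq, String payload) {}

// How the batch ended and how many of its records were committed, in order.
record BatchResult(BatchStatus status, int committed) {}

// Hypothetical single-stream writer illustrating guarantees 3, 4 and 6.
final class SequencedStream {
    private final List<String> log = new ArrayList<>();
    private long lastSeq = Long.MIN_VALUE;       // highest sequence number committed so far
    private final Predicate<String> appendFails; // injected failure, for illustration only

    SequencedStream(Predicate<String> appendFails) { this.appendFails = appendFails; }

    synchronized BatchResult writeBatch(List<SeqRecord> batch) {
        int committed = 0;
        for (SeqRecord r : batch) {
            // Guarantee 6: a sequence number at or below lastSeq was already written,
            // so fail and let the client read back what was committed.
            if (r.seq() <= lastSeq) return new BatchResult(BatchStatus.DUPLICATE, committed);
            // Guarantee 4: once one record fails, nothing after it in the batch is written.
            if (appendFails.test(r.payload())) return new BatchResult(BatchStatus.FAILED, committed);
            log.add(r.payload()); // Guarantee 3: records commit in batch order
            lastSeq = r.seq();
            committed++;
        }
        return new BatchResult(BatchStatus.OK, committed);
    }

    synchronized List<String> read() { return List.copyOf(log); }
}
```

After a timeout, a client can resend the same batch with the same sequence numbers:

- `DUPLICATE` means some prefix already landed, so the client should read back before resending the rest.
- `FAILED` reports how many records were committed before the failing one.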