Xi,

The "stream level" transaction makes more sense to me. It is an extension of 'atomic write' to records that exceed the size limitation. I can see the value of having such an ability.
But I am not convinced by the namespace-level transaction. Do you have any concrete use cases that you can talk about in more detail?

- KN

On Mon, Sep 12, 2016 at 5:20 PM, Xi Liu <xi.liu....@gmail.com> wrote:

> Hello,
>
> I asked about transaction support in the distributedlog user group two
> months ago. I want to raise this again, as we are looking at using
> distributedlog to build a transactional data service. It is a major
> feature that is missing in distributedlog. We have some ideas for adding
> it to distributedlog and want to know whether they make sense. If they are
> good, we'd like to contribute and develop them with the community.
>
> Here are the thoughts:
>
> -------------------------------------------------
>
> From our understanding, DL can provide "at-least-once" delivery semantics
> (if not, please correct me) but not "exactly-once" delivery semantics. That
> means that a message can be delivered one or more times if the reader
> doesn't handle duplicates.
>
> The duplicates come from two places: one is the writer side (this assumes
> using the write proxy, not the core library), and the other is the reader
> side.
>
> - writer side: if the client attempts to write a record to the write
> proxies, gets a network error (e.g. a timeout), and then retries, the
> retry can result in duplicates.
> - reader side: if the reader reads a message from a stream and then
> crashes, it restarts from the last known position (DLSN) when it comes
> back. If the reader fails after processing a record and before recording
> the position, the processed record will be delivered again.
>
> The reader problem can be properly addressed by making use of the sequence
> numbers of records and doing proper checkpointing. For example, a
> database can checkpoint the indexed data with the sequence numbers of the
> records; Flink can checkpoint its state with the sequence numbers.
>
> The writer problem can be addressed by implementing an idempotent writer.
> However, an alternative and more powerful approach is to support
> transactions.
>
> *What does transaction mean?*
>
> A transaction means that a collection of records can be written
> transactionally within a stream or across multiple streams. The records
> will be consumed by the reader together when the transaction is committed,
> or will never be consumed by the reader when the transaction is aborted.
>
> The transaction will provide the following guarantees:
>
> - The reader should not be exposed to records written by uncommitted
> transactions (mandatory)
> - The reader should consume the records in transaction commit order
> rather than in record write order (mandatory)
> - No duplicated records within a transaction (mandatory)
> - Allow interleaving transactional writes and non-transactional writes
> (optional)
>
> *Stream Transaction & Namespace Transaction*
>
> There will be two types of transaction: the stream-level transaction
> (local transaction) and the namespace-level transaction
> (global transaction).
>
> The stream-level transaction is a transactional operation that writes
> records to one stream; the namespace-level transaction is a transactional
> operation that writes records to multiple streams.
>
> *Implementation Thoughts*
>
> - A transaction consists of a begin control record, a series of data
> records, and a commit/abort control record.
> - The begin/commit/abort control records are written to a `commit` log
> stream, while the data records are written to normal data log streams.
> - The `commit` log stream will be the same log stream as the data records
> for a stream-level transaction, while it will be a *system* stream (or
> multiple system streams) for namespace-level transactions.
> - The transaction code looks like the following:
>
> <code>
>
> Transaction txn = client.transaction();
> Future<DLSN> result1 = txn.write(stream-0, record);
> Future<DLSN> result2 = txn.write(stream-1, record);
> Future<DLSN> result3 = txn.write(stream-2, record);
> Future<Pair<DLSN, DLSN>> result = txn.commit();
>
> </code>
>
> If the txn is committed, all the write futures will be satisfied with
> their written DLSNs. If the txn is aborted, all the write futures will
> fail together. There is no partial failure state.
>
> - The actual data flow will be:
>
> 1. The writer gets a transaction id from the owner of the `commit` log
> stream.
> 2. The writer writes the begin control record (synchronously) with the
> transaction id.
> 3. Each write within the same txn is assigned a local sequence number
> starting from 0. The combination of transaction id and local sequence
> number will be used later by the readers to de-duplicate records.
> 4. The commit/abort control record is written based on the results of
> step 3.
>
> - An application can supply a timeout for the transaction when it begins
> the transaction (#begin()). The owner of the `commit` log stream can abort
> transactions that are not committed or aborted within their timeout.
>
> - Failures:
>
> * All the log records can simply be retried, as they will be de-duplicated
> properly at the reader side.
>
> - Reader:
>
> * The reader can be configured to read uncommitted records or committed
> records only (by default, it reads uncommitted records).
> * If the reader is configured to read committed records only, the
> read-ahead cache will be changed to maintain one additional map of pending
> committed records. The pending committed records map is bounded, and
> records will be dropped as read-ahead advances.
> * When the reader hits a commit record, it will rewind to the begin record
> and start reading from there.
> Leveraging the read-ahead cache and the pending commit records cache,
> this should work well for both short transactions and long transactions.
>
> - DLSN, SequenceId:
>
> * We will add a fourth field to DLSN: the `local sequence number` within
> a transaction session. The new DLSN of a record in a transaction will be
> the DLSN of the commit control record plus the record's local sequence
> number.
> * The sequence id will still be the position of the commit record plus the
> record's local sequence number. The position will be advanced by the total
> number of records written in the transaction when the commit control
> record is written.
>
> - Transaction Group & Namespace Transaction
>
> Using one single log stream for namespace transactions can become a
> bottleneck, since all the begin/commit/abort control records have to go
> through one log stream.
>
> The idea of a 'transaction group' is to allow partitioning the writers
> into different transaction groups.
>
> Clients can specify the `group-name` when starting a transaction. If no
> `group-name` is specified, the default `commit` log in the namespace is
> used for creating transactions.
>
> -------------------------------------------------
>
> I'd like to collect feedback on this idea. I'd appreciate any comments,
> and if anyone is also interested in this idea, we'd like to collaborate
> with the community.
>
>
> - Xi
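
To make the "read committed records only" behaviour in the proposal above a bit more concrete, here is a minimal Java sketch of a reader-side buffer. It holds data records until the commit control record of their transaction arrives, drops them on abort, and uses the (transaction id, local sequence number) pair to discard retried duplicates. All names (`CommittedReadBuffer`, `Entry`, `Kind`, `onEntry`) are hypothetical and not part of DistributedLog. The sketch also assumes that every entry carries its transaction id, keeps everything in memory, and leaves out the bounded map and the rewind-to-begin step described in the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical reader-side buffer; not an existing DistributedLog class. */
final class CommittedReadBuffer {

    /** Illustrative entry kinds: three control records plus data. */
    enum Kind { BEGIN, DATA, COMMIT, ABORT }

    /** One entry as seen by the reader (assumed shape). */
    static final class Entry {
        final Kind kind;
        final long txnId;
        final int localSeq;    // only meaningful for DATA
        final String payload;  // only meaningful for DATA

        private Entry(Kind kind, long txnId, int localSeq, String payload) {
            this.kind = kind;
            this.txnId = txnId;
            this.localSeq = localSeq;
            this.payload = payload;
        }

        static Entry begin(long txnId) { return new Entry(Kind.BEGIN, txnId, -1, null); }
        static Entry data(long txnId, int localSeq, String payload) {
            return new Entry(Kind.DATA, txnId, localSeq, payload);
        }
        static Entry commit(long txnId) { return new Entry(Kind.COMMIT, txnId, -1, null); }
        static Entry abort(long txnId) { return new Entry(Kind.ABORT, txnId, -1, null); }
    }

    // txnId -> (localSeq -> payload). The TreeMap keeps records ordered by
    // local sequence number; putIfAbsent drops retried duplicates.
    private final Map<Long, TreeMap<Integer, String>> pending = new HashMap<>();

    /** Feeds one entry and returns the payloads that become visible to the application. */
    List<String> onEntry(Entry e) {
        switch (e.kind) {
            case BEGIN:
                // A retried begin record must not wipe already buffered data.
                pending.putIfAbsent(e.txnId, new TreeMap<>());
                return Collections.emptyList();
            case DATA: {
                TreeMap<Integer, String> records = pending.get(e.txnId);
                if (records != null) {
                    records.putIfAbsent(e.localSeq, e.payload);
                }
                // Data for an unknown or already finished txn is ignored.
                return Collections.emptyList();
            }
            case COMMIT: {
                TreeMap<Integer, String> records = pending.remove(e.txnId);
                if (records == null) {
                    return Collections.emptyList(); // retried or unknown commit
                }
                return new ArrayList<>(records.values());
            }
            case ABORT:
                pending.remove(e.txnId);
                return Collections.emptyList();
            default:
                throw new IllegalStateException("unknown kind: " + e.kind);
        }
    }
}
```

With two interleaved transactions, the records of whichever transaction commits first are returned first, which is the commit-order guarantee listed in the proposal. Records of an aborted transaction are never returned.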