Hello,

Two months ago I asked the distributedlog user group about transaction
support. I want to raise it again, as we are looking at using
distributedlog to build a transactional data service. Transactions are a
major feature that is missing in distributedlog. We have some ideas for
adding them and want to know whether they make sense. If they do, we'd
like to contribute and develop them with the community.

Here are the thoughts:

-------------------------------------------------

From our understanding, DL can provide "at-least-once" delivery semantics
(if not, please correct me) but not "exactly-once" delivery semantics. That
means that a message can be delivered one or more times if the reader
doesn't handle duplicates.

The duplicates come from two places: one is the writer side (this assumes
the write proxy is used rather than the core library), and the other is the
reader side.

- writer side: if the client attempts to write a record to the write
proxies, gets a network error (e.g. a timeout) and then retries, the retry
can result in duplicates.
- reader side: if the reader reads a message from a stream and then
crashes, it restarts from the last known position (DLSN). If the reader
fails after processing a record but before recording the position, the
processed record will be delivered again.

The reader problem can be properly addressed by making use of the sequence
numbers of records and doing proper checkpointing. For example, a database
can checkpoint the indexed data together with the sequence numbers of the
records; Flink can checkpoint its state together with the sequence numbers.
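
To illustrate the checkpointing idea, here is a minimal sketch. The class
and method names are hypothetical (not part of the DistributedLog API), and
a plain long stands in for a DLSN or sequence id. The consumer keeps the
position of the last applied record together with its state and skips any
record that is redelivered at or below that position.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical consumer that keeps its state and last applied position together. */
final class CheckpointedConsumer {
    private final List<String> state = new ArrayList<>();
    // In a real system this would be persisted atomically with the state.
    private long lastApplied = -1L;

    /** Applies the record unless it was already applied; returns true if applied. */
    synchronized boolean apply(long sequenceId, String payload) {
        if (sequenceId <= lastApplied) {
            return false; // redelivered after a restart, so skip it
        }
        state.add(payload);
        lastApplied = sequenceId;
        return true;
    }

    synchronized long lastApplied() {
        return lastApplied;
    }

    synchronized List<String> state() {
        return new ArrayList<>(state);
    }
}
```

After a restart, the reader would resume from lastApplied() and any record
delivered a second time is ignored by apply().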

The writer problem can be addressed by implementing an idempotent writer.
However, an alternative and more powerful approach is to support
transactions.

*What does transaction mean?*

A transaction means a collection of records can be written transactionally
within a stream or across multiple streams. They will be consumed by the
reader together when a transaction is committed, or will never be consumed
by the reader when the transaction is aborted.

The transaction will expose the following guarantees:

- The reader should not be exposed to records written from uncommitted
transactions (mandatory)
- The reader should consume the records in the transaction commit order
rather than the record written order (mandatory)
- No duplicated records within a transaction (mandatory)
- Allow interleaving transactional writes and non-transactional writes
(optional)

*Stream Transaction & Namespace Transaction*

There will be two types of transactions: the stream-level transaction
(local transaction) and the namespace-level transaction (global
transaction).

The stream-level transaction is a transactional operation on writing
records to one stream; the namespace-level transaction is a transactional
operation on writing records to multiple streams.

*Implementation Thoughts*

- A transaction consists of a begin control record, a series of data
records and a commit/abort control record.
- The begin/commit/abort control records are written to a `commit` log
stream, while the data records are written to normal data log streams.
- For a stream-level transaction, the `commit` log stream is the same log
stream as the data stream, while for namespace-level transactions it is a
*system* stream (or multiple system streams).
- The transaction code looks like the following:

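<code>

// Proposed API sketch; stream0..stream2 stand for the target streams.
Transaction txn = client.transaction();
Future<DLSN> result1 = txn.write(stream0, record);
Future<DLSN> result2 = txn.write(stream1, record);
Future<DLSN> result3 = txn.write(stream2, record);
// Completes once the commit control record has been written.
Future<Pair<DLSN, DLSN>> result = txn.commit();

</code>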

If the txn is committed, all the write futures will be satisfied with their
written DLSNs. If the txn is aborted, all the write futures will fail
together. There is no partial failure state.

- The actual data flow will be:

1. The writer gets a transaction id from the owner of the `commit` log
stream.
2. It writes the begin control record (synchronously) with the transaction
id.
3. Each write within the same txn is assigned a local sequence number
starting from 0. The combination of transaction id and local sequence
number will be used later on by the readers to de-duplicate records.
4. The commit/abort control record is written based on the results from
step 3.
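
The de-duplication key from the data flow above can be sketched as follows.
This is only an illustration under assumptions: TxnRecordId, TxnSession and
TxnDeduplicator are hypothetical names, and an unbounded in-memory set
stands in for whatever bounded structure a real reader would use.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical record identity: (transaction id, local sequence number). */
final class TxnRecordId {
    final long txnId;
    final int localSeq;

    TxnRecordId(long txnId, int localSeq) {
        this.txnId = txnId;
        this.localSeq = localSeq;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TxnRecordId)) {
            return false;
        }
        TxnRecordId other = (TxnRecordId) o;
        return txnId == other.txnId && localSeq == other.localSeq;
    }

    @Override
    public int hashCode() {
        return Objects.hash(txnId, localSeq);
    }
}

/** Hypothetical writer-side session: assigns local sequence numbers from 0. */
final class TxnSession {
    private final long txnId;
    private int nextSeq = 0;

    TxnSession(long txnId) {
        this.txnId = txnId;
    }

    TxnRecordId nextRecordId() {
        return new TxnRecordId(txnId, nextSeq++);
    }
}

/** Hypothetical reader-side filter: accepts each record id only once. */
final class TxnDeduplicator {
    private final Set<TxnRecordId> seen = new HashSet<>();

    boolean accept(TxnRecordId id) {
        return seen.add(id); // false if this id was already delivered
    }
}
```

A retried write carries the same (transaction id, local sequence number)
pair as the original attempt, so accept() returns false for it.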

- The application can supply a timeout when calling #begin() to start a
transaction. The owner of the `commit` log stream can abort transactions
that are neither committed nor aborted within their timeout.

- Failures:

* All the log records can simply be retried, as they will probably be
de-duplicated at the reader side.

- Reader:

* The reader can be configured to read uncommitted records or committed
records only (by default it reads uncommitted records).
* If the reader is configured to read committed records only, the read-ahead
cache will be changed to maintain one additional map of pending committed
records. The pending committed records map is bounded, and records will be
dropped from it as read-ahead moves forward.
* When the reader hits a commit record, it will rewind to the begin record
and start reading from there. By leveraging the read-ahead cache and the
pending committed records cache, this should work well for both short
and long transactions.
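
A rough sketch of the committed-only read path, under simplifying
assumptions: CommittedOnlyReader and its callbacks are hypothetical names,
the pending map is unbounded here, and the rewind to the begin record (for
transactions that no longer fit in the cache) is left out. It only shows
that records are held back until the commit record arrives and are then
delivered in commit order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reader that releases records only when their txn commits. */
final class CommittedOnlyReader {
    // Records of open transactions, keyed by transaction id.
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();

    void onBegin(long txnId) {
        pending.put(txnId, new ArrayList<>());
    }

    void onData(long txnId, String payload) {
        List<String> buffer = pending.get(txnId);
        if (buffer != null) {
            buffer.add(payload);
        }
        // Records of unknown transactions are ignored in this sketch.
    }

    void onCommit(long txnId) {
        List<String> buffer = pending.remove(txnId);
        if (buffer != null) {
            delivered.addAll(buffer); // delivered in commit order
        }
    }

    void onAbort(long txnId) {
        pending.remove(txnId); // records of aborted txns are never delivered
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

If txn 1 writes before txn 2 but commits after it, the records of txn 2 are
delivered first, which matches the commit-order guarantee.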

- DLSN, SequenceId:

* We will add a fourth field to DLSN: the `local sequence number` within
a transaction session. So the new DLSN of a record in a transaction will be
the DLSN of the commit control record plus the record's local sequence
number.
* The sequence id will still be the position of the commit record plus the
record's local sequence number. When the commit control record is written,
the position will be advanced by the total number of records written in
the transaction.
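
To make the ordering implied by the fourth field concrete, here is a small
sketch. TxnDLSN is a hypothetical stand-in, not the existing DLSN class: the
first three fields mirror the current DLSN (log segment sequence number,
entry id, slot id) and hold the position of the commit control record, and
localSeq is the proposed addition.

```java
/** Hypothetical four-field DLSN: commit record position plus local sequence number. */
final class TxnDLSN implements Comparable<TxnDLSN> {
    final long logSegmentSequenceNo; // position of the commit control record
    final long entryId;
    final long slotId;
    final long localSeq;             // proposed fourth field

    TxnDLSN(long logSegmentSequenceNo, long entryId, long slotId, long localSeq) {
        this.logSegmentSequenceNo = logSegmentSequenceNo;
        this.entryId = entryId;
        this.slotId = slotId;
        this.localSeq = localSeq;
    }

    /** Orders by commit record position first, then by local sequence number. */
    @Override
    public int compareTo(TxnDLSN o) {
        int c = Long.compare(logSegmentSequenceNo, o.logSegmentSequenceNo);
        if (c != 0) {
            return c;
        }
        c = Long.compare(entryId, o.entryId);
        if (c != 0) {
            return c;
        }
        c = Long.compare(slotId, o.slotId);
        if (c != 0) {
            return c;
        }
        return Long.compare(localSeq, o.localSeq);
    }
}
```

With this ordering, all records of one transaction sort together at the
position of its commit record, in the order they were written.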

- Transaction Group & Namespace Transaction

Using one single log stream for namespace transactions can become a
bottleneck, since all the begin/commit/abort control records have to go
through that one log stream.

The idea of a 'transaction group' is to allow partitioning the writers into
different transaction groups.

Clients can specify a `group-name` when starting a transaction. If no
`group-name` is specified, the default `commit` log in the namespace is
used for creating transactions.
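
As a small illustration of the group lookup, assuming a naming scheme that
is purely hypothetical (a default log named "commit" and per-group logs
named "commit-<group-name>"):

```java
import java.util.Optional;

/** Hypothetical mapping from a transaction group name to its commit log stream. */
final class CommitLogSelector {
    // Assumed name of the default commit log in the namespace.
    static final String DEFAULT_COMMIT_LOG = "commit";

    /** Returns the per-group commit log, or the default one if no group is given. */
    static String commitLogFor(Optional<String> groupName) {
        return groupName
            .filter(g -> !g.isEmpty())
            .map(g -> DEFAULT_COMMIT_LOG + "-" + g)
            .orElse(DEFAULT_COMMIT_LOG);
    }
}
```

Writers in different groups then send their control records to different
log streams, so no single stream carries all of them.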

-------------------------------------------------

I'd like to collect feedback on this idea. Any comments are appreciated,
and if anyone else is interested in this idea, we'd like to collaborate
with the community.


- Xi
