It could work as you have described, but indeed only for one "local-only"
transaction. But that's not the case for me. It even wouldn't work with two
Kafka producers. If I could get away with just one single Kafka Producer I
wouldn't  probably need distributed system to solve such use case.

Maybe we are making a digression here, but as I have stated earlier, in
most of the other distributed systems, "pre-commit" involves only flushing
the data to temporary files, and actual "commit" phase only moves the
temporary files to target directory. And you need to flush the data
regardless whether you want transactions or not. Moving files in commit
phase also looks the same in two phase commits as it looks in single phase
commits. Thus implementing those "pre-commit" and "commit" phases properly
shouldn't add any additional overhead.

By the way, I don't want to implement fully atomic distributed
transactions, but I just would like to be able to achieve exactly-once
semantic for events processed in my system. Without those two phase
commits, I can not efficiently (without buffering and serializing all the
events in some permanent storage before writing it to target Kafka's topic)
achieve that with two Kafka Producers.

Thanks again for your responses! :)

Piotrek



2017-06-19 16:15 GMT+02:00 Michal Borowiecki <michal.borowie...@openbet.com>
:

> I'm not sure if I understood correctly, but if you want to integrate a
> single kafka producer transaction (or any transaction manager that only
> supports local transaction) into a distributed transaction, I think you can
> do so as long as all other involved transaction managers support 2-phase
> commit. In other words, you can include one (and only one) local-only
> transaction into a distributed transaction.
>
> The steps the distributed transaction coordinator would have to take to
> commit the transaction would be:
>
>    1. call prepare on each of the transaction participants that support
>    2-phase commit
>    2. if any of them fails, abort all transactions, otherwise, proceed
>    3. call commit on the one transaction participant that does not
>    support 2-phase commit (kafka producer in this case)
>    4. if that fails, abort all transactions, otherwise, proceed
>    5. call commit on all the transaction participants that support
>    2-phase commit (since prepare on these succeeded they should not refuse to
>    commit at this point)
>
> So as to your concern about getting "clearance" (I take it as the
> equivalent of the "prepare" call) from the kafka producer, you don't really
> need it IMO, as if commit fails on the kafka producer, you can still abort
> the remaining transactions.
>
> Of course you can't do that if you have more than one transaction that
> doesn't support 2-phase commit in play.
>
> Having said that, the advice these days seems to be to design distributed
> systems for eventual consistency, as using distributed transactions, while
> tempting, often leads to resource exhaustion as transaction managers have
> to go the extra mile to ensure they can commit any transaction that had
> prepare return successfully.
>
> Just my 5c. I may be wrong in any of the above, please point it out if so.
>
> Cheers,
>
> Michał
>
> On 19/06/17 14:57, Piotr Nowojski wrote:
>
> Sorry for responding to my own message, but when I sent an original
> message/question I was not subscribed to this mailing list and now I can
> not respond to Matthias answer directly.
>
> I don't want to share a transaction between multiple Producers
> threads/processes, I just would like to resume an interrupted transaction
> after a machine crash.
>
> Let me try to phrase the problem differently:
>
> From the perspective of a producer that writes to Kafka, we have the
> following situation:
>
> We integrate the producer with transaction in another system. A number or
> records should go together atomically (a transaction). Before committing
> the transaction, we frequently need to ask for a "clearance" status, and if
> we get the "go ahead" we want to commit the transaction.
>
> Unfortunately, as soon as we get that "clearance", we cannot reproduce the
> records any more (the are dropped from the original data stream).
> If something fails between the "go ahead" and the committing, we need to
> retry the transaction, so we need to come up again with all records. As a
> result we have to persist the records before we start the write
> transaction. That is a painful overhead to pay, and a lot of additional
> operational complexity.
>
> The simplest way to support that pattern without extra overhead would we
> could "resume" a transaction:
>
>   - Each transaction as a unique Transaction ID
>   - If a crash of the producer occurs, the transaction is NOT aborted
> automatically.
>   - Instead, the restarted producer process reconnects to the transaction
> and decides to commit it or abort it.
>   - The transaction timeout aborts the transaction after a while if
> inactivity.
>
> Maybe this could be easily supported?
>
> Thanks, Piotrek
>
> 2017-06-16 17:59 GMT+02:00 Piotr Nowojski <piotr.nowoj...@gmail.com>:
>
>> But isn't it a low hanging fruit at this moment? Isn't that just an API
>> limitation and wouldn't the backend for transactions support it with only
>> minor changes to the API (do not fail automatically dangling transactions
>> on Producer restart)? Flushing is already there so that _should_ handle the
>> pre-commit. Again, maybe I'm missing something and for sure I am not
>> familiar with Kafka's internals.
>>
>> Piotrek
>>
>> 2017-06-16 15:47 GMT+02:00 Michal Borowiecki <
>> michal.borowie...@openbet.com>:
>>
>>> I don't think KIP-98 is as ambitious as to provide support for
>>> distributed transactions (2 phase commit).
>>>
>>> It would be great if I was wrong though :P
>>>
>>> Cheers,
>>>
>>> Michał
>>>
>>> On 16/06/17 14:21, Piotr Nowojski wrote:
>>>
>>> Hi,
>>>
>>> I'm looking into Kafka's transactions API as proposed in KIP-98. I've read
>>> both this KIP-98 document and I looked into the code that is on the master
>>> branch. I would like to use it to implement some two phase commit mechanism
>>> on top of the Kafka's transactions, that would allow me to tie multiple
>>> systems (some of them might not be Kafka) in one transaction.
>>>
>>> Maybe I'm missing something but the problem is I don't see a way to
>>> implement it using proposed Kafka's transactions API. Even if I have just
>>> two processes writing to Kafka topics, I don't know how can I guarantee
>>> that if one's transaction is committed, the other will also eventually be
>>> committed. This is because if first KafkaProducer successfully commits, but
>>> the second one fails before committing it's data, after restart the second
>>> one's "initTransactions" call will (according to my understanding of the
>>> API) abort previously non completed transactions.
>>>
>>> Usually transactional systems expose API like 
>>> this<http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/>
>>>  
>>> <http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/>.
>>> Namely there is a known identifier for a transaction and you can pre-commit
>>> it (void prepare(...) method in before mentioned example) and then commit
>>> or you can abort this transaction. Usually pre-commit involves flushing
>>> stuff to some temporary files and commit move those files to the final
>>> directory. In case of machine/process failure, if it was before
>>> "pre-commit", we can just rollback all transactions from all of the
>>> processes. However once every process acknowledge that it completed
>>> "pre-commit", each process should call "commit". If some process fails at
>>> that stage, after restarting this process, I would expect to be able to
>>> restore it's "pre-committed" transaction (having remembered transaction's
>>> id) and re attempt to commit it - which should be guaranteed to eventually
>>> succeed.
>>>
>>> In other words, it seems to me like the missing features of this API for me
>>> are:
>>> 1. possibility to resume transactions after machine/process crash. At least
>>> I would expect to be able to commit "flushed"/"pre-committed" data for such
>>> transactions.
>>> 2. making sure that committing already committed transactions doesn't brake
>>> anything
>>>
>>> Or maybe there is some other way to integrate Kafka into such two phase
>>> commit system that I'm missing?
>>>
>>> Thanks, Piotrek
>>>
>>>
>>>
>>> --
>>> <http://www.openbet.com/> Michal Borowiecki
>>> Senior Software Engineer L4
>>> T: +44 208 742 1600 <+44%2020%208742%201600>
>>>
>>>
>>> +44 203 249 8448 <+44%2020%203249%208448>
>>>
>>>
>>>
>>> E: michal.borowie...@openbet.com
>>> W: www.openbet.com
>>> OpenBet Ltd
>>>
>>> Chiswick Park Building 9
>>>
>>> 566 Chiswick High Rd
>>>
>>> London
>>>
>>> W4 5XT
>>>
>>> UK
>>> <https://www.openbet.com/email_promo>
>>> This message is confidential and intended only for the addressee. If you
>>> have received this message in error, please immediately notify the
>>> postmas...@openbet.com and delete it from your system as well as any
>>> copies. The content of e-mails as well as traffic data may be monitored by
>>> OpenBet for employment and security purposes. To protect the environment
>>> please do not print this e-mail unless necessary. OpenBet Ltd. Registered
>>> Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT,
>>> United Kingdom. A company registered in England and Wales. Registered no.
>>> 3134634. VAT no. GB927523612
>>>
>>
>>
>
> --
> <http://www.openbet.com/> Michal Borowiecki
> Senior Software Engineer L4
> T: +44 208 742 1600 <+44%2020%208742%201600>
>
>
> +44 203 249 8448 <+44%2020%203249%208448>
>
>
>
> E: michal.borowie...@openbet.com
> W: www.openbet.com
> OpenBet Ltd
>
> Chiswick Park Building 9
>
> 566 Chiswick High Rd
>
> London
>
> W4 5XT
>
> UK
> <https://www.openbet.com/email_promo>
> This message is confidential and intended only for the addressee. If you
> have received this message in error, please immediately notify the
> postmas...@openbet.com and delete it from your system as well as any
> copies. The content of e-mails as well as traffic data may be monitored by
> OpenBet for employment and security purposes. To protect the environment
> please do not print this e-mail unless necessary. OpenBet Ltd. Registered
> Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT,
> United Kingdom. A company registered in England and Wales. Registered no.
> 3134634. VAT no. GB927523612
>

Reply via email to