[ 
https://issues.apache.org/jira/browse/KAFKA-6080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341287#comment-16341287
 ] 

Randall Hauch commented on KAFKA-6080:
--------------------------------------

Here are some initial thoughts.

First, we want a source connector's tasks to be able to optionally define the 
transaction boundaries. Since tasks cannot coordinate or collaborate directly 
(they may be in different processes), tasks cannot collaborate on a single 
transaction.

Second, a source task can't call a separate API to demarcate the transaction 
boundaries, since the source task is asked to return messages via the 
{{poll()}} method. So, the transaction demarcation needs to be expressible 
within the sequence of {{SourceRecord}} objects that are returned. Doing this 
will also allow a single call to {{poll()}} to return messages in multiple 
transactions. However, we may want to _recommend_ (or even require?) that, when 
possible, tasks begin and commit or roll back a transaction within the set of 
records returned by a single call to {{poll()}}. This would most likely stay a 
recommendation rather than a requirement, but it matters because Connect does 
not currently place any guarantees on the timing of subsequent calls to 
{{poll()}}.

We could add to the Kafka Connect API three new subtypes of {{SourceRecord}} so 
that source connectors can explicitly control the boundary of these EOS 
transactions:
 * {{BeginTransaction}} represents the beginning of a transaction in the source 
system
 * {{CommitTransaction}} represents the successful completion of a transaction 
in the source system
 * {{RollbackTransaction}} represents the cancellation of a transaction in the 
source system

These record types would include the source partition and source offset, but 
would have a null value for the key, key schema, value, value schema, topic, 
and partition number fields of {{SourceRecord}}.
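
As a rough illustration of the shape these marker records might take, here is a 
minimal, self-contained sketch. It deliberately does not use the real Connect 
classes: {{SketchSourceRecord}} is a hypothetical stand-in for {{SourceRecord}} 
that models only a few of its fields, and the three subtypes are the names 
proposed above, which do not exist in the Connect API today.

```java
import java.util.Map;

// Hypothetical stand-in for Connect's SourceRecord; only a few fields are modeled.
class SketchSourceRecord {
    final Map<String, ?> sourcePartition;
    final Map<String, ?> sourceOffset;
    final String topic;   // null for transaction markers
    final Object key;     // null for transaction markers
    final Object value;   // null for transaction markers

    SketchSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                       String topic, Object key, Object value) {
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    boolean isTransactionMarker() {
        return false;
    }
}

// Common base for the three proposed markers: they carry only the source
// partition and source offset, and leave every other field null.
abstract class TransactionMarker extends SketchSourceRecord {
    TransactionMarker(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        super(sourcePartition, sourceOffset, null, null, null);
    }

    @Override
    boolean isTransactionMarker() {
        return true;
    }
}

final class BeginTransaction extends TransactionMarker {
    BeginTransaction(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        super(sourcePartition, sourceOffset);
    }
}

final class CommitTransaction extends TransactionMarker {
    CommitTransaction(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        super(sourcePartition, sourceOffset);
    }
}

final class RollbackTransaction extends TransactionMarker {
    RollbackTransaction(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        super(sourcePartition, sourceOffset);
    }
}
```

Whether the markers should be subtypes, as here, or ordinary records carrying a 
flag is a design choice this sketch does not settle.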

Source connectors would include these records alongside normal 
{{SourceRecord}} objects and return them in the {{List<SourceRecord>}} 
results. Each transaction is specified with a {{BeginTransaction}}, followed 
by zero or more normal records, and ends with either {{CommitTransaction}} or 
{{RollbackTransaction}}. A transaction sequence with no source records between 
a {{BeginTransaction}} and {{CommitTransaction}} would simply update the source 
offsets for the source partition, allowing the connector to record that it has 
made progress without producing new records (see KAFKA-3821). Any transaction 
sequence that ends with a {{RollbackTransaction}} would correspond to an 
aborted EOS transaction.

Any {{SourceRecord}} objects outside of an explicit transaction sequence would 
also be written using Kafka's EOS feature, although the framework would be free 
to determine the EOS transaction boundaries for them.
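
To make the sequencing rules above concrete, here is a minimal sketch of how a 
framework might split the records returned by {{poll()}} into transactions. 
Everything in it is hypothetical: {{Rec}}, {{Txn}}, {{Kind}} and 
{{TransactionGrouper}} are illustrative stand-ins, not Connect classes, and 
record values are plain strings. The sketch assumes that an explicit 
transaction left open at the end of one batch continues in the next, and it 
arbitrarily groups records outside explicit transactions into one implicit 
transaction per run of such records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record kinds: three markers plus ordinary data records.
enum Kind { BEGIN, COMMIT, ROLLBACK, DATA }

final class Rec {
    final Kind kind;
    final String value; // null for markers

    Rec(Kind kind, String value) {
        this.kind = kind;
        this.value = value;
    }
}

// One completed transaction as the framework would see it.
final class Txn {
    final boolean explicit;   // true if delimited by markers from the task
    final boolean committed;  // false means the EOS transaction is aborted
    final List<String> values;

    Txn(boolean explicit, boolean committed, List<String> values) {
        this.explicit = explicit;
        this.committed = committed;
        this.values = values;
    }
}

final class TransactionGrouper {
    private List<String> pending = new ArrayList<>();
    private boolean inExplicit = false;

    // Consumes one poll() batch and returns the transactions it completes.
    List<Txn> accept(List<Rec> polled) {
        List<Txn> done = new ArrayList<>();
        for (Rec r : polled) {
            switch (r.kind) {
                case BEGIN:
                    if (inExplicit) {
                        throw new IllegalStateException("BeginTransaction inside an open transaction");
                    }
                    flushImplicit(done); // close any framework-chosen transaction first
                    inExplicit = true;
                    break;
                case DATA:
                    pending.add(r.value);
                    break;
                default: // COMMIT or ROLLBACK
                    if (!inExplicit) {
                        throw new IllegalStateException(r.kind + " without BeginTransaction");
                    }
                    done.add(new Txn(true, r.kind == Kind.COMMIT, pending));
                    pending = new ArrayList<>();
                    inExplicit = false;
            }
        }
        // Records outside an explicit transaction are committed per batch here;
        // an open explicit transaction is carried over to the next call.
        if (!inExplicit) {
            flushImplicit(done);
        }
        return done;
    }

    private void flushImplicit(List<Txn> done) {
        if (!pending.isEmpty()) {
            done.add(new Txn(false, true, pending));
            pending = new ArrayList<>();
        }
    }
}
```

In this sketch a {{BEGIN}} immediately followed by a {{COMMIT}} yields a 
committed transaction with no values, which corresponds to the offset-only 
update described above.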

Some things we still need to consider:

* Currently each source task is given its own {{Producer}} instance, so that 
should eliminate cross-talk between the transaction boundaries from different 
tasks writing to the same topics.
* Should Connect always use EOS transactions, even when the source 
connector/task does not use them?
* Should offsets be committed within the same EOS transactions? If so, what 
happens when transactions are rolled back? Any offsets committed during that 
transaction would also be rolled back, but is that actually what we want?
* Should connectors or tasks be required to declare that they may use 
transactions? If so, how?

> Transactional EoS for source connectors
> ---------------------------------------
>
>                 Key: KAFKA-6080
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6080
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Antony Stubbs
>            Priority: Major
>              Labels: needs-kip
>
> Exactly once (eos) message production for source connectors.



