[ https://issues.apache.org/jira/browse/KAFKA-6080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341287#comment-16341287 ]
Randall Hauch commented on KAFKA-6080:
--------------------------------------

Here are some initial thoughts.

First, we want source connectors' tasks to be able to optionally define the transaction boundaries. Since tasks are not able to coordinate or collaborate directly (they may be in different processes), tasks cannot collaborate on a single transaction.

Second, a source task can't call a separate API to demarcate the transaction boundaries, since the source task is asked to return messages via the {{poll()}} method. So the transaction demarcation needs to be expressible within the sequence of {{SourceRecord}} objects that are returned. Doing this will also allow a single call to {{poll()}} to return messages in multiple transactions. However, we may want to _recommend_ (or require?) that, when possible, tasks always begin and commit/roll back a transaction within the same set of records returned by a single call to {{poll()}}. Obviously this won't be a requirement, but Connect does not currently place any guarantees on the timing of subsequent calls to {{poll()}}, so a transaction left open at the end of one batch could remain open for an unpredictable amount of time.

We could add three new subtypes of {{SourceRecord}} to the Kafka Connect API so that source connectors can explicitly control the boundaries of these EOS transactions:
* {{BeginTransaction}} represents the beginning of a transaction in the source system
* {{CommitTransaction}} represents the successful completion of a transaction in the source system
* {{RollbackTransaction}} represents the cancellation of a transaction in the source system

These record types would include the source partition and source offset, but would have a null value for the key, key schema, value, value schema, topic, and partition number fields of {{SourceRecord}}. Source connectors would include these records alongside normal {{SourceRecord}} objects and return them in the {{List<SourceRecord>}} results.
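A minimal sketch of what one {{poll()}} result might look like under this proposal, assuming the three marker types exist. They are not part of the Kafka Connect API: here they are hypothetical Java records implementing a hypothetical {{SketchRecord}} interface that stands in for {{SourceRecord}} (only the source partition, source offset, topic and value are modelled), so the example compiles without Connect on the classpath. The single batch carries one committed and one rolled-back source transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for SourceRecord; NOT the real Kafka Connect API.
interface SketchRecord {
    Map<String, ?> sourcePartition();
    Map<String, ?> sourceOffset();
}

// A normal data record (key, schemas and Kafka partition omitted for brevity).
record DataRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                  String topic, String value) implements SketchRecord {}

// Hypothetical marker records: they carry only the source partition and offset,
// mirroring the proposal's null key, value, schemas, topic and partition.
record BeginTransaction(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset)
        implements SketchRecord {}
record CommitTransaction(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset)
        implements SketchRecord {}
record RollbackTransaction(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset)
        implements SketchRecord {}

class MarkerPollSketch {
    // Stands in for SourceTask.poll(): returns two source transactions in one batch.
    static List<SketchRecord> poll() {
        Map<String, String> partition = Map.of("table", "orders");
        List<SketchRecord> batch = new ArrayList<>();

        // Source transaction 41 completed successfully in the source system.
        batch.add(new BeginTransaction(partition, Map.of("txId", 41L)));
        batch.add(new DataRecord(partition, Map.of("txId", 41L), "orders", "order-1"));
        batch.add(new DataRecord(partition, Map.of("txId", 41L), "orders", "order-2"));
        batch.add(new CommitTransaction(partition, Map.of("txId", 41L)));

        // Source transaction 42 was cancelled in the source system.
        batch.add(new BeginTransaction(partition, Map.of("txId", 42L)));
        batch.add(new DataRecord(partition, Map.of("txId", 42L), "orders", "order-3"));
        batch.add(new RollbackTransaction(partition, Map.of("txId", 42L)));
        return batch;
    }
}
```

The sketch gives each marker the source offset of the transaction it delimits; that is an assumption, since the proposal only says the markers carry a source partition and source offset.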
Each transaction is specified with a {{BeginTransaction}}, followed by zero or more normal records, and ending with either {{CommitTransaction}} or {{RollbackTransaction}}. A transaction sequence with no source records between a {{BeginTransaction}} and {{CommitTransaction}} would simply update the source offsets for the source partition, allowing the connector to record that it has made progress without producing new records (see KAFKA-3821). Any transaction sequence that ends with a {{RollbackTransaction}} would correspond to an aborted EOS transaction. Any {{SourceRecord}} objects outside of an explicit transaction sequence would also be written using Kafka's EOS feature, although the framework would be free to determine the EOS transaction boundaries.

Some things we still need to consider:
* Currently each source task is given its own {{Producer}} instance, which should eliminate cross-talk between the transaction boundaries of different tasks writing to the same topics.
* Should Connect always use EOS transactions, even when the source connector/task does not use them?
* Should offsets be committed within the same EOS transactions? If so, what happens when transactions are rolled back? Any offsets committed during that transaction would also be rolled back, but is that actually what we want?
* Should connectors or tasks be required to declare that they may use transactions? If so, how?

> Transactional EoS for source connectors
> ---------------------------------------
>
> Key: KAFKA-6080
> URL: https://issues.apache.org/jira/browse/KAFKA-6080
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Antony Stubbs
> Priority: Major
> Labels: needs-kip
>
> Exactly once (eos) message production for source connectors.
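The sequence rules in the comment can be sketched as a framework-side pass over one {{poll()}} batch. This is a hypothetical illustration, not Connect code: records are reduced to a {{Kind}} enum, and {{TransactionSplitter}}, {{Segment}} and {{Outcome}} are invented names. Committed sequences with data, empty committed sequences (offset-only progress), rolled-back sequences, and records outside any explicit transaction are each classified separately; a sequence still open at the end of the batch is reported rather than rejected, since a transaction may span {{poll()}} calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical framework-side sketch; the names here are illustrative, not Connect APIs.
class TransactionSplitter {
    // Each record in a batch reduced to its role in the proposed sequence grammar.
    enum Kind { BEGIN, DATA, COMMIT, ROLLBACK }

    // How the framework might treat each segment of the batch.
    enum Outcome { COMMITTED, ABORTED, OFFSETS_ONLY, FRAMEWORK_CHOSEN, STILL_OPEN }

    record Segment(Outcome outcome, int dataRecords) {}

    static List<Segment> split(List<Kind> batch) {
        List<Segment> segments = new ArrayList<>();
        boolean inTx = false;
        int pending = 0; // data records seen since the last boundary

        for (Kind kind : batch) {
            switch (kind) {
                case BEGIN -> {
                    if (inTx) throw new IllegalStateException("nested BeginTransaction");
                    // Records outside an explicit transaction: the framework picks the boundaries.
                    if (pending > 0) segments.add(new Segment(Outcome.FRAMEWORK_CHOSEN, pending));
                    inTx = true;
                    pending = 0;
                }
                case DATA -> pending++;
                case COMMIT, ROLLBACK -> {
                    if (!inTx) throw new IllegalStateException(kind + " without BeginTransaction");
                    // Rollback -> aborted; empty commit -> offset update only; otherwise committed.
                    Outcome outcome = kind == Kind.ROLLBACK ? Outcome.ABORTED
                            : pending == 0 ? Outcome.OFFSETS_ONLY : Outcome.COMMITTED;
                    segments.add(new Segment(outcome, pending));
                    inTx = false;
                    pending = 0;
                }
            }
        }
        // An explicit transaction left open may be continued by the next poll() batch.
        if (inTx) segments.add(new Segment(Outcome.STILL_OPEN, pending));
        else if (pending > 0) segments.add(new Segment(Outcome.FRAMEWORK_CHOSEN, pending));
        return segments;
    }
}
```

Treating a nested {{BeginTransaction}} or an unmatched {{CommitTransaction}}/{{RollbackTransaction}} as an error is an assumption; the comment does not say how malformed sequences should be handled.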